公众号skill消息推送架构优化:从单机到分布式的高并发解决方案

2次阅读
没有评论

共计 2127 个字符,预计需要花费 6 分钟才能阅读完成。

image.webp

背景痛点

最初我们的公众号 skill 消息推送系统采用简单的单机架构,随着用户量从几千暴增到百万级别,逐渐暴露出严重问题:

公众号 skill 消息推送架构优化:从单机到分布式的高并发解决方案

  • 消息积压严重:高峰期单机 Redis 队列积压超过 10 万条
  • 推送延迟明显:用户收到消息的延迟从秒级恶化到分钟级
  • 故障影响面大:单点故障导致整个服务不可用

传统架构的核心瓶颈在于:
1. 同步阻塞处理模型
2. 单节点处理能力上限
3. 无法弹性扩展资源

技术选型

消息队列对比

  • RabbitMQ
  • 优点:协议完善,管理界面友好
  • 缺点:队列积压时性能下降明显

  • Kafka

  • 优点:高吞吐、低延迟、持久化
  • 适合场景:日志收集、消息推送

最终选择 Kafka 的核心考虑:
1. 支持百万级 TPS
2. 消息持久化能力
3. 完善的分布式机制

核心实现

Kafka 削峰配置

// Producer 配置示例
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("acks", "1"); // 平衡可靠性与性能
props.put("retries", 3); // 网络波动时自动重试
props.put("linger.ms", 5); // 适当批量发送
props.put("max.in.flight.requests.per.connection", 1); // 保证顺序性 

关键参数说明:
– 分区数 = 预期峰值吞吐量 / 单分区处理能力
– 建议设置 replication-factor ≥ 2

Redis 缓存设计

采用双 Key 策略:
– user:123:status → 最新状态
– user:123:history → 推送历史记录

使用 Lua 脚本保证原子操作:

local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
    return 0 
else
    redis.call('SET', KEYS[1], ARGV[1])
    redis.call('RPUSH', KEYS[2], ARGV[2])
    return 1
end

消费者集群实现

Spring Cloud 集成关键配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: skill-push
          group: push-group
          consumer:
            concurrency: 4 # 每个实例线程数
      kafka:
        binder:
          brokers: kafka1:9092,kafka2:9092

代码示例

幂等处理注解

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Idempotent {String key(); // 幂等键表达式
    long ttl() default 3600; // 有效期 ( 秒)
}

// 使用 AOP 实现拦截
@Around("@annotation(idempotent)")
public Object checkIdempotent(ProceedingJoinPoint pjp, Idempotent idempotent) {String key = parseKey(pjp, idempotent.key());
    if (redis.setnx(key, "1")) {redis.expire(key, idempotent.ttl());
        return pjp.proceed();}
    throw new IdempotentException("重复请求");
}

批量消费逻辑

@KafkaListener(topics = "skill-push")
public void batchConsume(List<Message> messages) {
    // 1. 按用户分组
    Map<Long, List<Message>> grouped = messages.stream()
        .collect(Collectors.groupingBy(Message::getUserId));

    // 2. 并行处理
    grouped.entrySet().parallelStream().forEach(entry -> {Long userId = entry.getKey();
        List<Message> userMessages = entry.getValue();
        // 合并推送逻辑...
    });
}

性能考量

压测环境:
– 8 核 16G 服务器 × 3
– Kafka 3 节点集群

测试结果对比:
| 指标 | 单机架构 | 新架构 |
|————-|———|——–|
| 峰值 QPS | 1,200 | 28,000 |
| 平均延迟 | 850ms | 65ms |
| 99 线延迟 | 2.1s | 120ms |

避坑指南

  1. 分区数设置
  2. 建议公式:分区数 = max(消费者数量, 峰值 QPS/ 单分区处理能力)
  3. 生产环境建议至少预留 20% 缓冲

  4. Offset 管理

  5. 禁用 auto.commit
  6. 采用手动提交 + 本地存储组合方案

  7. 时钟同步

  8. 所有节点部署 NTP 服务
  9. 关键日志记录机器时间与时区

开放思考

当消息量继续增长到百万级 QPS 时:
1. 如何设计跨机房部署方案?
2. 是否需要引入 Pulsar 等新消息系统?
3. 客户端协议能否从 HTTP 升级到 WebSocket?

优化之路永无止境,期待与大家共同探讨更优解。

正文完
 0
评论(没有评论)