Skip to content

大规模消息推送系统设计

业务场景与技术挑战

以大型电商平台为例,每逢双十一、618等大促活动,需要向千万级用户精准推送活动预热和开抢提醒。这类场景看似简单,实际包含多个技术挑战:

mermaid
graph TD
    A["大规模消息推送"] --> B["数据筛选"]
    A --> C["推送执行"]
    A --> D["质量保障"]
    
    B --> B1["高效查询目标用户"]
    B --> B2["避免数据库压力过大"]
    
    C --> C1["多渠道投递"]
    C --> C2["流量控制"]
    C --> C3["并发处理"]
    
    D --> D1["防重复推送"]
    D --> D2["防骚扰控制"]
    D --> D3["失败重试"]
    D --> D4["效果追踪"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style B fill:#E74C3C,color:#fff,rx:10,ry:10
    style C fill:#9B59B6,color:#fff,rx:10,ry:10
    style D fill:#27AE60,color:#fff,rx:10,ry:10

关于数据规模的理性认识:千万级用户数据虽然看起来庞大,但通过合理的分批处理和索引优化,完全可以在可控时间内完成推送,关键在于任务拆分和流量控制。

推送任务与投递日志设计

大规模消息系统通常采用任务+日志的双表设计模式,将触达任务与实际投递过程分离,便于任务调度和效果追踪。

批次任务表

记录每一次推送活动的整体信息:

sql
CREATE TABLE push_batch (
    batch_id VARCHAR(32) PRIMARY KEY COMMENT '批次编号',
    campaign_name VARCHAR(128) NOT NULL COMMENT '活动名称',
    trigger_type ENUM('SCHEDULED','MANUAL','EVENT') NOT NULL COMMENT '触发方式',
    audience_sql TEXT COMMENT '目标人群筛选条件',
    template_code VARCHAR(64) NOT NULL COMMENT '消息模板编码',
    planned_count INT UNSIGNED DEFAULT 0 COMMENT '计划推送数量',
    success_count INT UNSIGNED DEFAULT 0 COMMENT '成功数量',
    fail_count INT UNSIGNED DEFAULT 0 COMMENT '失败数量',
    exec_status TINYINT DEFAULT 0 COMMENT '执行状态: 0-待执行 1-执行中 2-已完成 3-已取消',
    scheduled_at DATETIME COMMENT '计划执行时间',
    started_at DATETIME COMMENT '实际开始时间',
    finished_at DATETIME COMMENT '执行完成时间',
    operator VARCHAR(64) COMMENT '操作人',
    gmt_create DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_exec_status(exec_status),
    INDEX idx_scheduled(scheduled_at)
) COMMENT='推送批次任务表';

投递明细表

记录每条消息的投递轨迹:

sql
CREATE TABLE delivery_log (
    log_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    batch_id VARCHAR(32) NOT NULL COMMENT '关联批次',
    recipient_id BIGINT NOT NULL COMMENT '接收人ID',
    contact_way VARCHAR(64) COMMENT '联系方式(手机/邮箱等)',
    dispatch_channel VARCHAR(16) NOT NULL COMMENT '投递通道',
    msg_body TEXT COMMENT '实际发送内容',
    vendor_resp_code VARCHAR(32) COMMENT '服务商响应码',
    vendor_msg_id VARCHAR(128) COMMENT '服务商流水号',
    delivery_state TINYINT DEFAULT 0 COMMENT '投递状态: 0-排队中 1-已发出 2-送达 3-失败',
    fail_reason VARCHAR(512) COMMENT '失败原因',
    attempt_times TINYINT DEFAULT 1 COMMENT '尝试次数',
    dispatched_at DATETIME COMMENT '发出时间',
    arrived_at DATETIME COMMENT '送达时间',
    opened_at DATETIME COMMENT '打开时间',
    gmt_create DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_batch(batch_id),
    INDEX idx_recipient(recipient_id),
    INDEX idx_state_create(delivery_state, gmt_create)
) COMMENT='消息投递明细日志';

这种双表设计的优势在于:批次表支撑运营侧的任务管理和数据看板,明细表则用于追踪单条消息的完整生命周期,两者职责清晰。

目标用户筛选策略

高效查询设计

以大促活动通知为例,需要筛选出符合条件的目标用户(如近30天有浏览行为、加购未下单的用户):

sql
-- 筛选近期活跃且有潜在购买意向的用户
SELECT u.id, u.user_id, u.phone, u.push_token
FROM user_profile u
INNER JOIN user_behavior b ON u.user_id = b.user_id
WHERE b.last_visit_time >= CURDATE() - INTERVAL 30 DAY
  AND b.cart_item_count > 0
  AND u.push_enabled = 1;

注意事项

  • 避免使用DATEDIFF等函数,会导致索引失效
  • last_visit_time字段上建立索引
  • 通过业务条件过滤,减少推送范围

分布式任务分片

当数据量较大时,可以借助分布式任务框架进行分片处理:

mermaid
graph TD
    A["XXL-JOB调度"] --> B["分片广播"]
    
    B --> C["节点1<br/>处理尾号0,1"]
    B --> D["节点2<br/>处理尾号2,3"]
    B --> E["节点3<br/>处理尾号4,5"]
    B --> F["..."]
    B --> G["节点5<br/>处理尾号8,9"]
    
    C --> H["扫描用户"]
    D --> H
    E --> H
    G --> H
    
    H --> I["投递MQ"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style B fill:#9B59B6,color:#fff,rx:10,ry:10
    style C fill:#27AE60,color:#fff,rx:10,ry:10
    style D fill:#27AE60,color:#fff,rx:10,ry:10
    style E fill:#27AE60,color:#fff,rx:10,ry:10
    style G fill:#27AE60,color:#fff,rx:10,ry:10

分片任务实现

java
@XxlJob("promotionPushJob")
public void executeShardingTask() {
    // 获取分片参数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();
    
    log.info("执行分片任务: {}/{}", shardIndex, shardTotal);
    
    // 按用户ID尾号分片查询目标用户
    List<UserProfile> targetUsers = userProfileMapper.selectActiveUsers(
        LocalDate.now().minusDays(30),
        shardIndex,
        shardTotal
    );
    
    // 批量投递消息队列
    for (UserProfile user : targetUsers) {
        mqTemplate.asyncSend("promotion-notify-topic", 
            MessageBuilder.withPayload(user.getUserId()).build());
    }
    
    XxlJobHelper.log("处理完成,共{}条", targetUsers.size());
}

优化like查询

如果使用用户ID尾号分片,可以通过冗余字段优化查询:

sql
-- 添加逆序用户ID字段,支持前缀匹配
ALTER TABLE user_profile ADD COLUMN reverse_uid VARCHAR(32) 
    COMMENT '用户ID逆序存储,用于分片查询';

-- 查询尾号为0的活跃用户(利用逆序后的前缀匹配)
SELECT u.id, u.user_id, u.phone, u.push_token
FROM user_profile u
INNER JOIN user_behavior b ON u.user_id = b.user_id
WHERE b.last_visit_time >= ?
  AND u.reverse_uid LIKE '0%';

异步推送架构

任务解耦设计

将用户筛选和消息发送解耦,提升系统吞吐量:

mermaid
sequenceDiagram
    participant Scheduler as 定时调度
    participant Scanner as 扫描服务
    participant MQ as 消息队列
    participant Sender as 发送服务
    participant Channel as 推送渠道
    
    Scheduler->>Scanner: 触发扫描任务
    Scanner->>Scanner: 查询待通知用户
    Scanner->>MQ: 批量投递用户ID
    
    loop 消费处理
        MQ->>Sender: 拉取消息
        Sender->>Sender: 查询用户详情
        Sender->>Sender: 渲染消息模板
        Sender->>Channel: 调用推送接口
        Channel-->>Sender: 返回结果
        Sender->>Sender: 更新推送状态
    end

多渠道推送实现

java
@Service
public class NotificationService {
    
    @Autowired
    private Map<String, NotificationChannel> channels;
    
    @Autowired
    private NotificationRecordMapper recordMapper;
    
    /**
     * 发送通知消息
     */
    public void sendNotification(NotificationRequest request) {
        // 渲染消息内容
        String content = templateEngine.render(
            request.getTemplateCode(), 
            request.getParams());
        
        // 创建推送记录
        NotificationRecord record = new NotificationRecord();
        record.setUserId(request.getUserId());
        record.setNotificationType(request.getType());
        record.setChannel(request.getChannel());
        record.setContent(content);
        record.setStatus(NotificationStatus.PENDING);
        recordMapper.insert(record);
        
        try {
            // 获取对应渠道发送器
            NotificationChannel channel = channels.get(request.getChannel());
            SendResult result = channel.send(request.getTarget(), content);
            
            // 更新发送结果
            record.setStatus(result.isSuccess() ? 
                NotificationStatus.SENT : NotificationStatus.FAILED);
            record.setThirdMsgId(result.getMsgId());
            record.setSendTime(LocalDateTime.now());
            
        } catch (Exception e) {
            record.setStatus(NotificationStatus.FAILED);
            record.setErrorInfo(e.getMessage());
        }
        
        recordMapper.updateById(record);
    }
}

// 短信渠道实现
@Component("sms")
public class SmsChannel implements NotificationChannel {
    
    @Override
    public SendResult send(String phone, String content) {
        // 调用短信服务商API
        SmsResponse response = smsClient.send(phone, content);
        return new SendResult(response.isSuccess(), response.getMsgId());
    }
}

// 站内信渠道实现
@Component("inbox")
public class InboxChannel implements NotificationChannel {
    
    @Override
    public SendResult send(String userId, String content) {
        // 写入站内信表
        InboxMessage message = new InboxMessage();
        message.setUserId(Long.parseLong(userId));
        message.setContent(content);
        inboxMapper.insert(message);
        return new SendResult(true, message.getId().toString());
    }
}

防疲劳控制

疲劳度规则设计

防止用户被频繁打扰是消息系统的重要功能:

mermaid
graph TD
    A["推送请求"] --> B{"时段检查"}
    B -->|禁止时段| C["延迟到合适时段"]
    B -->|允许时段| D{"频率检查"}
    
    D -->|超过阈值| E["丢弃或延迟"]
    D -->|未超阈值| F{"渠道额度检查"}
    
    F -->|额度不足| G["降级其他渠道"]
    F -->|额度充足| H["正常发送"]
    
    H --> I["更新疲劳度计数"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style C fill:#E67E22,color:#fff,rx:10,ry:10
    style E fill:#E74C3C,color:#fff,rx:10,ry:10
    style H fill:#27AE60,color:#fff,rx:10,ry:10

基于Redis的疲劳度控制

java
@Component
public class FatigueController {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 检查是否可以发送
     */
    public boolean canSend(Long userId, String channel, String type) {
        String key = buildFatigueKey(userId, channel, type);
        
        // 检查今日是否已发送
        Boolean exists = redisTemplate.hasKey(key);
        if (Boolean.TRUE.equals(exists)) {
            return false;
        }
        
        // 检查是否在禁止时段
        int hour = LocalTime.now().getHour();
        if (hour >= 22 || hour < 8) {
            return false;  // 晚10点到早8点不发送
        }
        
        // 检查总频率
        String totalKey = "fatigue:total:" + userId + ":" + 
            LocalDate.now().toString();
        Long count = redisTemplate.opsForValue().increment(totalKey);
        if (count != null && count > 5) {  // 每天最多5条
            return false;
        }
        
        return true;
    }
    
    /**
     * 记录发送
     */
    public void recordSend(Long userId, String channel, String type) {
        String key = buildFatigueKey(userId, channel, type);
        // 设置24小时过期
        redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
    }
    
    private String buildFatigueKey(Long userId, String channel, String type) {
        return String.format("fatigue:%s:%s:%d:%s", 
            channel, type, userId, LocalDate.now().toString());
    }
}

失败处理与重试机制

分级重试策略

mermaid
graph TD
    A["发送失败"] --> B{"失败类型判断"}
    
    B -->|可重试错误| C["加入重试队列"]
    B -->|不可重试错误| D["标记失败"]
    
    C --> E{"重试次数"}
    E -->|<= 3次| F["延迟重试"]
    E -->|> 3次| G["转人工处理"]
    
    F --> H["第1次: 5分钟后"]
    F --> I["第2次: 30分钟后"]
    F --> J["第3次: 2小时后"]
    
    style A fill:#E74C3C,color:#fff,rx:10,ry:10
    style C fill:#E67E22,color:#fff,rx:10,ry:10
    style D fill:#9B59B6,color:#fff,rx:10,ry:10
    style G fill:#4A90E2,color:#fff,rx:10,ry:10

定时扫描重试

java
@Scheduled(fixedRate = 300000)  // 每5分钟执行
public void retryFailedNotifications() {
    // 查询待重试的记录
    List<NotificationRecord> failedRecords = recordMapper.selectForRetry(
        NotificationStatus.FAILED,
        3,  // 最大重试次数
        LocalDateTime.now().minusMinutes(5)  // 距上次尝试5分钟以上
    );
    
    for (NotificationRecord record : failedRecords) {
        try {
            // 重新发送
            NotificationChannel channel = channels.get(record.getChannel());
            SendResult result = channel.send(
                getTarget(record), 
                record.getContent());
            
            if (result.isSuccess()) {
                record.setStatus(NotificationStatus.SENT);
                record.setSendTime(LocalDateTime.now());
            } else {
                record.setRetryCount(record.getRetryCount() + 1);
                record.setErrorInfo(result.getErrorMsg());
            }
        } catch (Exception e) {
            record.setRetryCount(record.getRetryCount() + 1);
            record.setErrorInfo(e.getMessage());
        }
        
        recordMapper.updateById(record);
    }
}

监控与告警

关键指标监控

建立完善的监控体系,及时发现异常:

指标类型具体指标告警阈值
成功率各渠道发送成功率< 95%
时效性平均发送延迟> 5分钟
堆积量待发送消息数量> 10万
重试率需要重试的比例> 10%

大规模失败告警

java
@Scheduled(fixedRate = 60000)
public void checkFailureRate() {
    // 统计最近10分钟的发送情况
    LocalDateTime startTime = LocalDateTime.now().minusMinutes(10);
    
    Map<String, SendStats> channelStats = recordMapper
        .countByChannelAndStatus(startTime);
    
    for (Map.Entry<String, SendStats> entry : channelStats.entrySet()) {
        String channel = entry.getKey();
        SendStats stats = entry.getValue();
        
        double failureRate = (double) stats.getFailedCount() / 
            stats.getTotalCount();
        
        if (failureRate > 0.1) {  // 失败率超过10%
            alertService.sendAlert(
                String.format("[告警]%s渠道失败率异常: %.2f%%", 
                    channel, failureRate * 100));
        }
    }
}

更新: 2025-12-06 17:30:56
原文: https://www.yuque.com/u22210564/zoxfmt/sglrak1lrbzzh0fo

Java 后端面试知识库