Skip to content

消息队列设计核心要点

消息队列架构概述

设计一个消息队列系统需要从多个维度进行考量,包括架构设计、消息可靠性、高性能实现以及功能扩展等方面。

核心架构组成

mermaid
graph TD
    subgraph 生产端
        A["Producer生产者"]
    end
    
    subgraph Broker服务端
        B["消息接收"]
        C["消息存储"]
        D["消息索引"]
        E["消息分发"]
    end
    
    subgraph 消费端
        F["Consumer消费者"]
        G["消费者组"]
    end
    
    subgraph 元数据管理
        H["Topic管理"]
        I["分区管理"]
        J["消费进度"]
    end
    
    A --> B
    B --> C
    C --> D
    D --> E
    E --> F
    E --> G
    
    H -.-> C
    I -.-> C
    J -.-> F
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style C fill:#27AE60,color:#fff,rx:10,ry:10
    style F fill:#E74C3C,color:#fff,rx:10,ry:10
    style G fill:#E67E22,color:#fff,rx:10,ry:10

核心概念定义

概念说明作用
Producer消息生产者负责发送消息到Broker
Consumer消息消费者负责从Broker获取消息
Broker消息服务端负责消息存储、转发
Topic消息主题消息分类标识
Partition分区Topic的物理分割,提升并行度
Consumer Group消费者组多消费者协作消费

消息存储设计

存储介质选择

mermaid
graph LR
    A["存储方式"] --> B["内存存储"]
    A --> C["磁盘存储"]
    A --> D["混合存储"]
    
    B --> B1["速度快"]
    B --> B2["易丢失"]
    
    C --> C1["持久化"]
    C --> C2["相对较慢"]
    
    D --> D1["写内存"]
    D --> D2["异步刷盘"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style B fill:#E74C3C,color:#fff,rx:10,ry:10
    style C fill:#27AE60,color:#fff,rx:10,ry:10
    style D fill:#9B59B6,color:#fff,rx:10,ry:10

生产级方案通常采用混合存储

  • 消息先写入内存缓冲区
  • 异步或同步刷盘到磁盘
  • 通过配置平衡性能与可靠性

顺序写入优化

磁盘顺序写性能可以接近内存随机写,这是消息队列高性能的关键:

java
public class MessageStore {
    
    private final FileChannel fileChannel;
    private final ByteBuffer writeBuffer;
    private final AtomicLong wrotePosition = new AtomicLong(0);
    
    /**
     * 追加消息到存储文件
     */
    public long appendMessage(byte[] messageBody) throws IOException {
        long currentPos = wrotePosition.get();
        
        // 消息格式: 长度(4字节) + 内容
        int msgSize = 4 + messageBody.length;
        ByteBuffer buffer = ByteBuffer.allocate(msgSize);
        buffer.putInt(messageBody.length);
        buffer.put(messageBody);
        buffer.flip();
        
        // 顺序写入
        synchronized (this) {
            fileChannel.position(currentPos);
            fileChannel.write(buffer);
            wrotePosition.addAndGet(msgSize);
        }
        
        return currentPos;  // 返回消息偏移量
    }
    
    /**
     * 刷盘策略
     */
    public void flush(boolean sync) throws IOException {
        if (sync) {
            fileChannel.force(false);  // 同步刷盘
        }
        // 异步刷盘由后台线程定期执行
    }
}

零拷贝技术

使用零拷贝减少数据在用户空间和内核空间的复制:

mermaid
graph LR
    subgraph 传统方式
        A1["磁盘"] --> B1["内核缓冲区"]
        B1 --> C1["用户缓冲区"]
        C1 --> D1["Socket缓冲区"]
        D1 --> E1["网卡"]
    end
    
    subgraph 零拷贝
        A2["磁盘"] --> B2["内核缓冲区"]
        B2 --> C2["网卡"]
    end
    
    style A1 fill:#E74C3C,color:#fff,rx:10,ry:10
    style A2 fill:#27AE60,color:#fff,rx:10,ry:10
java
public class ZeroCopyTransfer {
    
    /**
     * 使用FileChannel的transferTo实现零拷贝
     */
    public long transferToSocket(FileChannel fileChannel, 
                                  SocketChannel socketChannel,
                                  long position, 
                                  long count) throws IOException {
        return fileChannel.transferTo(position, count, socketChannel);
    }
    
    /**
     * 使用MappedByteBuffer内存映射
     */
    public MappedByteBuffer mapFile(FileChannel fileChannel, 
                                     long position, 
                                     long size) throws IOException {
        return fileChannel.map(FileChannel.MapMode.READ_ONLY, position, size);
    }
}

消息可靠性保障

生产端可靠性

mermaid
sequenceDiagram
    participant Producer as 生产者
    participant Broker as Broker主节点
    participant Slave as Broker从节点
    
    Producer->>Broker: 发送消息
    Broker->>Broker: 写入本地存储
    
    alt 同步复制
        Broker->>Slave: 同步复制到从节点
        Slave-->>Broker: 复制成功确认
        Broker-->>Producer: 发送成功ACK
    else 异步复制
        Broker-->>Producer: 发送成功ACK
        Broker->>Slave: 异步复制到从节点
    end

确认机制配置

java
public class ProducerConfig {
    
    public enum AckMode {
        NONE(0),      // 不等待确认,最快但可能丢消息
        LEADER(1),    // 等待主节点确认
        ALL(-1);      // 等待所有副本确认,最安全
        
        private final int value;
    }
    
    private AckMode ackMode = AckMode.LEADER;
    private int retries = 3;
    private long retryBackoffMs = 100;
}

消费端可靠性

mermaid
graph TD
    A["消费消息"] --> B["业务处理"]
    B --> C{"处理结果"}
    
    C -->|成功| D["提交偏移量"]
    C -->|失败| E{"重试策略"}
    
    E -->|可重试| F["重新消费"]
    E -->|不可重试| G["进入死信队列"]
    
    D --> H["更新消费进度"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style D fill:#27AE60,color:#fff,rx:10,ry:10
    style G fill:#E74C3C,color:#fff,rx:10,ry:10

消费确认实现

java
@Service
public class MessageConsumer {
    
    /**
     * 手动确认消费
     */
    public void consumeWithManualAck(MessageRecord record) {
        try {
            // 业务处理
            processMessage(record.getBody());
            
            // 处理成功,提交偏移量
            consumer.commitSync(Collections.singletonMap(
                record.getTopicPartition(),
                new OffsetAndMetadata(record.getOffset() + 1)
            ));
            
        } catch (RetryableException e) {
            // 可重试异常,不提交偏移量,等待重新消费
            log.warn("消息处理失败,将重试: {}", record.getOffset());
            
        } catch (Exception e) {
            // 不可恢复异常,发送到死信队列
            sendToDeadLetterQueue(record);
            // 提交偏移量,避免阻塞
            consumer.commitSync(Collections.singletonMap(
                record.getTopicPartition(),
                new OffsetAndMetadata(record.getOffset() + 1)
            ));
        }
    }
}

消息分发机制

点对点与广播模式

mermaid
graph TD
    subgraph 点对点模式
        A1["Topic A"] --> B1["Consumer Group"]
        B1 --> C1["Consumer 1"]
        B1 --> D1["Consumer 2"]
        B1 --> E1["Consumer 3"]
    end
    
    subgraph 广播模式
        A2["Topic B"] --> C2["Consumer 1"]
        A2 --> D2["Consumer 2"]
        A2 --> E2["Consumer 3"]
    end
    
    style A1 fill:#4A90E2,color:#fff,rx:10,ry:10
    style A2 fill:#9B59B6,color:#fff,rx:10,ry:10
    style B1 fill:#E67E22,color:#fff,rx:10,ry:10

点对点模式:同一消费者组内的消费者竞争消费,每条消息只被一个消费者处理。

广播模式:每条消息被所有消费者接收处理。

推拉模式选择

mermaid
graph LR
    subgraph 推模式-Push
        A1["Broker"] -->|主动推送| B1["Consumer"]
    end
    
    subgraph 拉模式-Pull
        A2["Consumer"] -->|主动拉取| B2["Broker"]
    end
    
    subgraph 长轮询
        A3["Consumer"] -->|请求| B3["Broker"]
        B3 -->|有消息立即返回<br/>无消息挂起等待| A3
    end
    
    style A1 fill:#E74C3C,color:#fff,rx:10,ry:10
    style A2 fill:#27AE60,color:#fff,rx:10,ry:10
    style B3 fill:#9B59B6,color:#fff,rx:10,ry:10

长轮询实现:结合推拉优点的最佳实践

java
public class LongPollingConsumer {
    
    private static final long POLL_TIMEOUT_MS = 30000;
    
    /**
     * 长轮询拉取消息
     */
    public List<Message> poll() {
        long startTime = System.currentTimeMillis();
        
        while (System.currentTimeMillis() - startTime < POLL_TIMEOUT_MS) {
            // 尝试拉取消息
            List<Message> messages = broker.fetchMessages(
                topic, partition, offset, maxBytes);
            
            if (!messages.isEmpty()) {
                return messages;
            }
            
            // 无消息,短暂等待后重试
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        return Collections.emptyList();
    }
}

高级功能设计

顺序消息

保证消息按发送顺序消费:

mermaid
graph TD
    A["订单消息"] --> B{"路由策略"}
    
    B -->|订单ID取模| C["Partition 0"]
    B -->|订单ID取模| D["Partition 1"]
    B -->|订单ID取模| E["Partition 2"]
    
    C --> F["Consumer 0"]
    D --> G["Consumer 1"]
    E --> H["Consumer 2"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style B fill:#E67E22,color:#fff,rx:10,ry:10

实现关键

  • 相同业务Key的消息路由到同一分区
  • 单分区内保证顺序
  • 消费者单线程处理分区消息

延迟消息

mermaid
graph TD
    A["延迟消息"] --> B["延迟队列"]
    B --> C["时间轮/定时器"]
    C --> D{"到达时间?"}
    D -->|是| E["投递到目标Topic"]
    D -->|否| F["继续等待"]
    
    style A fill:#4A90E2,color:#fff,rx:10,ry:10
    style C fill:#9B59B6,color:#fff,rx:10,ry:10
    style E fill:#27AE60,color:#fff,rx:10,ry:10

延迟队列实现思路

java
public class DelayMessageScheduler {
    
    // 延迟级别: 1s, 5s, 10s, 30s, 1m, 5m, 10m, 30m, 1h, 2h
    private static final long[] DELAY_LEVELS = {
        1000, 5000, 10000, 30000, 60000, 
        300000, 600000, 1800000, 3600000, 7200000
    };
    
    /**
     * 发送延迟消息
     */
    public void sendDelayMessage(Message message, int delayLevel) {
        // 计算投递时间
        long deliverTime = System.currentTimeMillis() + 
            DELAY_LEVELS[delayLevel - 1];
        message.setDeliverTime(deliverTime);
        
        // 发送到延迟Topic
        String delayTopic = "SCHEDULE_TOPIC_" + delayLevel;
        producer.send(delayTopic, message);
    }
    
    /**
     * 定时扫描到期消息
     */
    @Scheduled(fixedRate = 1000)
    public void scanDelayMessages() {
        for (int level = 1; level <= DELAY_LEVELS.length; level++) {
            String delayTopic = "SCHEDULE_TOPIC_" + level;
            List<Message> messages = consumer.poll(delayTopic);
            
            for (Message msg : messages) {
                if (msg.getDeliverTime() <= System.currentTimeMillis()) {
                    // 投递到真实Topic
                    producer.send(msg.getRealTopic(), msg);
                }
            }
        }
    }
}

事务消息

保证本地事务与消息发送的一致性:

mermaid
sequenceDiagram
    participant Producer as 生产者
    participant Broker as Broker
    participant DB as 本地数据库
    
    Producer->>Broker: 1. 发送半消息
    Broker-->>Producer: 2. 返回半消息发送结果
    
    Producer->>DB: 3. 执行本地事务
    
    alt 本地事务成功
        Producer->>Broker: 4a. 提交消息(Commit)
        Broker->>Broker: 投递给消费者
    else 本地事务失败
        Producer->>Broker: 4b. 回滚消息(Rollback)
        Broker->>Broker: 删除半消息
    end
    
    Note over Broker: 定期回查未确认的半消息
    Broker->>Producer: 5. 回查事务状态
    Producer-->>Broker: 6. 返回Commit/Rollback

消息堆积处理

堆积监控

java
@Component
public class MessageBacklogMonitor {
    
    @Scheduled(fixedRate = 60000)
    public void checkBacklog() {
        Map<String, Long> topicBacklog = new HashMap<>();
        
        for (TopicPartition tp : consumer.assignment()) {
            long currentOffset = consumer.position(tp);
            long endOffset = consumer.endOffsets(
                Collections.singleton(tp)).get(tp);
            
            long backlog = endOffset - currentOffset;
            topicBacklog.merge(tp.topic(), backlog, Long::sum);
        }
        
        for (Map.Entry<String, Long> entry : topicBacklog.entrySet()) {
            if (entry.getValue() > 100000) {
                alertService.sendAlert(
                    String.format("Topic[%s]消息堆积: %d条", 
                        entry.getKey(), entry.getValue()));
            }
        }
    }
}

堆积应急处理

mermaid
graph TD
    A["发现消息堆积"] --> B["评估堆积原因"]
    
    B --> C["消费能力不足"]
    B --> D["下游系统故障"]
    B --> E["消费逻辑异常"]
    
    C --> F["扩容消费者"]
    D --> G["临时降级/跳过"]
    E --> H["修复代码重启"]
    
    F --> I["增加消费者实例"]
    F --> J["增加消费线程"]
    
    style A fill:#E74C3C,color:#fff,rx:10,ry:10
    style F fill:#27AE60,color:#fff,rx:10,ry:10
    style G fill:#E67E22,color:#fff,rx:10,ry:10
    style H fill:#9B59B6,color:#fff,rx:10,ry:10

更新: 2025-12-06 17:32:32
原文: https://www.yuque.com/u22210564/zoxfmt/edbwn2psgby1tnse

Java 后端面试知识库