消息队列设计核心要点
消息队列架构概述
设计一个消息队列系统需要从多个维度进行考量,包括架构设计、消息可靠性、高性能实现以及功能扩展等方面。
核心架构组成
mermaid
graph TD
subgraph 生产端
A["Producer生产者"]
end
subgraph Broker服务端
B["消息接收"]
C["消息存储"]
D["消息索引"]
E["消息分发"]
end
subgraph 消费端
F["Consumer消费者"]
G["消费者组"]
end
subgraph 元数据管理
H["Topic管理"]
I["分区管理"]
J["消费进度"]
end
A --> B
B --> C
C --> D
D --> E
E --> F
E --> G
H -.-> C
I -.-> C
J -.-> F
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style C fill:#27AE60,color:#fff,rx:10,ry:10
style F fill:#E74C3C,color:#fff,rx:10,ry:10
style G fill:#E67E22,color:#fff,rx:10,ry:10核心概念定义
| 概念 | 说明 | 作用 |
|---|---|---|
| Producer | 消息生产者 | 负责发送消息到Broker |
| Consumer | 消息消费者 | 负责从Broker获取消息 |
| Broker | 消息服务端 | 负责消息存储、转发 |
| Topic | 消息主题 | 消息分类标识 |
| Partition | 分区 | Topic的物理分割,提升并行度 |
| Consumer Group | 消费者组 | 多消费者协作消费 |
消息存储设计
存储介质选择
mermaid
graph LR
A["存储方式"] --> B["内存存储"]
A --> C["磁盘存储"]
A --> D["混合存储"]
B --> B1["速度快"]
B --> B2["易丢失"]
C --> C1["持久化"]
C --> C2["相对较慢"]
D --> D1["写内存"]
D --> D2["异步刷盘"]
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style B fill:#E74C3C,color:#fff,rx:10,ry:10
style C fill:#27AE60,color:#fff,rx:10,ry:10
style D fill:#9B59B6,color:#fff,rx:10,ry:10生产级方案通常采用混合存储:
- 消息先写入内存缓冲区
- 异步或同步刷盘到磁盘
- 通过配置平衡性能与可靠性
顺序写入优化
磁盘顺序写性能可以接近内存随机写,这是消息队列高性能的关键:
java
public class MessageStore {
private final FileChannel fileChannel;
private final ByteBuffer writeBuffer;
private final AtomicLong wrotePosition = new AtomicLong(0);
/**
* 追加消息到存储文件
*/
public long appendMessage(byte[] messageBody) throws IOException {
long currentPos = wrotePosition.get();
// 消息格式: 长度(4字节) + 内容
int msgSize = 4 + messageBody.length;
ByteBuffer buffer = ByteBuffer.allocate(msgSize);
buffer.putInt(messageBody.length);
buffer.put(messageBody);
buffer.flip();
// 顺序写入
synchronized (this) {
fileChannel.position(currentPos);
fileChannel.write(buffer);
wrotePosition.addAndGet(msgSize);
}
return currentPos; // 返回消息偏移量
}
/**
* 刷盘策略
*/
public void flush(boolean sync) throws IOException {
if (sync) {
fileChannel.force(false); // 同步刷盘
}
// 异步刷盘由后台线程定期执行
}
}零拷贝技术
使用零拷贝减少数据在用户空间和内核空间的复制:
mermaid
graph LR
subgraph 传统方式
A1["磁盘"] --> B1["内核缓冲区"]
B1 --> C1["用户缓冲区"]
C1 --> D1["Socket缓冲区"]
D1 --> E1["网卡"]
end
subgraph 零拷贝
A2["磁盘"] --> B2["内核缓冲区"]
B2 --> C2["网卡"]
end
style A1 fill:#E74C3C,color:#fff,rx:10,ry:10
style A2 fill:#27AE60,color:#fff,rx:10,ry:10java
public class ZeroCopyTransfer {
/**
* 使用FileChannel的transferTo实现零拷贝
*/
public long transferToSocket(FileChannel fileChannel,
SocketChannel socketChannel,
long position,
long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
/**
* 使用MappedByteBuffer内存映射
*/
public MappedByteBuffer mapFile(FileChannel fileChannel,
long position,
long size) throws IOException {
return fileChannel.map(FileChannel.MapMode.READ_ONLY, position, size);
}
}消息可靠性保障
生产端可靠性
mermaid
sequenceDiagram
participant Producer as 生产者
participant Broker as Broker主节点
participant Slave as Broker从节点
Producer->>Broker: 发送消息
Broker->>Broker: 写入本地存储
alt 同步复制
Broker->>Slave: 同步复制到从节点
Slave-->>Broker: 复制成功确认
Broker-->>Producer: 发送成功ACK
else 异步复制
Broker-->>Producer: 发送成功ACK
Broker->>Slave: 异步复制到从节点
end确认机制配置:
java
public class ProducerConfig {
public enum AckMode {
NONE(0), // 不等待确认,最快但可能丢消息
LEADER(1), // 等待主节点确认
ALL(-1); // 等待所有副本确认,最安全
private final int value;
}
private AckMode ackMode = AckMode.LEADER;
private int retries = 3;
private long retryBackoffMs = 100;
}消费端可靠性
mermaid
graph TD
A["消费消息"] --> B["业务处理"]
B --> C{"处理结果"}
C -->|成功| D["提交偏移量"]
C -->|失败| E{"重试策略"}
E -->|可重试| F["重新消费"]
E -->|不可重试| G["进入死信队列"]
D --> H["更新消费进度"]
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style D fill:#27AE60,color:#fff,rx:10,ry:10
style G fill:#E74C3C,color:#fff,rx:10,ry:10消费确认实现:
java
@Service
public class MessageConsumer {
/**
* 手动确认消费
*/
public void consumeWithManualAck(MessageRecord record) {
try {
// 业务处理
processMessage(record.getBody());
// 处理成功,提交偏移量
consumer.commitSync(Collections.singletonMap(
record.getTopicPartition(),
new OffsetAndMetadata(record.getOffset() + 1)
));
} catch (RetryableException e) {
// 可重试异常,不提交偏移量,等待重新消费
log.warn("消息处理失败,将重试: {}", record.getOffset());
} catch (Exception e) {
// 不可恢复异常,发送到死信队列
sendToDeadLetterQueue(record);
// 提交偏移量,避免阻塞
consumer.commitSync(Collections.singletonMap(
record.getTopicPartition(),
new OffsetAndMetadata(record.getOffset() + 1)
));
}
}
}消息分发机制
点对点与广播模式
mermaid
graph TD
subgraph 点对点模式
A1["Topic A"] --> B1["Consumer Group"]
B1 --> C1["Consumer 1"]
B1 --> D1["Consumer 2"]
B1 --> E1["Consumer 3"]
end
subgraph 广播模式
A2["Topic B"] --> C2["Consumer 1"]
A2 --> D2["Consumer 2"]
A2 --> E2["Consumer 3"]
end
style A1 fill:#4A90E2,color:#fff,rx:10,ry:10
style A2 fill:#9B59B6,color:#fff,rx:10,ry:10
style B1 fill:#E67E22,color:#fff,rx:10,ry:10点对点模式:同一消费者组内的消费者竞争消费,每条消息只被一个消费者处理。
广播模式:每条消息被所有消费者接收处理。
推拉模式选择
mermaid
graph LR
subgraph 推模式-Push
A1["Broker"] -->|主动推送| B1["Consumer"]
end
subgraph 拉模式-Pull
A2["Consumer"] -->|主动拉取| B2["Broker"]
end
subgraph 长轮询
A3["Consumer"] -->|请求| B3["Broker"]
B3 -->|有消息立即返回<br/>无消息挂起等待| A3
end
style A1 fill:#E74C3C,color:#fff,rx:10,ry:10
style A2 fill:#27AE60,color:#fff,rx:10,ry:10
style B3 fill:#9B59B6,color:#fff,rx:10,ry:10长轮询实现:结合推拉优点的最佳实践
java
public class LongPollingConsumer {
private static final long POLL_TIMEOUT_MS = 30000;
/**
* 长轮询拉取消息
*/
public List<Message> poll() {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < POLL_TIMEOUT_MS) {
// 尝试拉取消息
List<Message> messages = broker.fetchMessages(
topic, partition, offset, maxBytes);
if (!messages.isEmpty()) {
return messages;
}
// 无消息,短暂等待后重试
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
return Collections.emptyList();
}
}高级功能设计
顺序消息
保证消息按发送顺序消费:
mermaid
graph TD
A["订单消息"] --> B{"路由策略"}
B -->|订单ID取模| C["Partition 0"]
B -->|订单ID取模| D["Partition 1"]
B -->|订单ID取模| E["Partition 2"]
C --> F["Consumer 0"]
D --> G["Consumer 1"]
E --> H["Consumer 2"]
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style B fill:#E67E22,color:#fff,rx:10,ry:10实现关键:
- 相同业务Key的消息路由到同一分区
- 单分区内保证顺序
- 消费者单线程处理分区消息
延迟消息
mermaid
graph TD
A["延迟消息"] --> B["延迟队列"]
B --> C["时间轮/定时器"]
C --> D{"到达时间?"}
D -->|是| E["投递到目标Topic"]
D -->|否| F["继续等待"]
style A fill:#4A90E2,color:#fff,rx:10,ry:10
style C fill:#9B59B6,color:#fff,rx:10,ry:10
style E fill:#27AE60,color:#fff,rx:10,ry:10延迟队列实现思路:
java
public class DelayMessageScheduler {
// 延迟级别: 1s, 5s, 10s, 30s, 1m, 5m, 10m, 30m, 1h, 2h
private static final long[] DELAY_LEVELS = {
1000, 5000, 10000, 30000, 60000,
300000, 600000, 1800000, 3600000, 7200000
};
/**
* 发送延迟消息
*/
public void sendDelayMessage(Message message, int delayLevel) {
// 计算投递时间
long deliverTime = System.currentTimeMillis() +
DELAY_LEVELS[delayLevel - 1];
message.setDeliverTime(deliverTime);
// 发送到延迟Topic
String delayTopic = "SCHEDULE_TOPIC_" + delayLevel;
producer.send(delayTopic, message);
}
/**
* 定时扫描到期消息
*/
@Scheduled(fixedRate = 1000)
public void scanDelayMessages() {
for (int level = 1; level <= DELAY_LEVELS.length; level++) {
String delayTopic = "SCHEDULE_TOPIC_" + level;
List<Message> messages = consumer.poll(delayTopic);
for (Message msg : messages) {
if (msg.getDeliverTime() <= System.currentTimeMillis()) {
// 投递到真实Topic
producer.send(msg.getRealTopic(), msg);
}
}
}
}
}事务消息
保证本地事务与消息发送的一致性:
mermaid
sequenceDiagram
participant Producer as 生产者
participant Broker as Broker
participant DB as 本地数据库
Producer->>Broker: 1. 发送半消息
Broker-->>Producer: 2. 返回半消息发送结果
Producer->>DB: 3. 执行本地事务
alt 本地事务成功
Producer->>Broker: 4a. 提交消息(Commit)
Broker->>Broker: 投递给消费者
else 本地事务失败
Producer->>Broker: 4b. 回滚消息(Rollback)
Broker->>Broker: 删除半消息
end
Note over Broker: 定期回查未确认的半消息
Broker->>Producer: 5. 回查事务状态
Producer-->>Broker: 6. 返回Commit/Rollback消息堆积处理
堆积监控
java
@Component
public class MessageBacklogMonitor {
@Scheduled(fixedRate = 60000)
public void checkBacklog() {
Map<String, Long> topicBacklog = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
long currentOffset = consumer.position(tp);
long endOffset = consumer.endOffsets(
Collections.singleton(tp)).get(tp);
long backlog = endOffset - currentOffset;
topicBacklog.merge(tp.topic(), backlog, Long::sum);
}
for (Map.Entry<String, Long> entry : topicBacklog.entrySet()) {
if (entry.getValue() > 100000) {
alertService.sendAlert(
String.format("Topic[%s]消息堆积: %d条",
entry.getKey(), entry.getValue()));
}
}
}
}堆积应急处理
mermaid
graph TD
A["发现消息堆积"] --> B["评估堆积原因"]
B --> C["消费能力不足"]
B --> D["下游系统故障"]
B --> E["消费逻辑异常"]
C --> F["扩容消费者"]
D --> G["临时降级/跳过"]
E --> H["修复代码重启"]
F --> I["增加消费者实例"]
F --> J["增加消费线程"]
style A fill:#E74C3C,color:#fff,rx:10,ry:10
style F fill:#27AE60,color:#fff,rx:10,ry:10
style G fill:#E67E22,color:#fff,rx:10,ry:10
style H fill:#9B59B6,color:#fff,rx:10,ry:10更新: 2025-12-06 17:32:32
原文: https://www.yuque.com/u22210564/zoxfmt/edbwn2psgby1tnse