组件讲解-如何实现高性能延迟队列-消费消息
在本文中,我们来介绍延迟队列组件damai-service-delay-queue-framework消费消息的流程,关于组件的使用和发送消息的流程,可跳转到相应文档查看
建议小伙伴先学习完发送消息的流程后,再来学习本人内容。
消费消息
使用
下面我们来开始分析,以订单延迟关闭为例,
@Slf4j
@Component
public class DelayOrderCancelConsumer implements ConsumerTask {
@Autowired
private OrderService orderService;
@Override
public void execute(String content) {
log.info("延迟订单取消消息进行消费 content : {}", content);
if (StringUtil.isEmpty(content)) {
log.error("延迟队列消息不存在");
return;
}
DelayOrderCancelDto delayOrderCancelDto = JSON.parseObject(content, DelayOrderCancelDto.class);
//取消订单
OrderCancelDto orderCancelDto = new OrderCancelDto();
orderCancelDto.setOrderNumber(delayOrderCancelDto.getOrderNumber());
boolean cancel = orderService.cancel(orderCancelDto);
if (cancel) {
log.info("延迟订单取消成功 orderCancelDto : {}",content);
}else {
log.error("延迟订单取消失败 orderCancelDto : {}",content);
}
}
@Override
public String topic() {
return SpringUtil.getPrefixDistinctionName() + "-" + DELAY_ORDER_CANCEL_TOPIC;
}
}DelayOrderCancelConsumer是监听消息的处理类,实现了
public interface ConsumerTask {
/**
* 消费任务
* @param content 具体参数
* */
void execute(String content);
/**
* 主题
* @return 主题
* */
String topic();
}使用起来很简单,只要实现ConsumerTask接口的方法即可,然后注入到Spring中即可,
要注意同一个topic下的发送者和消费者配置的分区数要相同,默认为5
delay.queue.isolationRegionCount = 5分析
从服务启动来入手
DelayQueueInitHandler
@AllArgsConstructor
public class DelayQueueInitHandler implements ApplicationListener<ApplicationStartedEvent> {
private final DelayQueueBasePart delayQueueBasePart;
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
//获取ConsumerTask集合
Map<String, ConsumerTask> consumerTaskMap = event.getApplicationContext().getBeansOfType(ConsumerTask.class);
if (CollectionUtil.isEmpty(consumerTaskMap)) {
return;
}
for (ConsumerTask consumerTask : consumerTaskMap.values()) {
//构建消息主题数据
DelayQueuePart delayQueuePart = new DelayQueuePart(delayQueueBasePart,consumerTask);
//获取分区数
Integer isolationRegionCount = delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties()
.getIsolationRegionCount();
//根据分区数来创建消息消费队列
for(int i = 0; i < isolationRegionCount; i++) {
DelayConsumerQueue delayConsumerQueue = new DelayConsumerQueue(delayQueuePart,
delayQueuePart.getConsumerTask().topic() + "-" + i);
//创建队列后监听消息
delayConsumerQueue.listenStart();
}
}
}
}- 项目启动后执行
onApplicationEvent方法,从Spring中获取ConsumerTask,也就是消息消费的业务bean对象集合 - 循环集合,构建DelayQueuePart 消息主题数据
- 获取主题的分区数isolationRegionCount
- 根据分区数来创建出消息消费队列DelayConsumerQueue,将DelayQueuePart 消息主题数据和分区topic传入
- 创建好DelayConsumerQueue后,启动监听消息
DelayQueuePart构建
@Data
public class DelayQueuePart {
/**
* 延迟队列配置信息
* */
private final DelayQueueBasePart delayQueueBasePart;
/**
* 客户端对象
* */
private final ConsumerTask consumerTask;
public DelayQueuePart(DelayQueueBasePart delayQueueBasePart, ConsumerTask consumerTask){
this.delayQueueBasePart = delayQueueBasePart;
this.consumerTask = consumerTask;
}
}消息主题分区(重点!!!)
delayQueuePart.getConsumerTask().topic() + "-" + i** **是根据分区数,来将主题进行分片,比如说分区数是5,那么就把主题分成了5份。top为d_delay_order_cancel_topic,分区后实际的主题为d_delay_order_cancel_topic-0 ... d_delay_order_cancel_topic-4
DelayConsumerQueue构建
@Slf4j
public class DelayConsumerQueue extends DelayBaseQueue{
/**
* 监听消息线程数
* */
private final AtomicInteger listenStartThreadCount = new AtomicInteger(1);
/**
* 消费消息线程数
* */
private final AtomicInteger executeTaskThreadCount = new AtomicInteger(1);
/**
* 监听消息线程池
* */
private final ThreadPoolExecutor listenStartThreadPool;
/**
* 消费消息线程池
* */
private final ThreadPoolExecutor executeTaskThreadPool;
/**
* 监控消费启动标识
* */
private final AtomicBoolean runFlag = new AtomicBoolean(false);
/**
* 消息处理
* */
private final ConsumerTask consumerTask;
public DelayConsumerQueue(DelayQueuePart delayQueuePart, String relTopic){
//构建RBlockingQueue
super(delayQueuePart.getDelayQueueBasePart().getRedissonClient(),relTopic);
//监听消息线程池
this.listenStartThreadPool = new ThreadPoolExecutor(1,1,60,
TimeUnit.SECONDS,new LinkedBlockingQueue<>(),r -> new Thread(Thread.currentThread().getThreadGroup(), r,
"listen-start-thread-" + listenStartThreadCount.getAndIncrement()));
//消费消息线程池
this.executeTaskThreadPool = new ThreadPoolExecutor(
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getCorePoolSize(),
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getMaximumPoolSize(),
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getKeepAliveTime(),
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getUnit(),
new LinkedBlockingQueue<>(delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getWorkQueueSize()),
r -> new Thread(Thread.currentThread().getThreadGroup(), r,
"delay-queue-consume-thread-" + executeTaskThreadCount.getAndIncrement()));
//消息处理逻辑
this.consumerTask = delayQueuePart.getConsumerTask();
}
/**
* 启动消息监听
* */
public synchronized void listenStart(){
if (!runFlag.get()) {
runFlag.set(true);
listenStartThreadPool.execute(() -> {
while (!Thread.interrupted()) {
try {
assert blockingQueue != null;
String content = blockingQueue.take();
executeTaskThreadPool.execute(() -> {
try {
consumerTask.execute(content);
}catch (Exception e) {
log.error("consumer execute error",e);
}
});
} catch (InterruptedException e) {
destroy(executeTaskThreadPool);
} catch (Throwable e) {
log.error("blockingQueue take error",e);
}
}
});
}
}
public void destroy(ExecutorService executorService) {
try {
if (Objects.nonNull(executorService)) {
executorService.shutdown();
}
} catch (Exception e) {
log.error("destroy error",e);
}
}
}下面我们来详细分析每步骤的作用
super(delayQueuePart.getDelayQueueBasePart().getRedissonClient(),relTopic)
@Slf4j
public class DelayBaseQueue {
protected final RedissonClient redissonClient;
protected final RBlockingQueue<String> blockingQueue;
public DelayBaseQueue(RedissonClient redissonClient,String relTopic){
this.redissonClient = redissonClient;
this.blockingQueue = redissonClient.getBlockingQueue(relTopic);
}
}在构建DelayConsumerQueue时,先构建父类DelayBaseQueue,将RBlockingQueue构建出来,
这样消息消费队列DelayConsumerQueue就含有Redisson的RBlockingQueue了。
将RBlockingQueue单独抽取出来的原因是,发送者和消费者都需要RBlockingQueue,单独抽取出来就可以实现共用了
listenStartThreadPool
//监听消息线程池
this.listenStartThreadPool = new ThreadPoolExecutor(1,1,60,
TimeUnit.SECONDS,new LinkedBlockingQueue<>(),r -> new Thread(Thread.currentThread().getThreadGroup(), r,
"listen-start-thread-" + listenStartThreadCount.getAndIncrement()));listenStartThreadPool的作用是异步启动消息监听,核心线程为1,因为只执行消息监听即可
executeTaskThreadPool
//消费消息线程池
this.executeTaskThreadPool = new ThreadPoolExecutor(
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getCorePoolSize(),
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getMaximumPoolSize(),
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getKeepAliveTime(),
delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getUnit(),
new LinkedBlockingQueue<>(delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties().getWorkQueueSize()),
r -> new Thread(Thread.currentThread().getThreadGroup(), r,
"delay-queue-consume-thread-" + executeTaskThreadCount.getAndIncrement()));executeTaskThreadPool的作用是当监听到了消息后,用来执行消息处理逻辑的,线程池的参数可根据业务特点来调整,参数在DelayQueueProperties,我们再来回顾一下
@Data
@ConfigurationProperties(prefix = PREFIX)
public class DelayQueueProperties {
public static final String PREFIX = "delay.queue";
/**
* 从队列拉取数据的线程池中的核心线程数量,如果业务过慢可调大
* */
private Integer corePoolSize = 4;
/**
* 从队列拉取数据的线程池中的最大线程数量,如果业务过慢可调大
* */
private Integer maximumPoolSize = 4;
/**
* 从队列拉取数据的线程池中的最大线程回收时间
* */
private long keepAliveTime = 30;
/**
* 从队列拉取数据的线程池中的最大线程回收时间的时间单位
* */
private TimeUnit unit = TimeUnit.SECONDS;
/**
* 从队列拉取数据的线程池中的队列数量,如果业务过慢可调大
* */
private Integer workQueueSize = 256;
/**
* 延时队列的隔离分区数,延时有瓶颈时 可调大次数,但会增大redis的cpu消耗(同一个topic发送者和消费者的隔离分区数必须相同)
* */
private Integer isolationRegionCount = 5;
}delayQueuePart.getConsumerTask()
//消息处理逻辑
this.consumerTask = delayQueuePart.getConsumerTask();consumerTask就是对消息进行消费的业务处理了
到这里就是将DelayConsumerQueue构建完毕了,我们再回顾一下初始化的部分
@AllArgsConstructor
public class DelayQueueInitHandler implements ApplicationListener<ApplicationStartedEvent> {
private final DelayQueueBasePart delayQueueBasePart;
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
//获取ConsumerTask集合
Map<String, ConsumerTask> consumerTaskMap = event.getApplicationContext().getBeansOfType(ConsumerTask.class);
if (CollectionUtil.isEmpty(consumerTaskMap)) {
return;
}
for (ConsumerTask consumerTask : consumerTaskMap.values()) {
//构建消息主题数据
DelayQueuePart delayQueuePart = new DelayQueuePart(delayQueueBasePart,consumerTask);
//获取分区数
Integer isolationRegionCount = delayQueuePart.getDelayQueueBasePart().getDelayQueueProperties()
.getIsolationRegionCount();
//根据分区数来创建消息消费队列
for(int i = 0; i < isolationRegionCount; i++) {
DelayConsumerQueue delayConsumerQueue = new DelayConsumerQueue(delayQueuePart,
delayQueuePart.getConsumerTask().topic() + "-" + i);
//创建队列后监听消息
delayConsumerQueue.listenStart();
}
}
}
}当构建好DelayConsumerQueue后,接着执行**delayConsumerQueue.listenStart()**开始启动监听消息的线程
delayConsumerQueue.listenStart()
/**
* 启动消息监听
* */
public synchronized void listenStart(){
//如果runFlag为false,说明监听没有启动过
if (!runFlag.get()) {
//将runFlag为true
runFlag.set(true);
//异步执行监听逻辑
listenStartThreadPool.execute(() -> {
while (!Thread.interrupted()) {
try {
assert blockingQueue != null;
//从Redisson的RBlockingQueue中监听消息
String content = blockingQueue.take();
//如果监听到消息,则执行处理逻辑
executeTaskThreadPool.execute(() -> {
try {
consumerTask.execute(content);
}catch (Exception e) {
log.error("consumer execute error",e);
}
});
} catch (InterruptedException e) {
//如果出现中断异常,则将线程池关闭
destroy(executeTaskThreadPool);
} catch (Throwable e) {
log.error("blockingQueue take error",e);
}
}
});
}
}
public void destroy(ExecutorService executorService) {
try {
if (Objects.nonNull(executorService)) {
executorService.shutdown();
}
} catch (Exception e) {
log.error("destroy error",e);
}
}通过 String content = blockingQueue.take(); 监听到消息后,调用 executeTaskThreadPool.execute 线程池来异步消费消息
总结
- 如果没有启动过监听,则将监听消息的任务进行异步启动
- 启动监听消息任务后,从RBlockingQueue执行take方法,等待消息的到来
- 当消息到来后,执行通过executeTaskThreadPool线程池异步执行消息处理任务
以上就是将消息的消费流程分析完毕,下面来结合流程图帮助小伙伴更加清晰的理解

更新: 2025-10-13 11:53:13
原文: https://www.yuque.com/u22210564/ykdrdh/acr9dg80m26zq9hq