组件讲解-如何实现高性能延迟队列-发送消息
Redisson延迟队列
使用 Redisson 实现延迟队列具有多个优点,这些优点源于 Redisson 对 Redis 功能的封装和扩展,以及 Redis 本身作为内存数据库的高性能特性。下面是使用 Redisson 延迟队列的主要好处:
1. 简化开发
- 高级接口:Redisson 提供了一系列高级的数据结构和同步工具,使得在 Java 应用中使用 Redis 更加简单直接。对于延迟队列,Redisson 提供的
RDelayedQueue接口让开发者可以轻松实现延迟功能,无需深入了解复杂的底层实现细节
2. 高性能和低延迟
- 内存操作:由于 Redis 是基于内存的数据存储系统,它能够提供极高的读写速度,这对于实现高性能、低延迟的延迟队列非常重要。
- 优化的通信:Redisson 对 Redis 命令进行了优化,减少网络开销,进一步提高了操作的效率和速度
这里使用Redisson而不是用RocketMQ的原因有两点,一是RocketMQ虽然是本身支持延迟发送功能,但一旦消息发生堆积了。消息达不到指定的延时,而Redisson的方式比MQ的方式性能更叫高效。二是现在的项目中都会依赖Redis,但并不都会依赖MQ,也是为了减少对中间件的依赖
使用 Redisson 的 RDelayedQueue 实现延迟队列
RDelayedQueue 是 Redisson 提供的一个接口,用于实现延迟队列的功能。它允许将元素延迟一段时间后再被消费。实现延迟队列的步骤如下:
- 创建 RDelayedQueue:首先需要创建一个普通的队列(例如
RQueue或RBlockingQueue),然后使用这个队列创建一个RDelayedQueue实例 - 添加延迟元素:通过
RDelayedQueue的offer方法添加元素,并指定延迟时间。元素将在指定的延迟时间后自动转移到原始队列中,随后可被消费 - 消费元素:从原始队列中消费元素。如果是
RBlockingQueue,消费者可以阻塞等待直到元素可用
示例代码
下面是一个使用 Redisson 实现延迟队列的简单示例:
// 创建 Redisson 客户端实例
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// 获取一个 RBlockingQueue 实例
RBlockingQueue<String> queue = redisson.getBlockingQueue("myQueue");
// 使用 RBlockingQueue 创建 RDelayedQueue
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(queue);
// 将一个元素添加到延迟队列中,延迟 10 秒钟
delayedQueue.offer("myElement", 10, TimeUnit.SECONDS);
// 在其他线程或者程序中,从 RBlockingQueue 中消费元素
String element = queue.take(); // 这会阻塞等待直到元素可用
// 关闭 Redisson 客户端
redisson.shutdown();这个示例展示了如何创建一个延迟队列,向其中添加一个延迟 10 秒的元素,并从原始队列中阻塞消费这个元素
注意事项
- 使用
RDelayedQueue时,延迟的元素实际上是首先存储在 Redis 中的一个内部列表中,然后在到期后转移到目标队列。因此,需要保持 Redisson 实例运行,以便它可以处理延迟元素的转移 - 当 Redisson 客户端重启时,
RDelayedQueue的状态会被自动恢复,因为其状态是持久化在 Redis 中的。这意味着即使应用重启,延迟队列的功能也不会受到影响
而本人在Redisson 延迟队列基础上进一步进行了优化,使用了对消息进行分片的策略,并结合了线程池并发的执行消费。使得执行效率成几倍的提升,也是强烈建议小伙伴学会此组件的设计,在面试上绝对是装逼利器!
话不多说,来开始详细的介绍
damai-service-delay-queue-framework
使用
依赖
<dependency>
<groupId>com.example</groupId>
<artifactId>damai-service-delay-queue-framework</artifactId>
<version>${revision}</version>
</dependency>相关配置
@Data
@ConfigurationProperties(prefix = PREFIX)
public class DelayQueueProperties {
public static final String PREFIX = "delay.queue";
/**
* 从队列拉取数据的线程池中的核心线程数量,如果业务过慢可调大
* */
private Integer corePoolSize = 4;
/**
* 从队列拉取数据的线程池中的最大线程数量,如果业务过慢可调大
* */
private Integer maximumPoolSize = 4;
/**
* 从队列拉取数据的线程池中的最大线程回收时间
* */
private long keepAliveTime = 30;
/**
* 从队列拉取数据的线程池中的最大线程回收时间的时间单位
* */
private TimeUnit unit = TimeUnit.SECONDS;
/**
* 从队列拉取数据的线程池中的队列数量,如果业务过慢可调大
* */
private Integer workQueueSize = 256;
/**
* 延时队列的隔离分区数,延时有瓶颈时 可调大次数,但会增大redis的cpu消耗(同一个topic发送者和消费者的隔离分区数必须相同)
* */
private Integer isolationRegionCount = 5;
}延迟订单关闭消息发送
public class ProgramOrderConstant {
public static final String DELAY_ORDER_CANCEL_TOPIC ="d_delay_order_cancel_topic";
public static final Long DELAY_ORDER_CANCEL_TIME = 5L;
public static final TimeUnit DELAY_ORDER_CANCEL_TIME_UNIT = TimeUnit.SECONDS;
public static final String DELAY_OPERATE_PROGRAM_DATA_TOPIC = "d_delay_operate_program_data_topic";
public static final Long ORDER_TABLE_COUNT = 4L;
}@Slf4j
@Component
public class DelayOrderCancelSend {
@Autowired
private DelayQueueContext delayQueueContext;
public void sendMessage(String message){
try {
delayQueueContext.sendMessage(SpringUtil.getPrefixDistinctionName() + "-" + DELAY_ORDER_CANCEL_TOPIC,
message, DELAY_ORDER_CANCEL_TIME, DELAY_ORDER_CANCEL_TIME_UNIT);
}catch (Exception e) {
log.error("send message error message : {}",message,e);
}
}
}延迟订单关闭消息消费
public static final String DELAY_ORDER_CANCEL_TOPIC ="d_delay_order_cancel_topic";@Slf4j
@Component
public class DelayOrderCancelConsumer implements ConsumerTask {
@Autowired
private OrderService orderService;
@Override
public void execute(String content) {
log.info("延迟订单取消消息进行消费 content : {}", content);
if (StringUtil.isEmpty(content)) {
log.error("延迟队列消息不存在");
return;
}
DelayOrderCancelDto delayOrderCancelDto = JSON.parseObject(content, DelayOrderCancelDto.class);
//取消订单
OrderCancelDto orderCancelDto = new OrderCancelDto();
orderCancelDto.setOrderNumber(delayOrderCancelDto.getOrderNumber());
boolean cancel = orderService.cancel(orderCancelDto);
if (cancel) {
log.info("延迟订单取消成功 orderCancelDto : {}",content);
}else {
log.error("延迟订单取消失败 orderCancelDto : {}",content);
}
}
@Override
public String topic() {
return SpringUtil.getPrefixDistinctionName() + "-" + DELAY_ORDER_CANCEL_TOPIC;
}
}上面就是关于延迟队列的使用介绍,可以看到使用起来非常的简单上手,下面介绍设计的过程,其中有很经典的分片思想,强烈建议小伙伴认真阅读
发送消息
@EnableConfigurationProperties(DelayQueueProperties.class)
public class DelayQueueAutoConfig {
/**
* 消费者处理器
* */
@Bean
public DelayQueueInitHandler delayQueueInitHandler(DelayQueueBasePart delayQueueBasePart){
return new DelayQueueInitHandler(delayQueueBasePart);
}
/**
* 配置信息装载
* */
@Bean
public DelayQueueBasePart delayQueueBasePart(RedissonClient redissonClient,DelayQueueProperties delayQueueProperties){
return new DelayQueueBasePart(redissonClient,delayQueueProperties);
}
/**
* 发送者上下文
* */
@Bean
public DelayQueueContext delayQueueContext(DelayQueueBasePart delayQueueBasePart){
return new DelayQueueContext(delayQueueBasePart);
}
}DelayQueueContext
DelayQueueContext是发送消息的上下文,里面存放着topic和发送消息处理的信息
public class DelayQueueContext {
private final DelayQueueBasePart delayQueueBasePart;
//key为topc主题,value为发送消息的处理器
private final Map<String, DelayQueueProduceCombine> delayQueueProduceCombineMap = new ConcurrentHashMap<>();
public DelayQueueContext(DelayQueueBasePart delayQueueBasePart){
this.delayQueueBasePart = delayQueueBasePart;
}
public void sendMessage(String topic,String content,long delayTime, TimeUnit timeUnit) {
DelayQueueProduceCombine delayQueueProduceCombine = delayQueueProduceCombineMap.computeIfAbsent(
//构建消息发送处理器
topic, k -> new DelayQueueProduceCombine(delayQueueBasePart,topic));
delayQueueProduceCombine.offer(content,delayTime,timeUnit);
}
}构建DelayQueueProduceCombine消息发送处理器
在发送消息时,会先判断消息上下文delayQueueProduceCombineMap中是否存在,如果不存在,则进行构建DelayQueueProduceCombine消息发送处理器
public DelayQueueProduceCombine(DelayQueueBasePart delayQueueBasePart,String topic){
//消息的分区数
Integer isolationRegionCount = delayQueueBasePart.getDelayQueueProperties().getIsolationRegionCount();
//根据分区数来构建分区选择器
isolationRegionSelector =new IsolationRegionSelector(isolationRegionCount);
for(int i = 0; i < isolationRegionCount; i++) {
//按照分区数来构建发送者延迟队列,然后放到存储里
delayProduceQueueList.add(new DelayProduceQueue(delayQueueBasePart.getRedissonClient(),topic + "-" + i));
}
}构建DelayQueueProduceCombine消息发送处理器时,先执行new IsolationRegionSelector(isolationRegionCount)构建出分区选择器
IsolationRegionSelector分区选择器
public class IsolationRegionSelector {
private final AtomicInteger count = new AtomicInteger(0);
private final Integer thresholdValue;
public IsolationRegionSelector(Integer thresholdValue) {
this.thresholdValue = thresholdValue;
}
private int reset() {
count.set(0);
return count.get();
}
public synchronized int getIndex() {
int cur = count.get();
if (cur >= thresholdValue) {
cur = reset();
} else {
count.incrementAndGet();
}
return cur;
}
}构建好分区选择器后,开始构建消息发送队列,构建好后放入存储集合中
for(int i = 0; i < isolationRegionCount; i++) {
delayProduceQueueList.add(new DelayProduceQueue(delayQueueBasePart.getRedissonClient(),topic + "-" + i));
}重点
topic + "-" + i 就是将主题的消息进行分片了,比如说分区数是5,那么就把这个主题下的消息分成了5份,每份由DelayProduceQueue来承载,接着把DelayProduceQueue放进了delayProduceQueueList中
DelayProduceQueue消息发送队列
public class DelayProduceQueue extends DelayBaseQueue{
private final RDelayedQueue<String> delayedQueue;
public DelayProduceQueue(RedissonClient redissonClient, final String relTopic) {
super(redissonClient, relTopic);
this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
}
public void offer(String content, long delayTime, TimeUnit timeUnit) {
delayedQueue.offer(content,delayTime,timeUnit);
}
}public class DelayBaseQueue {
protected final RedissonClient redissonClient;
protected final RBlockingQueue<String> blockingQueue;
public DelayBaseQueue(RedissonClient redissonClient,String relTopic){
this.redissonClient = redissonClient;
this.blockingQueue = redissonClient.getBlockingQueue(relTopic);
}
}DelayProduceQueue是发送者队列,在购建时,就要传入redissonClient和relTopic了,到这步,就要调用到Redisson的RBlockingQueue和RDelayedQueue了
这里将Redisson的RBlockingQueue和RDelayedQueue分别抽取了出来,因为发送消息用到的是RDelayedQueue,接受消息用到的是RBlockingQueue,
RDelayedQueue封装进了DelayProduceQueue中,RBlockingQueue封装进了DelayBaseQueue中,
而DelayProduceQueue需要继承DelayBaseQueue的原因是RDelayedQueue需要靠RBlockingQueue来得到
这段流程是使用Redisson的核心思想,我再梳理一遍这部分,希望小伙伴好好体会
- 构建DelayProduceQueue时,先构建父类DelayBaseQueue
- 构建DelayBaseQueue时,把redissonClient和relTopic传入,得到了RBlockingQueue,也就是消费消息需要的阻塞队列
- 构建父类DelayBaseQueue结束后,开始构建自己DelayProduceQueue,这时当前于DelayProduceQueue自己也有了RBlockingQueue
- 通过RBlockingQueue拿到了RDelayedQueue
到这时,RDelayedQueue 在 DelayProduceQueue中,RBlockingQueue 在 DelayBaseQueue中,
接着将每一个DelayProduceQueue都放进delayProduceQueueList中,这样每个delayProduceQueueList就保存了所有的DelayProduceQueue了
到这时就将DelayQueueProduceCombine消息发送处理器整个部分都构建完毕了,接口下就是要开始发送消息了,也就是
delayQueueProduceCombine.offer(content,delayTime,timeUnit);让我们结合发送流程再将DelayQueueProduceCombine消息发送处理器的代码看一遍
DelayQueueProduceCombine消息发送处理器
public class DelayQueueProduceCombine {
private final IsolationRegionSelector isolationRegionSelector;
private final List<DelayProduceQueue> delayProduceQueueList = new ArrayList<>();
public DelayQueueProduceCombine(DelayQueueBasePart delayQueueBasePart,String topic){
//消息的分区数
Integer isolationRegionCount = delayQueueBasePart.getDelayQueueProperties().getIsolationRegionCount();
//根据分区数来构建分区选择器
isolationRegionSelector =new IsolationRegionSelector(isolationRegionCount);
for(int i = 0; i < isolationRegionCount; i++) {
//按照分区数来构建发送者延迟队列,然后放到存储里
delayProduceQueueList.add(new DelayProduceQueue(delayQueueBasePart.getRedissonClient(),topic + "-" + i));
}
}
/**
* 发送消息
* */
public void offer(String content,long delayTime, TimeUnit timeUnit){
//通过选择器获取分区索引
int index = isolationRegionSelector.getIndex();
//根据索引从集合中获取到分区的发送队列,然后发送消息
delayProduceQueueList.get(index).offer(content, delayTime, timeUnit);
}
}这时候再看DelayQueueProduceCombine是不是就清晰很多了,我们接着来分析发送消息的流程
isolationRegionSelector.getIndex()
private int reset() {
count.set(0);
return count.get();
}
public synchronized int getIndex() {
int cur = count.get();
if (cur >= thresholdValue) {
cur = reset();
} else {
count.incrementAndGet();
}
return cur;
}选择器获取分区索引的原理就是轮训获取,获取一次就对索引值+1,如果达到了分区数阈值,那么再将索引值重置为0
获取到了分区索引后,用索引去集合中获取对应的发送队列DelayProduceQueue,然后执行offer发送消息
DelayProduceQueue.offer
public class DelayProduceQueue extends DelayBaseQueue{
private final RDelayedQueue<String> delayedQueue;
public DelayProduceQueue(RedissonClient redissonClient, final String relTopic) {
super(redissonClient, relTopic);
this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
}
/**
* 发送消息
*/
public void offer(String content, long delayTime, TimeUnit timeUnit) {
delayedQueue.offer(content,delayTime,timeUnit);
}
}以上就是发送消息的流程了,建议小伙伴多认真看几遍,就会梳理清楚了,关于消费消息和整个流程的流程图,会在下篇文章讲解,可跳转到相应文档查看
更新: 2025-11-11 09:55:10
原文: https://www.yuque.com/u22210564/ykdrdh/zhgo0z2smmvnz6ah