Skip to content

组件讲解-如何实现高性能延迟队列-发送消息

Redisson延迟队列

使用 Redisson 实现延迟队列具有多个优点,这些优点源于 Redisson 对 Redis 功能的封装和扩展,以及 Redis 本身作为内存数据库的高性能特性。下面是使用 Redisson 延迟队列的主要好处:

1. 简化开发

  • 高级接口:Redisson 提供了一系列高级的数据结构和同步工具,使得在 Java 应用中使用 Redis 更加简单直接。对于延迟队列,Redisson 提供的 RDelayedQueue 接口让开发者可以轻松实现延迟功能,无需深入了解复杂的底层实现细节

2. 高性能和低延迟

  • 内存操作:由于 Redis 是基于内存的数据存储系统,它能够提供极高的读写速度,这对于实现高性能、低延迟的延迟队列非常重要。
  • 优化的通信:Redisson 对 Redis 命令进行了优化,减少网络开销,进一步提高了操作的效率和速度

这里使用Redisson而不是用RocketMQ的原因有两点,一是RocketMQ虽然是本身支持延迟发送功能,但一旦消息发生堆积了。消息达不到指定的延时,而Redisson的方式比MQ的方式性能更叫高效。二是现在的项目中都会依赖Redis,但并不都会依赖MQ,也是为了减少对中间件的依赖

使用 Redisson 的 RDelayedQueue 实现延迟队列

RDelayedQueue 是 Redisson 提供的一个接口,用于实现延迟队列的功能。它允许将元素延迟一段时间后再被消费。实现延迟队列的步骤如下:

  1. 创建 RDelayedQueue:首先需要创建一个普通的队列(例如 RQueueRBlockingQueue),然后使用这个队列创建一个 RDelayedQueue 实例
  2. 添加延迟元素:通过 RDelayedQueueoffer 方法添加元素,并指定延迟时间。元素将在指定的延迟时间后自动转移到原始队列中,随后可被消费
  3. 消费元素:从原始队列中消费元素。如果是 RBlockingQueue,消费者可以阻塞等待直到元素可用

示例代码

下面是一个使用 Redisson 实现延迟队列的简单示例:

java
// 创建 Redisson 客户端实例
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

// 获取一个 RBlockingQueue 实例
RBlockingQueue<String> queue = redisson.getBlockingQueue("myQueue");

// 使用 RBlockingQueue 创建 RDelayedQueue
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(queue);

// 将一个元素添加到延迟队列中,延迟 10 秒钟
delayedQueue.offer("myElement", 10, TimeUnit.SECONDS);

// 在其他线程或者程序中,从 RBlockingQueue 中消费元素
String element = queue.take(); // 这会阻塞等待直到元素可用

// 关闭 Redisson 客户端
redisson.shutdown();

这个示例展示了如何创建一个延迟队列,向其中添加一个延迟 10 秒的元素,并从原始队列中阻塞消费这个元素

注意事项

  • 使用 RDelayedQueue 时,延迟的元素实际上是首先存储在 Redis 中的一个内部列表中,然后在到期后转移到目标队列。因此,需要保持 Redisson 实例运行,以便它可以处理延迟元素的转移
  • 当 Redisson 客户端重启时,RDelayedQueue 的状态会被自动恢复,因为其状态是持久化在 Redis 中的。这意味着即使应用重启,延迟队列的功能也不会受到影响

而本人在Redisson 延迟队列基础上进一步进行了优化,使用了对消息进行分片的策略,并结合了线程池并发的执行消费。使得执行效率成几倍的提升,也是强烈建议小伙伴学会此组件的设计,在面试上绝对是装逼利器!

话不多说,来开始详细的介绍

damai-service-delay-queue-framework

使用

依赖

xml
<dependency>
    <groupId>com.example</groupId>
    <artifactId>damai-service-delay-queue-framework</artifactId>
    <version>${revision}</version>
</dependency>

相关配置

java
@Data
@ConfigurationProperties(prefix = PREFIX)
public class DelayQueueProperties {

    public static final String PREFIX = "delay.queue";
    
    /**
     * 从队列拉取数据的线程池中的核心线程数量,如果业务过慢可调大
     * */
    private Integer corePoolSize = 4;
    /**
     * 从队列拉取数据的线程池中的最大线程数量,如果业务过慢可调大
     * */
    private Integer maximumPoolSize = 4;
    
    /**
     * 从队列拉取数据的线程池中的最大线程回收时间
     * */
    private long keepAliveTime = 30;
    /**
     * 从队列拉取数据的线程池中的最大线程回收时间的时间单位
     * */
    private TimeUnit unit = TimeUnit.SECONDS;
    /**
     * 从队列拉取数据的线程池中的队列数量,如果业务过慢可调大
     * */
    private Integer workQueueSize = 256;
    
    /**
     * 延时队列的隔离分区数,延时有瓶颈时 可调大次数,但会增大redis的cpu消耗(同一个topic发送者和消费者的隔离分区数必须相同)
     * */
    private Integer isolationRegionCount = 5;
}

延迟订单关闭消息发送

java
public class ProgramOrderConstant {
    
    public static final String DELAY_ORDER_CANCEL_TOPIC ="d_delay_order_cancel_topic";
    
    public static final Long DELAY_ORDER_CANCEL_TIME = 5L;
    
    public static final TimeUnit DELAY_ORDER_CANCEL_TIME_UNIT = TimeUnit.SECONDS;
    
    public static final String DELAY_OPERATE_PROGRAM_DATA_TOPIC = "d_delay_operate_program_data_topic";
    
    public static final Long ORDER_TABLE_COUNT = 4L;
}
java
@Slf4j
@Component
public class DelayOrderCancelSend {
    
    @Autowired
    private DelayQueueContext delayQueueContext;
    
    public void sendMessage(String message){
        try {
            delayQueueContext.sendMessage(SpringUtil.getPrefixDistinctionName() + "-" + DELAY_ORDER_CANCEL_TOPIC,
                    message, DELAY_ORDER_CANCEL_TIME, DELAY_ORDER_CANCEL_TIME_UNIT);
        }catch (Exception e) {
            log.error("send message error message : {}",message,e);
        }
        
    }
}

延迟订单关闭消息消费

java
public static final String DELAY_ORDER_CANCEL_TOPIC ="d_delay_order_cancel_topic";
java
@Slf4j
@Component
public class DelayOrderCancelConsumer implements ConsumerTask {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public void execute(String content) {
        log.info("延迟订单取消消息进行消费 content : {}", content);
        if (StringUtil.isEmpty(content)) {
            log.error("延迟队列消息不存在");
            return;
        }
        DelayOrderCancelDto delayOrderCancelDto = JSON.parseObject(content, DelayOrderCancelDto.class);
        
        //取消订单
        OrderCancelDto orderCancelDto = new OrderCancelDto();
        orderCancelDto.setOrderNumber(delayOrderCancelDto.getOrderNumber());
        boolean cancel = orderService.cancel(orderCancelDto);
        if (cancel) {
            log.info("延迟订单取消成功 orderCancelDto : {}",content);
        }else {
            log.error("延迟订单取消失败 orderCancelDto : {}",content);
        }
    }
    
    @Override
    public String topic() {
        return SpringUtil.getPrefixDistinctionName() + "-" + DELAY_ORDER_CANCEL_TOPIC;
    }
}

上面就是关于延迟队列的使用介绍,可以看到使用起来非常的简单上手,下面介绍设计的过程,其中有很经典的分片思想,强烈建议小伙伴认真阅读

发送消息

java
@EnableConfigurationProperties(DelayQueueProperties.class)
public class DelayQueueAutoConfig {
    
    /**
     * 消费者处理器
     * */
    @Bean
    public DelayQueueInitHandler delayQueueInitHandler(DelayQueueBasePart delayQueueBasePart){
        return new DelayQueueInitHandler(delayQueueBasePart);
    }
    /**
     * 配置信息装载
     * */
    @Bean
    public DelayQueueBasePart delayQueueBasePart(RedissonClient redissonClient,DelayQueueProperties delayQueueProperties){
        return new DelayQueueBasePart(redissonClient,delayQueueProperties);
    }
    /**
     * 发送者上下文
     * */
    @Bean
    public DelayQueueContext delayQueueContext(DelayQueueBasePart delayQueueBasePart){
        return new DelayQueueContext(delayQueueBasePart);
    }
}

DelayQueueContext

DelayQueueContext是发送消息的上下文,里面存放着topic和发送消息处理的信息

java
public class DelayQueueContext {
    
    private final DelayQueueBasePart delayQueueBasePart;
    //key为topc主题,value为发送消息的处理器
    private final Map<String, DelayQueueProduceCombine> delayQueueProduceCombineMap = new ConcurrentHashMap<>();
    
    public DelayQueueContext(DelayQueueBasePart delayQueueBasePart){
        this.delayQueueBasePart = delayQueueBasePart;
    }
    
    public void sendMessage(String topic,String content,long delayTime, TimeUnit timeUnit) {
        DelayQueueProduceCombine delayQueueProduceCombine = delayQueueProduceCombineMap.computeIfAbsent(
            	//构建消息发送处理器
                topic, k -> new DelayQueueProduceCombine(delayQueueBasePart,topic));
        delayQueueProduceCombine.offer(content,delayTime,timeUnit);
    }
}

构建DelayQueueProduceCombine消息发送处理器

在发送消息时,会先判断消息上下文delayQueueProduceCombineMap中是否存在,如果不存在,则进行构建DelayQueueProduceCombine消息发送处理器

java
public DelayQueueProduceCombine(DelayQueueBasePart delayQueueBasePart,String topic){
    //消息的分区数
    Integer isolationRegionCount = delayQueueBasePart.getDelayQueueProperties().getIsolationRegionCount();
    //根据分区数来构建分区选择器
    isolationRegionSelector =new IsolationRegionSelector(isolationRegionCount);
    for(int i = 0; i < isolationRegionCount; i++) {
        //按照分区数来构建发送者延迟队列,然后放到存储里
        delayProduceQueueList.add(new DelayProduceQueue(delayQueueBasePart.getRedissonClient(),topic + "-" + i));
    }
}

构建DelayQueueProduceCombine消息发送处理器时,先执行new IsolationRegionSelector(isolationRegionCount)构建出分区选择器

IsolationRegionSelector分区选择器

java
public class IsolationRegionSelector {

	private final AtomicInteger count = new AtomicInteger(0);

	private final Integer thresholdValue;

	public IsolationRegionSelector(Integer thresholdValue) {
		this.thresholdValue = thresholdValue;
	}

	private int reset() {
		count.set(0);
		return count.get();
	}
	
	public synchronized int getIndex() {
		int cur = count.get();
		if (cur >= thresholdValue) {
			cur = reset();
		} else {
			count.incrementAndGet();
		}
		return cur;
	}
}

构建好分区选择器后,开始构建消息发送队列,构建好后放入存储集合中

java
for(int i = 0; i < isolationRegionCount; i++) {
    delayProduceQueueList.add(new DelayProduceQueue(delayQueueBasePart.getRedissonClient(),topic + "-" + i));
}

重点

topic + "-" + i 就是将主题的消息进行分片了,比如说分区数是5,那么就把这个主题下的消息分成了5份,每份由DelayProduceQueue来承载,接着把DelayProduceQueue放进了delayProduceQueueList

DelayProduceQueue消息发送队列

java
public class DelayProduceQueue extends DelayBaseQueue{
    
    private final RDelayedQueue<String> delayedQueue;
    
    public DelayProduceQueue(RedissonClient redissonClient, final String relTopic) {
        super(redissonClient, relTopic);
        this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
    }
    
    public void offer(String content, long delayTime, TimeUnit timeUnit) {
        delayedQueue.offer(content,delayTime,timeUnit);
    }
}
java
public class DelayBaseQueue {
    
    protected final RedissonClient redissonClient;
    protected final RBlockingQueue<String> blockingQueue;
    
    
    public DelayBaseQueue(RedissonClient redissonClient,String relTopic){
        this.redissonClient = redissonClient;
        this.blockingQueue = redissonClient.getBlockingQueue(relTopic);
    }
}

DelayProduceQueue是发送者队列,在购建时,就要传入redissonClient和relTopic了,到这步,就要调用到Redisson的RBlockingQueueRDelayedQueue

这里将Redisson的RBlockingQueueRDelayedQueue分别抽取了出来,因为发送消息用到的是RDelayedQueue,接受消息用到的是RBlockingQueue

RDelayedQueue封装进了DelayProduceQueue中,RBlockingQueue封装进了DelayBaseQueue中,

DelayProduceQueue需要继承DelayBaseQueue的原因是RDelayedQueue需要靠RBlockingQueue来得到

这段流程是使用Redisson的核心思想,我再梳理一遍这部分,希望小伙伴好好体会

  • 构建DelayProduceQueue时,先构建父类DelayBaseQueue
  • 构建DelayBaseQueue时,把redissonClient和relTopic传入,得到了RBlockingQueue,也就是消费消息需要的阻塞队列
  • 构建父类DelayBaseQueue结束后,开始构建自己DelayProduceQueue,这时当前于DelayProduceQueue自己也有了RBlockingQueue
  • 通过RBlockingQueue拿到了RDelayedQueue

到这时,RDelayedQueueDelayProduceQueue中,RBlockingQueueDelayBaseQueue中,

接着将每一个DelayProduceQueue都放进delayProduceQueueList中,这样每个delayProduceQueueList就保存了所有的DelayProduceQueue

到这时就将DelayQueueProduceCombine消息发送处理器整个部分都构建完毕了,接口下就是要开始发送消息了,也就是

java
delayQueueProduceCombine.offer(content,delayTime,timeUnit);

让我们结合发送流程再将DelayQueueProduceCombine消息发送处理器的代码看一遍

DelayQueueProduceCombine消息发送处理器

java
public class DelayQueueProduceCombine {
    
    private final IsolationRegionSelector isolationRegionSelector;
    
    private final List<DelayProduceQueue> delayProduceQueueList = new ArrayList<>();
    
    public DelayQueueProduceCombine(DelayQueueBasePart delayQueueBasePart,String topic){
        //消息的分区数
        Integer isolationRegionCount = delayQueueBasePart.getDelayQueueProperties().getIsolationRegionCount();
        //根据分区数来构建分区选择器
        isolationRegionSelector =new IsolationRegionSelector(isolationRegionCount);
        for(int i = 0; i < isolationRegionCount; i++) {
            //按照分区数来构建发送者延迟队列,然后放到存储里
            delayProduceQueueList.add(new DelayProduceQueue(delayQueueBasePart.getRedissonClient(),topic + "-" + i));
        }
    }
    /**
     * 发送消息
     * */
    public void offer(String content,long delayTime, TimeUnit timeUnit){
        //通过选择器获取分区索引
        int index = isolationRegionSelector.getIndex();
        //根据索引从集合中获取到分区的发送队列,然后发送消息
        delayProduceQueueList.get(index).offer(content, delayTime, timeUnit);
    }
}

这时候再看DelayQueueProduceCombine是不是就清晰很多了,我们接着来分析发送消息的流程

isolationRegionSelector.getIndex()

java
private int reset() {
    count.set(0);
    return count.get();
}

public synchronized int getIndex() {
    int cur = count.get();
    if (cur >= thresholdValue) {
        cur = reset();
    } else {
        count.incrementAndGet();
    }
    return cur;
}

选择器获取分区索引的原理就是轮训获取,获取一次就对索引值+1,如果达到了分区数阈值,那么再将索引值重置为0

获取到了分区索引后,用索引去集合中获取对应的发送队列DelayProduceQueue,然后执行offer发送消息

DelayProduceQueue.offer

java
public class DelayProduceQueue extends DelayBaseQueue{
    
    private final RDelayedQueue<String> delayedQueue;
    public DelayProduceQueue(RedissonClient redissonClient, final String relTopic) {
        super(redissonClient, relTopic);
        this.delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
    }
    /**
    * 发送消息
    */
    public void offer(String content, long delayTime, TimeUnit timeUnit) {
        delayedQueue.offer(content,delayTime,timeUnit);
    }
}

以上就是发送消息的流程了,建议小伙伴多认真看几遍,就会梳理清楚了,关于消费消息和整个流程的流程图,会在下篇文章讲解,可跳转到相应文档查看

组件讲解-如何实现高性能延迟队列-消费消息

更新: 2025-11-11 09:55:10
原文: https://www.yuque.com/u22210564/ykdrdh/zhgo0z2smmvnz6ah

Java 后端面试知识库