方式一:DelayQueue 延时队列
介绍:DelayQueue是JDK提供的一组实现延迟队列的API,当我们向DelayQueue队列添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,排序条件最小的元素会优先放在队首,当队列的元素到了Delay的时间后,会被从队列中取出
注意:队列中可以放基本数据类型或者自定义实体类,当存放基本数据类型时,队列中的元素默认升序排列,当存放自定义实体类时,排序就需要我们根据属性值比较计算了
应用场景举例:添加三个订单入队,分别设置订单距离当前时间的5秒、10秒、15秒后取消

实现过程:要实现DelayQueue延时队列,队列中的元素需要继承Delayed接口,通过实现getDelay方法设置延期时间、通过实现compareTo方法对队列中的元素进行排序
代码示例
javaimport java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Order implements Delayed{ // 延迟时间 private long delayTime; // 订单名称 String orderName; public Order(String orderName, long period, TimeUnit unit) { this.orderName = orderName; this.delayTime = System.currentTimeMillis() + (period > 0 ? unit.toMillis(period) : 0); } // 对队列元素进行排序 @Override public int compareTo(Delayed o) { Order Order = (Order) o; long diff = this.delayTime - Order.delayTime; if (diff <= 0) { return -1; } else { return 1; } } // 设置延期时间 @Override public long getDelay(TimeUnit unit) { return delayTime - System.currentTimeMillis(); } }javaimport java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; public class Main{ public static void main(String[] args) throws Exception { Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS); Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS); Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS); // 定义队列 DelayQueue<Order> delayQueue = new DelayQueue<>(); delayQueue.put(Order1); delayQueue.put(Order2); delayQueue.put(Order3); System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); while (delayQueue.size() != 0) { // poll()获取头部元素,如果头部元素过期,则结果不为null,反之为null Order task = delayQueue.poll(); if (task != null) { System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.orderName, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); } Thread.sleep(1000); } } }
方式二:Quartz定时任务
引入pom
xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency>在启动类上添加 @EnableScheduling 注解开启定时任务功能
java@EnableScheduling @SpringBootApplication public class DelayqueueApplication { public static void main(String[] args) { SpringApplication.run(DelayqueueApplication.class, args); } }编写定时任务,每5秒执行一次
java@Component public class QuartzTask { //每隔五秒执行一次 @Scheduled(cron = "0/5 * * * * ? ") public void process(){ System.out.println("执行查询订单是否失效的定时任务了"); } }
方式三:使用Redis zset实现
zset简单介绍:zset(有序集合)是一个没有重复元素的集合,zset给每一个元素都关联了一个属性score,该属性是对集合中的元素进行排序的依据
zset常用命令
命令 作用 语法 示例 ZADD 添加成员 ZADD key [NX|XX] [CH] [INCR] score member [score member ...] ZADD delayqueue 1000 order 1 2000 order2 3000 orde3 ZRANGE 按索引范围获取成员 ZRANGE key start stop [WITHSCORES] ZRANGE delayqueue 0 -1 WITHSCORES
返回:所有成员及其分数,按分数升序排列ZREVRANGE 按索引范围逆序获取成员 ZREVRANGE key start stop [WITHSCORES] ZREVRANGE delayqueue 0 2 WITHSCORES
返回:分数最高的前3名成员ZRANGEBYSCORE 按分数范围获取成员 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] ZRANGEBYSCORE delayqueue 1000 2000 WITHSCORES
返回:分数在1000-2000之间的成员ZREM 删除成员 ZREM key member [member ...] ZREM delayqueue "member1"
返回:成功删除的成员数量ZCARD 获取集合成员数量 ZCARD key ZCARD delayqueue
返回:集合中的成员总数ZCOUNT 统计分数范围内的成员数量 ZCOUNT key min max ZCOUNT delayqueue 1000 1500
返回:分数在1000-1500之间的成员数量ZSCORE 获取成员分数 ZSCORE key member ZSCORE delayqueue "member1"
返回:member1的分数ZINCRBY 增加成员分数 ZINCRBY key increment member ZINCRBY delayqueue 500 "memeber1"
返回:Alice增加500后的新分数ZRANK 获取成员排名(升序) ZRANK key member ZRANK delayqueue "member1"
返回:member1的排名ZREVRANK 获取成员排名(降序) ZREVRANK key member ZREVRANK delayqueue "member1"
返回:member1在逆序排列中的排名ZREMRANGEBYRANK 按排名范围删除成员 ZREMRANGEBYRANK key start stop ZREMRANGEBYRANK delayqueue 0 2
返回:删除排名0-2的成员ZREMRANGEBYSCORE 按分数范围删除成员 ZREMRANGEBYSCORE key min max ZREMRANGEBYSCORE delayqueue 0 999
返回:删除分数在0-999之间的成员使用zset实现延迟队列的思路
生产者:将元素加入集合时,将 当前时间时间戳+元素有效期时间戳 作为score
消费者:while循环轮询查询,使用ZRANGEBYSCORE查询 0~当前时间戳 内的元素,找到了就消费,消费完毕后,从集合移除

代码示例
实体类
java@Data @NoArgsConstructor @AllArgsConstructor public class Order { private String id; private String name; }延迟队列服务类
java@Service public class DelayQueueService { @Autowired StringRedisTemplate redisTemplate; // 延迟队列名称 private static final String DELY_QUEUE_NAME = "dely_queue"; /** * 将元素加入集合 * @param order 消息体 * @param period 延迟跨度 * @param unit 时间单位 */ public Boolean push(Order order, long period, TimeUnit unit){ long score = System.currentTimeMillis() + (period > 0 ? unit.toMillis(period) : 0); String json = JSONObject.toJSONString(order); Boolean result = redisTemplate.opsForZSet().add(DELY_QUEUE_NAME, json, score); return result; } /** * 从集合中取出元素 * 根据rangeByScore获取score介于 0~当前时间戳 内的的元素 */ public List<Order> poll() { List<Message> orderList =new ArrayList<>(); try { Set<String> jsonSet = redisTemplate.opsForZSet().rangeByScore(DELY_QUEUE_NAME, 0, System.currentTimeMillis()); if (CollectionUtils.isEmpty(jsonSet)) { return null; } orderList = jsonSet.stream().map(json -> { Order order = null; try { order = JSONObject.parseObject(json, Order.class); } catch (Exception e) { e.printStackTrace(); } return order; }).collect(Collectors.toList()); } catch (Exception e) { log.error(e.toString()); } return orderList; } /** * 从集合中移除元素 */ public Long remove(Order order) { return redisTemplate.opsForZSet().remove(DELY_QUEUE_NAME, JSONObject.toJSONString(order)); } }生产者
java@Service public class DelayQueueProducer { @Autowired DelayQueueService delayQueueService; /** * 将元素加入集合 */ public Boolean push(){ Order order = new Order("1", "订单名称"); return delayQueueService.push(order, 10, TimeUnit.SECONDS); } }消费者
java@Service public class DelayQueueConsumer { @Autowired DelayQueueService delayQueueService; // 线程池数量 private int threadPoolSize = 1; @PostConstruct public void timer() { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize); // 每一分钟执行一次 scheduledExecutorService.scheduleAtFixedRate(new HandleDelayQueue(), 0, 1, TimeUnit.MINUTES); } public class HandleDelayQueue implements Runnable { @Override public void run() { // 取出过期元素 List<Order> orderList = delayQueueService.pull(); for (Order order : orderList) { System.out.println("对过期订单进行业务处理..."); // 移除元素 delayQueueService.remove(order); } } } }
方式四:使用redis key过期回调
修改 redis.conf 文件,开启notify-keyspace-events
texnotify-keyspace-events ExRedis监听配置
java@Configuration public class RedisListenerConfig { @Bean RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(connectionFactory); return redisMessageListenerContainer; } }Redis过期监听方法(必须继承 KeyExpirationEventMessageListener)
java@Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) { super(redisMessageListenerContainer); } @Override public void onMessage(Message message, byte[] pattern) { String expiredKey = message.toString(); System.out.println("监听到key:" + expiredKey + "已过期"); } }
方式五:使用RabbitMQ消息队列
实现思路:使用DLX(死信交换机)+ TTL(消息超时时间)实现,假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在”蹲点“这个死信队列,消息一进入死信队列,就立马被消费了
实现步骤
- 创建一个普通的业务队列(我们称之为
normal.queue),不设置任何消费者 - 为这个
normal.queue配置一个死信交换机(dead.letter.exchange) - 当生产者发送一条消息时,我们为其设置一个 TTL(例如10秒),并将其发送到与
normal.queue绑定的业务交换机(normal.exchange) - 由于
normal.queue没有消费者,消息会在队列中静静地等待 - 10秒后,消息的 TTL 到期,它变成了"死信"
- RabbitMQ 自动将这条死信消息从
normal.queue中移除,并将其路由到预设的dead.letter.exchange dead.letter.exchange再根据其路由规则,将消息投递到最终的"死信队列"(dead.letter.queue)- 我们的消费者只监听这个死信队列。一旦收到消息,就意味着延迟时间已到,可以开始处理业务
- 创建一个普通的业务队列(我们称之为
代码实现
配置类
javaimport org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DelayedConfig { // 1. 声明普通的业务交换机和队列 public static final String NORMAL_EXCHANGE = "normal.exchange"; public static final String NORMAL_QUEUE = "normal.queue"; public static final String NORMAL_ROUTING_KEY = "normal.key"; // 2. 声明死信交换机和死信队列 public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange"; public static final String DEAD_LETTER_QUEUE = "dead.letter.queue"; public static final String DEAD_LETTER_ROUTING_KEY = "dead.key"; @Bean public DirectExchange normalExchange() { return new DirectExchange(NORMAL_EXCHANGE); } /** * x-dead-letter-exchange:指定了当队列里的消息变成死信后,应该被发往哪个交换机 * x-dead-letter-routing-key:指定了死信消息被发送到死信交换机时,使用哪个路由键。这允许我们更灵活地控制死信消息的流向 */ @Bean public Queue normalQueue() { return QueueBuilder.durable(NORMAL_QUEUE) .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY) .build(); } @Bean public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY); } // 3. 确保死信交换机和死信队列也作为 Bean 被声明 @Bean public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); } @Bean public Queue deadLetterQueue() { return new Queue(DEAD_LETTER_QUEUE); } @Bean public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) { // 死信队列绑定到死信交换机,使用普通队列指定的 dead-letter-routing-key return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY); } }生产者:生产者将消息发送到业务交换机
normal.exchange,并为每条消息动态设置expiration属性java@RestController @RequestMapping public class DelayController { private final RabbitTemplate rabbitTemplate; public DelayController(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @RequestMapping("/delay") public String delay(){ //发送带ttl的消息 System.out.println("发送延迟消息, 当前时间: " + new Date()); rabbitTemplate.convertAndSend("normal.exchange", "normal.key", "delay test with ttl 10s..."+new Date(),message -> { message.getMessageProperties().setExpiration("10000"); return message; }); rabbitTemplate.convertAndSend("normal.exchange", "normal.key", "delay test with ttl 20s..."+new Date(), message -> { message.getMessageProperties().setExpiration("20000"); return message; }); return "success"; } }消费者:消费者只关心最终的业务处理,所以它监听的是
dead.letter.queuejavaimport com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; @Component public class DelayConsumer { @RabbitListener(queues = "dead.letter.queue") public void ListenerDLXQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", new Date(), new String(message.getBody(),"UTF-8"), deliveryTag); } }
缺点:RabbitMQ 只会检查队列头部的消息是否过期,如果队头的消息没有过期,那么后面的消息就算已经过期了,也无法被投递到死信交换机,也就是说,如果20s的消息先入队位于队头,10s的消息后入队位于其后,等20s的消息过期被投递到死信队列后,10s的消息已经结束了,直接进行投递,但依旧会造成延迟时间被延后
改进方案:使用延迟消息插件
方式六:使用RabbitMQ的插件
安装RabbitMQ
- Windows:windows安装RabbitMQ
- Ubuntu:
安装插件
Windows:我的windows上的RabbitMQ版本是4.1.4,所以下载插件时选择v4.1.0应该没问题
插件版本下载

将下载的.ez文件复制到RabbitMQ安装文件夹下的plugins文件夹下
进入RabbitMQ安装文件夹下的sbin文件夹执行命令启用插件
shellrabbitmq-plugins enable rabbitmq_delayed_message_exchange-4.1.0如果上述命令安装提示安装失败,就去掉版本号再次执行,应该就能安装成功了
shellrabbitmq-plugins enable rabbitmq_delayed_message_exchange
验证是否安装成功:重启RabbitMQ(必须!!!),进入管理端 http://localhost:15672, 进入exchanges选项卡 → Add a new exchange → Type,查看是否存在 x-delayed-message 的选项,有的话说明已经安装成功了

Ubuntu环境
查看RabbitMQ版本
shellsudo rabbitmqctl status | grep "RabbitMQ version" # 或 sudo rabbitmqctl version
下载与RabbitMQ版本匹配的插件并上传到 /usr/lib/rabbitmq/lib/rabbitmq_server-<你的版本version>/plugins/ 目录,我的RabbitMQ版本是3.12.1,所以插件选择v3.12.0
shell# 上传附件到服务器命令,备忘 scp "D:\workSpace\5_practical_tools_download\edge\rabbitmq_delayed_message_exchange-3.12.0.ez" root@服务器ip:/usr/lib/rabbitmq/lib/rabbitmq_server-3.12.1/plugins进入 /usr/lib/rabbitmq/lib/rabbitmq_server-<你的版本version>/sbin 目录下,执行下面命令启用延迟队列插件
shellsudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
验证是否安装成功:重启RabbitMQ(必须!!!),进入管理端 http://服务器ip:15672, 进入exchanges选项卡 → Add a new exchange → Type,查看是否存在 x-delayed-message 的选项,有的话说明已经安装成功了
shellsudo service rabbitmq-server restart
代码实现
定义队列和交换机
javaimport org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue delayQueue() { return new Queue("delay_queue", true); } @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args); } @Bean public Binding binding(Queue delayQueue, CustomExchange delayExchange) { return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay_key").noargs(); } }发送延迟消息
javaimport org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class DelayMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMessage(String message, int delay) { rabbitTemplate.convertAndSend("delay_exchange", "delay_key", message, msg -> { msg.getMessageProperties().setDelay(delay); return msg; }); } }消费延迟消息
javaimport org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DelayMessageListener { @RabbitListener(queues = "delay_queue") public void handleMessage(String message) { System.out.println("Received delayed message: " + message); } }
方式七:Netty时间轮
时间轮图示


HashedWheelTimer:具体的定时任务的时间执行精度可以通过调节 HashedWheelTimer 构造方法的时间间隔的大小来进行调节,在大多数网络应用的情况下,由于 IO 延迟的存在,并不会严格要求具体的时间执行精度,所以默认的 100ms 时间间隔可以满足大多数的情况,不需要再花精力去调节该时间精度
代码实现
pom依赖
xml<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.45.Final</version> </dependency>创建时间轮
java/** * 创建一个包含 512 个时间格的时间轮,每个时间轮的轮片时间间隔是100毫秒 */ @Bean("hashedWheelTimer") public HashedWheelTimer hashedWheelTimer(){ return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); }向时间轮中添加延时任务
java@Service public class OrderServiceImpl implements IOrderService { @Autowired private HashedWheelTimer hashedWheelTimer; @Override void addOrder(Order order) { // 下单业务处理... // 向时间轮中添加延时任务,有效期30分钟 hashedWheelTimer.newTimeout(task -> { // 注意这里使用异步任务线程池或者开启线程进行订单取消任务的处理 cancelOrder(order); }, 30, TimeUnit.MINUTES); } /** * 使用默认线程池 */ @Async public void cancelOrder(Order order){ // 订单过期业务处理... } }
优缺点:优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失,这一缺点和DelayQueue是一样的
参考文档
- 一口气说出 6种 延时队列的实现方法,面试官也得服-腾讯云开发者社区-腾讯云
- Redis实战篇:巧用zset实现延迟队列基于redis zset实现延迟队列,redis真的是YYDS,项目中用到延迟 - 掘金
- RabbitMQ 实现延迟队列的两种方式_rabbitmq实现延迟队列的两种方式-CSDN博客
- 【图文教程】Windows给Rabbitmq安装rabbitmq_delayed_meaage_exchange_windows rabbitmq-delayed-message-exchange-CSDN博客
- 基于 RabbitMQ 死信队列+TTL 实现延迟消息+延迟插件基本使用 - 技术栈
- Netty时间轮-HashedWheelTimer - 杨岂 - 博客园
- (14 封私信 / 1 条消息) 延时任务-基于netty时间轮算法实现 - 知乎