Skip to content

方式一:DelayQueue 延时队列

  • 介绍:DelayQueue是JDK提供的一组实现延迟队列的API,当我们向DelayQueue队列添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,排序条件最小的元素会优先放在队首,当队列的元素到了Delay的时间后,会被从队列中取出

  • 注意:队列中可以放基本数据类型或者自定义实体类,当存放基本数据类型时,队列中的元素默认升序排列,当存放自定义实体类时,排序就需要我们根据属性值比较计算了

  • 应用场景举例:添加三个订单入队,分别设置订单距离当前时间的5秒、10秒、15秒后取消

    img

  • 实现过程:要实现DelayQueue延时队列,队列中的元素需要继承Delayed接口,通过实现getDelay方法设置延期时间、通过实现compareTo方法对队列中的元素进行排序

  • 代码示例

    java
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Order implements Delayed{
    
        // 延迟时间
        private long delayTime;
        // 订单名称
        String orderName;
    
        public Order(String orderName, long period, TimeUnit unit) {
            this.orderName = orderName;
            this.delayTime = System.currentTimeMillis() + (period > 0 ? unit.toMillis(period) : 0);
        }
    
        // 对队列元素进行排序
        @Override
        public int compareTo(Delayed o) {
            Order Order = (Order) o;
            long diff = this.delayTime - Order.delayTime;
            if (diff <= 0) {
                return -1;
            } else {
                return 1;
            }
        }
    
        // 设置延期时间
        @Override
        public long getDelay(TimeUnit unit) {
            return delayTime - System.currentTimeMillis();
        }
    
    }
    java
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.TimeUnit;
    
    public class Main{
        public static void main(String[] args) throws Exception {
            Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
            Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
            Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
    
            // 定义队列
            DelayQueue<Order> delayQueue = new DelayQueue<>();
            delayQueue.put(Order1);
            delayQueue.put(Order2);
            delayQueue.put(Order3);
    
            System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            while (delayQueue.size() != 0) {
                // poll()获取头部元素,如果头部元素过期,则结果不为null,反之为null
                Order task = delayQueue.poll();
                if (task != null) {
                    System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.orderName, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                }
                Thread.sleep(1000);
            }
    
        }
    }

方式二:Quartz定时任务

  1. 引入pom

    xml
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-quartz</artifactId>
    </dependency>
  2. 在启动类上添加 @EnableScheduling 注解开启定时任务功能

    java
    @EnableScheduling
    @SpringBootApplication
    public class DelayqueueApplication {
        public static void main(String[] args) {
            SpringApplication.run(DelayqueueApplication.class, args);
        }
    }
  3. 编写定时任务,每5秒执行一次

    java
    @Component
    public class QuartzTask {
        //每隔五秒执行一次
        @Scheduled(cron = "0/5 * * * * ? ")
        public void process(){
            System.out.println("执行查询订单是否失效的定时任务了");
        }
    }

方式三:使用Redis zset实现

  • zset简单介绍:zset(有序集合)是一个没有重复元素的集合,zset给每一个元素都关联了一个属性score,该属性是对集合中的元素进行排序的依据

  • zset常用命令

    命令作用语法示例
    ZADD添加成员ZADD key [NX|XX] [CH] [INCR] score member [score member ...]ZADD delayqueue 1000 order 1 2000 order2 3000 orde3
    ZRANGE按索引范围获取成员ZRANGE key start stop [WITHSCORES]ZRANGE delayqueue 0 -1 WITHSCORES
    返回:所有成员及其分数,按分数升序排列
    ZREVRANGE按索引范围逆序获取成员ZREVRANGE key start stop [WITHSCORES]ZREVRANGE delayqueue 0 2 WITHSCORES
    返回:分数最高的前3名成员
    ZRANGEBYSCORE按分数范围获取成员ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]ZRANGEBYSCORE delayqueue 1000 2000 WITHSCORES
    返回:分数在1000-2000之间的成员
    ZREM删除成员ZREM key member [member ...]ZREM delayqueue "member1"
    返回:成功删除的成员数量
    ZCARD获取集合成员数量ZCARD keyZCARD delayqueue
    返回:集合中的成员总数
    ZCOUNT统计分数范围内的成员数量ZCOUNT key min maxZCOUNT delayqueue 1000 1500
    返回:分数在1000-1500之间的成员数量
    ZSCORE获取成员分数ZSCORE key memberZSCORE delayqueue "member1"
    返回:member1的分数
    ZINCRBY增加成员分数ZINCRBY key increment memberZINCRBY delayqueue 500 "memeber1"
    返回:Alice增加500后的新分数
    ZRANK获取成员排名(升序)ZRANK key memberZRANK delayqueue "member1"
    返回:member1的排名
    ZREVRANK获取成员排名(降序)ZREVRANK key memberZREVRANK delayqueue "member1"
    返回:member1在逆序排列中的排名
    ZREMRANGEBYRANK按排名范围删除成员ZREMRANGEBYRANK key start stopZREMRANGEBYRANK delayqueue 0 2
    返回:删除排名0-2的成员
    ZREMRANGEBYSCORE按分数范围删除成员ZREMRANGEBYSCORE key min maxZREMRANGEBYSCORE delayqueue 0 999
    返回:删除分数在0-999之间的成员
  • 使用zset实现延迟队列的思路

    • 生产者:将元素加入集合时,将 当前时间时间戳+元素有效期时间戳 作为score

    • 消费者:while循环轮询查询,使用ZRANGEBYSCORE查询 0~当前时间戳 内的元素,找到了就消费,消费完毕后,从集合移除

      image-20251011094602596

  • 代码示例

    • 实体类

      java
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public class Order {
          private String id;
          private String name;
      }
    • 延迟队列服务类

      java
      @Service
      public class DelayQueueService {
          @Autowired
          StringRedisTemplate redisTemplate;
      
          // 延迟队列名称
          private static final String DELY_QUEUE_NAME = "dely_queue";
      
          /**
           * 将元素加入集合
           * @param order 消息体
           * @param period 延迟跨度
           * @param unit 时间单位
           */
          public Boolean push(Order order, long period, TimeUnit unit){
              long score = System.currentTimeMillis() + (period > 0 ? unit.toMillis(period) : 0);
              String json = JSONObject.toJSONString(order);
              Boolean result = redisTemplate.opsForZSet().add(DELY_QUEUE_NAME, json, score);
              return result;
          }
      
          /**
           * 从集合中取出元素
           * 根据rangeByScore获取score介于 0~当前时间戳 内的的元素
           */
          public List<Order> poll() {
              List<Message> orderList  =new ArrayList<>();
              
              try {
                  Set<String> jsonSet = redisTemplate.opsForZSet().rangeByScore(DELY_QUEUE_NAME, 0, System.currentTimeMillis());
                  if (CollectionUtils.isEmpty(jsonSet)) {
                      return null;
                  }
                  orderList = jsonSet.stream().map(json -> {
                      Order order = null;
                      try {
                          order = JSONObject.parseObject(json, Order.class);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                      return order;
                  }).collect(Collectors.toList());
              } catch (Exception e) {
                  log.error(e.toString());
              }
              
              return orderList;
          }
          
          /**
           * 从集合中移除元素
           */
          public Long remove(Order order) {
              return redisTemplate.opsForZSet().remove(DELY_QUEUE_NAME, JSONObject.toJSONString(order));
          }
      
      }
    • 生产者

      java
      @Service
      public class DelayQueueProducer {
          @Autowired
          DelayQueueService delayQueueService;
      
          /**
           * 将元素加入集合
           */
          public Boolean push(){
              Order order = new Order("1", "订单名称");
              
              return delayQueueService.push(order, 10, TimeUnit.SECONDS);
          }
      }
    • 消费者

      java
      @Service
      public class DelayQueueConsumer {
          @Autowired
          DelayQueueService delayQueueService;
          
          // 线程池数量
          private int threadPoolSize = 1;
          
          @PostConstruct
          public void timer() {
              ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize);
              
              // 每一分钟执行一次
              scheduledExecutorService.scheduleAtFixedRate(new HandleDelayQueue(), 0, 1, TimeUnit.MINUTES);
          }
          
          public class HandleDelayQueue implements Runnable {
              @Override
              public void run() {
                  // 取出过期元素
                  List<Order> orderList = delayQueueService.pull();
                  for (Order order : orderList) {
                      System.out.println("对过期订单进行业务处理...");
                      
                      // 移除元素
                      delayQueueService.remove(order);
                  }
              }
          }
      }

方式四:使用redis key过期回调

  1. 修改 redis.conf 文件,开启notify-keyspace-events

    tex
    notify-keyspace-events Ex
  2. Redis监听配置

    java
    @Configuration
    public class RedisListenerConfig {
        @Bean
        RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
    
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(connectionFactory);
            return redisMessageListenerContainer;
        }
    }
  3. Redis过期监听方法(必须继承 KeyExpirationEventMessageListener

    java
    @Component
    public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
    
        public RedisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
            super(redisMessageListenerContainer);
        }
        
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String expiredKey = message.toString();
            System.out.println("监听到key:" + expiredKey + "已过期");
        }
    }

方式五:使用RabbitMQ消息队列

  • 实现思路:使用DLX(死信交换机)+ TTL(消息超时时间)实现,假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在”蹲点“这个死信队列,消息一进入死信队列,就立马被消费了

  • 实现步骤

    1. 创建一个普通的业务队列(我们称之为 normal.queue),不设置任何消费者
    2. 为这个 normal.queue 配置一个死信交换机(dead.letter.exchange
    3. 当生产者发送一条消息时,我们为其设置一个 TTL(例如10秒),并将其发送到与 normal.queue 绑定的业务交换机(normal.exchange
    4. 由于 normal.queue 没有消费者,消息会在队列中静静地等待
    5. 10秒后,消息的 TTL 到期,它变成了"死信"
    6. RabbitMQ 自动将这条死信消息从 normal.queue 中移除,并将其路由到预设的 dead.letter.exchange
    7. dead.letter.exchange 再根据其路由规则,将消息投递到最终的"死信队列"(dead.letter.queue
    8. 我们的消费者只监听这个死信队列。一旦收到消息,就意味着延迟时间已到,可以开始处理业务
  • 代码实现

    • 配置类

      java
      import org.springframework.amqp.core.*;  
      import org.springframework.context.annotation.Bean;  
      import org.springframework.context.annotation.Configuration;  
      
      @Configuration  
      public class DelayedConfig {  
          // 1. 声明普通的业务交换机和队列  
          public static final String NORMAL_EXCHANGE = "normal.exchange";  
          public static final String NORMAL_QUEUE = "normal.queue";  
          public static final String NORMAL_ROUTING_KEY = "normal.key";  
      
          // 2. 声明死信交换机和死信队列  
          public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";  
          public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";  
          public static final String DEAD_LETTER_ROUTING_KEY = "dead.key";  
      
          @Bean  
          public DirectExchange normalExchange() {  
              return new DirectExchange(NORMAL_EXCHANGE);  
          }  
      
          /**
          * x-dead-letter-exchange:指定了当队列里的消息变成死信后,应该被发往哪个交换机
          * x-dead-letter-routing-key:指定了死信消息被发送到死信交换机时,使用哪个路由键。这允许我们更灵活地控制死信消息的流向
          */
          @Bean  
          public Queue normalQueue() {  
              return QueueBuilder.durable(NORMAL_QUEUE)  
                  .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)  
                  .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)  
                  .build();  
          }  
      
          @Bean  
          public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {  
              return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);  
          }  
      
          // 3. 确保死信交换机和死信队列也作为 Bean 被声明  
          @Bean  
          public DirectExchange deadLetterExchange() {  
              return new DirectExchange(DEAD_LETTER_EXCHANGE);  
          }  
      
          @Bean  
          public Queue deadLetterQueue() {  
              return new Queue(DEAD_LETTER_QUEUE);  
          }  
      
          @Bean  
          public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {  
              // 死信队列绑定到死信交换机,使用普通队列指定的 dead-letter-routing-key
              return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);  
          }  
      }
    • 生产者:生产者将消息发送到业务交换机 normal.exchange,并为每条消息动态设置 expiration 属性

      java
      @RestController  
      @RequestMapping  
      public class DelayController {  
          private final RabbitTemplate rabbitTemplate;  
      
          public DelayController(RabbitTemplate rabbitTemplate) {  
              this.rabbitTemplate = rabbitTemplate;  
          }  
      
          @RequestMapping("/delay")  
          public String delay(){  
              //发送带ttl的消息  
              System.out.println("发送延迟消息, 当前时间: " + new Date());
              rabbitTemplate.convertAndSend("normal.exchange", "normal.key",  
                                            "delay test with ttl 10s..."+new Date(),message -> {  
                                                message.getMessageProperties().setExpiration("10000");  
                                                return message;  
                                            });  
              rabbitTemplate.convertAndSend("normal.exchange", "normal.key",  
                                            "delay test with ttl 20s..."+new Date(), message -> {  
                                                message.getMessageProperties().setExpiration("20000");  
                                                return message;  
                                            });  
              return "success";  
          }  
      }
    • 消费者:消费者只关心最终的业务处理,所以它监听的是 dead.letter.queue

      java
      import com.rabbitmq.client.Channel;  
      import org.springframework.amqp.core.Message;  
      import org.springframework.amqp.rabbit.annotation.RabbitListener;  
      import org.springframework.stereotype.Component;  
      
      import java.util.Date;  
      @Component  
      public class DelayConsumer {  
          @RabbitListener(queues = "dead.letter.queue")  
          public void ListenerDLXQueue(Message message, Channel channel) throws Exception {  
              long deliveryTag = message.getMessageProperties().getDeliveryTag();  
              System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", new Date(), new String(message.getBody(),"UTF-8"),  
                                deliveryTag);  
          }  
      }
  • 缺点:RabbitMQ 只会检查队列头部的消息是否过期,如果队头的消息没有过期,那么后面的消息就算已经过期了,也无法被投递到死信交换机,也就是说,如果20s的消息先入队位于队头,10s的消息后入队位于其后,等20s的消息过期被投递到死信队列后,10s的消息已经结束了,直接进行投递,但依旧会造成延迟时间被延后

  • 改进方案:使用延迟消息插件

方式六:使用RabbitMQ的插件

  • 安装RabbitMQ

  • 插件地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange

  • 安装插件

    • Windows:我的windows上的RabbitMQ版本是4.1.4,所以下载插件时选择v4.1.0应该没问题

      • 插件版本下载

        image-20251011145355176

      • 将下载的.ez文件复制到RabbitMQ安装文件夹下的plugins文件夹下

      • 进入RabbitMQ安装文件夹下的sbin文件夹执行命令启用插件

        shell
        rabbitmq-plugins enable rabbitmq_delayed_message_exchange-4.1.0

        如果上述命令安装提示安装失败,就去掉版本号再次执行,应该就能安装成功了 image-20251011150518431

        shell
        rabbitmq-plugins enable rabbitmq_delayed_message_exchange

        image-20251011150527386

      • 验证是否安装成功:重启RabbitMQ(必须!!!),进入管理端 http://localhost:15672, 进入exchanges选项卡 → Add a new exchange → Type,查看是否存在 x-delayed-message 的选项,有的话说明已经安装成功了

        image-20251011151010385

    • Ubuntu环境

      • 查看RabbitMQ版本

        shell
        sudo rabbitmqctl status | grep "RabbitMQ version"
        # 或
        sudo rabbitmqctl version

        image-20251011155627104

      • 下载与RabbitMQ版本匹配的插件并上传到 /usr/lib/rabbitmq/lib/rabbitmq_server-<你的版本version>/plugins/ 目录,我的RabbitMQ版本是3.12.1,所以插件选择v3.12.0

        image-20251011160757464

        shell
        # 上传附件到服务器命令,备忘
        scp "D:\workSpace\5_practical_tools_download\edge\rabbitmq_delayed_message_exchange-3.12.0.ez" root@服务器ip:/usr/lib/rabbitmq/lib/rabbitmq_server-3.12.1/plugins
      • 进入 /usr/lib/rabbitmq/lib/rabbitmq_server-<你的版本version>/sbin 目录下,执行下面命令启用延迟队列插件

        shell
        sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange

        image-20251011160910740

      • 验证是否安装成功:重启RabbitMQ(必须!!!),进入管理端 http://服务器ip:15672, 进入exchanges选项卡 → Add a new exchange → Type,查看是否存在 x-delayed-message 的选项,有的话说明已经安装成功了

        shell
        sudo service rabbitmq-server restart

        image-20251011161035678

  • 代码实现

    • 定义队列和交换机

      java
      import org.springframework.amqp.core.*;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      @Configuration
      public class RabbitConfig {
          @Bean
          public Queue delayQueue() {
              return new Queue("delay_queue", true);
          }
          @Bean
          public CustomExchange delayExchange() {
              Map<String, Object> args = new HashMap<>();
              args.put("x-delayed-type", "direct");
              return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
          }
          @Bean
          public Binding binding(Queue delayQueue, CustomExchange delayExchange) {
              return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay_key").noargs();
          }
      }
    • 发送延迟消息

      java
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      @Component
      public class DelayMessageSender {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          public void sendDelayMessage(String message, int delay) {
              rabbitTemplate.convertAndSend("delay_exchange", "delay_key", message, msg -> {
                  msg.getMessageProperties().setDelay(delay);
                  return msg;
              });
          }
      }
    • 消费延迟消息

      java
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      @Component
      public class DelayMessageListener {
          @RabbitListener(queues = "delay_queue")
          public void handleMessage(String message) {
              System.out.println("Received delayed message: " + message);
          }
      }

方式七:Netty时间轮

  • 时间轮图示

    img

    img

  • HashedWheelTimer:具体的定时任务的时间执行精度可以通过调节 HashedWheelTimer 构造方法的时间间隔的大小来进行调节,在大多数网络应用的情况下,由于 IO 延迟的存在,并不会严格要求具体的时间执行精度,所以默认的 100ms 时间间隔可以满足大多数的情况,不需要再花精力去调节该时间精度

  • 代码实现

    • pom依赖

      xml
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.1.45.Final</version>
      </dependency>
    • 创建时间轮

      java
      /**
      * 创建一个包含 512 个时间格的时间轮,每个时间轮的轮片时间间隔是100毫秒
      */
      @Bean("hashedWheelTimer")
      public HashedWheelTimer hashedWheelTimer(){
          return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
      }
    • 向时间轮中添加延时任务

      java
      @Service
      public class OrderServiceImpl implements IOrderService {
      
          @Autowired
          private HashedWheelTimer hashedWheelTimer;
      
          @Override
          void addOrder(Order order) {
              // 下单业务处理...
      
              // 向时间轮中添加延时任务,有效期30分钟
              hashedWheelTimer.newTimeout(task -> {
                  // 注意这里使用异步任务线程池或者开启线程进行订单取消任务的处理
                  cancelOrder(order);
              }, 30, TimeUnit.MINUTES);
      
          }
          
          /**
          * 使用默认线程池
          */
          @Async
          public void cancelOrder(Order order){
              // 订单过期业务处理...
          }
      }
  • 优缺点:优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失,这一缺点和DelayQueue是一样的

参考文档

MIT版权,未经许可禁止任何形式的转载