Skip to content

RabbitMQ架构示意图

image.png

  • publisher:消息的生产者,也就是发送消息的一方
  • consumer:消息的消费者,也就是消费消息的一方
  • exchange:交换机,生产者发送的消息由交换机决定投递到哪个或哪几个队列,只负责路由消息到队列,没有存储消息的能力
  • queue:队列,生产者发送的消息的存储位置,等待消费者处理
  • virtual host:虚拟主机,起到数据隔离的作用,不同的虚拟主机的交换机、队列互不干扰,通常情况下,给所有使用同一个RabbitMQ的项目创建他们自己的用户,在用户下再创建属于各自的虚拟主机

配置文件详解

生产者

yaml
spring:
  rabbitmq:
    host: 127.0.0.1           # RabbitMQ 服务主机地址
    port: 5672                 # RabbitMQ 服务端口,默认为5672
    username: admin            # 连接用户名
    password: 你的密码          # 连接密码
    virtual-host: /             # 虚拟主机,类似于命名空间,默认为/
    # 开启生产者确认回调
    publisher-confirm-type: correlated  
    # 开启消息路由失败回调
    publisher-returns: true             
    template:
      # 定义消息路由失败时的策略。true: 调用ReturnCallback;false: 直接丢弃消息
      mandatory: true

消费者

yaml
spring:
  rabbitmq:
    host: 127.0.0.1           # RabbitMQ 服务主机地址
    port: 5672                 # RabbitMQ 服务端口,默认为5672
    username: admin            # 连接用户名
    password: 你的密码          # 连接密码
    virtual-host: /             # 虚拟主机,类似于命名空间,默认为/
    listener:
      simple:
        # 消费者确认模式
        acknowledge-mode: auto        
        # 并发消费者数量配置
        concurrency: 5                
        max-concurrency: 10           
        # 预取消息数量
        prefetch: 10                  
        # 消费失败重试机制
        retry:
          enabled: true               
          max-attempts: 3             
          initial-interval: 1000      
          multiplier: 2.0

基础消息模型

  • 定义:一个生产者将消息发送到一个交换机,然后交换机将消息路由到一个队列。一个消费者从队列中获取并消费消息。这种模型适用于单个消费者处理消息的情况

  • 代码示例

    java
    // 生产者
    package com.itheima.publisher.amqp;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue() {
            // 队列名称
            String queueName = "simple.queue";
            // 消息
            String message = "hello, spring amqp!";
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    java
    // 消费者
    package com.itheima.consumer.listener;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class SpringRabbitListener {
    
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg) throws InterruptedException {
            System.out.println("spring 消费者接收到消息:【" + msg + "】");
        }
    }

工作队列模型(Work Queue)

  • 定义:当一个消费者的处理速度慢于生产者时,如果使用基础消息模型,会导致消息堆积在队列中。Work Queue模型通过增加消费者数量来解决这个问题。

  • 代码示例

    java
    // 生产者
    // 向队列中不停发送消息,模拟消息堆积
    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, message_";
        for (int i = 0; i < 50; i++) {
            // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
    java
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
    
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
    tex
    消费者1接收到消息:【hello, message_0】21:06:00.869555300
    消费者2........接收到消息:【hello, message_1】21:06:00.884518
    消费者1接收到消息:【hello, message_2】21:06:00.907454400
    消费者1接收到消息:【hello, message_4】21:06:00.953332100
    消费者1接收到消息:【hello, message_6】21:06:00.997867300
    消费者1接收到消息:【hello, message_8】21:06:01.042178700
    消费者2........接收到消息:【hello, message_3】21:06:01.086478800
    消费者1接收到消息:【hello, message_10】21:06:01.087476600
    消费者1接收到消息:【hello, message_12】21:06:01.132578300
    消费者1接收到消息:【hello, message_14】21:06:01.175851200
    消费者1接收到消息:【hello, message_16】21:06:01.218533400
    消费者1接收到消息:【hello, message_18】21:06:01.261322900
    消费者2........接收到消息:【hello, message_5】21:06:01.287003700
    消费者1接收到消息:【hello, message_20】21:06:01.304412400
    消费者1接收到消息:【hello, message_22】21:06:01.349950100
    消费者1接收到消息:【hello, message_24】21:06:01.394533900
    消费者1接收到消息:【hello, message_26】21:06:01.439876500
    消费者1接收到消息:【hello, message_28】21:06:01.482937800
    消费者2........接收到消息:【hello, message_7】21:06:01.488977100
    消费者1接收到消息:【hello, message_30】21:06:01.526409300
    消费者1接收到消息:【hello, message_32】21:06:01.572148
    消费者1接收到消息:【hello, message_34】21:06:01.618264800
    消费者1接收到消息:【hello, message_36】21:06:01.660780600
    消费者2........接收到消息:【hello, message_9】21:06:01.689189300
    消费者1接收到消息:【hello, message_38】21:06:01.705261
    消费者1接收到消息:【hello, message_40】21:06:01.746927300
    消费者1接收到消息:【hello, message_42】21:06:01.789835
    消费者1接收到消息:【hello, message_44】21:06:01.834393100
    消费者1接收到消息:【hello, message_46】21:06:01.875312100
    消费者2........接收到消息:【hello, message_11】21:06:01.889969500
    消费者1接收到消息:【hello, message_48】21:06:01.920702500
    消费者2........接收到消息:【hello, message_13】21:06:02.090725900
    消费者2........接收到消息:【hello, message_15】21:06:02.293060600
    消费者2........接收到消息:【hello, message_17】21:06:02.493748
    消费者2........接收到消息:【hello, message_19】21:06:02.696635100
    消费者2........接收到消息:【hello, message_21】21:06:02.896809700
    消费者2........接收到消息:【hello, message_23】21:06:03.099533400
    消费者2........接收到消息:【hello, message_25】21:06:03.301446400
    消费者2........接收到消息:【hello, message_27】21:06:03.504999100
    消费者2........接收到消息:【hello, message_29】21:06:03.705702500
    消费者2........接收到消息:【hello, message_31】21:06:03.906601200
    消费者2........接收到消息:【hello, message_33】21:06:04.108118500
    消费者2........接收到消息:【hello, message_35】21:06:04.308945400
    消费者2........接收到消息:【hello, message_37】21:06:04.511547700
    消费者2........接收到消息:【hello, message_39】21:06:04.714038400
    消费者2........接收到消息:【hello, message_41】21:06:04.916192700
    消费者2........接收到消息:【hello, message_43】21:06:05.116286400
    消费者2........接收到消息:【hello, message_45】21:06:05.318055100
    消费者2........接收到消息:【hello, message_47】21:06:05.520656400
    消费者2........接收到消息:【hello, message_49】21:06:05.723106700
  • 注意:默认情况下,如果多个消费者绑定到同一个queue且消费者的处理速度并不相同时,消息依旧会均分给每一个消费者,这就可能导致1个消费者空闲、1个消费者忙碌的情况,针对这种情况,可以修改消费者服务的application.yml文件,添加下面的配置

    yml
    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    tex
    消费者1接收到消息:【hello, message_0】21:12:51.659664200
    消费者2........接收到消息:【hello, message_1】21:12:51.680610
    消费者1接收到消息:【hello, message_2】21:12:51.703625
    消费者1接收到消息:【hello, message_3】21:12:51.724330100
    消费者1接收到消息:【hello, message_4】21:12:51.746651100
    消费者1接收到消息:【hello, message_5】21:12:51.768401400
    消费者1接收到消息:【hello, message_6】21:12:51.790511400
    消费者1接收到消息:【hello, message_7】21:12:51.812559800
    消费者1接收到消息:【hello, message_8】21:12:51.834500600
    消费者1接收到消息:【hello, message_9】21:12:51.857438800
    消费者1接收到消息:【hello, message_10】21:12:51.880379600
    消费者2........接收到消息:【hello, message_11】21:12:51.899327100
    消费者1接收到消息:【hello, message_12】21:12:51.922828400
    消费者1接收到消息:【hello, message_13】21:12:51.945617400
    消费者1接收到消息:【hello, message_14】21:12:51.968942500
    消费者1接收到消息:【hello, message_15】21:12:51.992215400
    消费者1接收到消息:【hello, message_16】21:12:52.013325600
    消费者1接收到消息:【hello, message_17】21:12:52.035687100
    消费者1接收到消息:【hello, message_18】21:12:52.058188
    消费者1接收到消息:【hello, message_19】21:12:52.081208400
    消费者2........接收到消息:【hello, message_20】21:12:52.103406200
    消费者1接收到消息:【hello, message_21】21:12:52.123827300
    消费者1接收到消息:【hello, message_22】21:12:52.146165100
    消费者1接收到消息:【hello, message_23】21:12:52.168828300
    消费者1接收到消息:【hello, message_24】21:12:52.191769500
    消费者1接收到消息:【hello, message_25】21:12:52.214839100
    消费者1接收到消息:【hello, message_26】21:12:52.238998700
    消费者1接收到消息:【hello, message_27】21:12:52.259772600
    消费者1接收到消息:【hello, message_28】21:12:52.284131800
    消费者2........接收到消息:【hello, message_29】21:12:52.306190600
    消费者1接收到消息:【hello, message_30】21:12:52.325315800
    消费者1接收到消息:【hello, message_31】21:12:52.347012500
    消费者1接收到消息:【hello, message_32】21:12:52.368508600
    消费者1接收到消息:【hello, message_33】21:12:52.391785100
    消费者1接收到消息:【hello, message_34】21:12:52.416383800
    消费者1接收到消息:【hello, message_35】21:12:52.439019
    消费者1接收到消息:【hello, message_36】21:12:52.461733900
    消费者1接收到消息:【hello, message_37】21:12:52.485990
    消费者1接收到消息:【hello, message_38】21:12:52.509219900
    消费者2........接收到消息:【hello, message_39】21:12:52.523683400
    消费者1接收到消息:【hello, message_40】21:12:52.547412100
    消费者1接收到消息:【hello, message_41】21:12:52.571191800
    消费者1接收到消息:【hello, message_42】21:12:52.593024600
    消费者1接收到消息:【hello, message_43】21:12:52.616731800
    消费者1接收到消息:【hello, message_44】21:12:52.640317
    消费者1接收到消息:【hello, message_45】21:12:52.663111100
    消费者1接收到消息:【hello, message_46】21:12:52.686727
    消费者1接收到消息:【hello, message_47】21:12:52.709266500
    消费者2........接收到消息:【hello, message_48】21:12:52.725884900
    消费者1接收到消息:【hello, message_49】21:12:52.746299900

订阅模型-Fanout交换机

  • 定义:在这种模型中,生产者将消息发送到一个交换机,然后交换机将消息路由到与交换机绑定每一个队列中。多个消费者订阅这些队列并消费消息。

    image.png

  • 代码示例

    java
    // 生产者
    @Test
    public void testFanoutExchange() {
        // 交换机名称
        String exchangeName = "hmall.fanout";
        // 消息
        String message = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
    java
    // 消费者
    // 根据规则,下面两个都能收到消息
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }
    
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }

订阅模型-Direct交换机

  • 定义:在这种模型中,队列和交换机的绑定不再是任意的了,而是相互之间指定一个RoutingKey(路由key),生产者向交换机发送消息时也必须指定RoutingKey,交换机收到消息后,根据RoutingKey将消息重定向到队列

    image.png

  • 代码示例

    java
    // 生产者
    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "hmall.direct";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
    java
    // 消费者
    // 根据规则,下面两个都能收到消息
    @RabbitListener(queues = "direct.queue1")
    public void listenDirectQueue1(String msg) {
        System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(queues = "direct.queue2")
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
    }

订阅模型-Topic

  • 定义:和Direct一样,都是根据RoutingKey把消息路由到绑定的队列中,但是Topic交换机可以在与队列绑定RoutingKey时使用通配符

    image.png

  • 通配符规则:

    • #:匹配一个或多个词
    • *:只匹配一个词
  • 举例:

    • item.#:能够匹配 item.spu.insert 或者 item.spu
    • item.*:只能匹配 item.spu
  • 代码示例

    java
    // 生产者
    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "hmall.topic";
        // 消息
        String message = "喜报!孙悟空大战哥斯拉,胜!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
    java
    // 消费者
    // 根据规则,下面两个都能收到消息
    @RabbitListener(queues = "topic.queue1")
    public void listenTopicQueue1(String msg){
        System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(queues = "topic.queue2")
    public void listenTopicQueue2(String msg){
        System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
    }

Java代码声明队列和交换机

  • Fanout示例

    image.png

    java
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueue1(String msg){
        System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue2"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
    ))
    public void listenFanoutQueue2(String msg){
        System.out.println("消费者1接收到fanout.queue2的消息:【" + msg + "】");
    }
  • Direct示例

    image.png

    java
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
    }
  • Topic示例

    image.png

    java
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
    }

消息转换器

  • 定义:Spring消息发送的方法的消息类型是个Object,而在数据传输过程中,该方法会将发送的消息序列化为字节发送给MQ,接收消息时,会将字节反序列化为Java对象,只不过默认情况下使用的序列化方式是JDK序列化,存在以下问题:

    • 数据体积过大
    • 有安全漏洞
    • 可读性差

    image.png

    image.png

  • 解决方式一:配置JSON转换器

    1. 在生产者和消费者的服务里引入依赖,如果项目中引入了 spring-boot-starter-web 依赖,则无需引入下面依赖

      xml
      <dependency>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-xml</artifactId>
          <version>2.9.10</version>
      </dependency>
    2. 在生产者和消费者中增加下面配置转换Bean

      java
      @Bean
      public MessageConverter messageConverter(){
          // 1.定义消息转换器
          Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
          // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
          jackson2JsonMessageConverter.setCreateMessageIds(true);
          return jackson2JsonMessageConverter;
      }
  • 解决方式二:可以每次发送消息时手动序列化对象,消费消息时再反序列化为对象,下面以使用 com.alibaba.fastjson.JSON 工具类为例

    java
    // 生产者
    Map messageMap = new HashMap()<String, Object>;
    messageMap.put("name", "张三");
    messageMap.put("age", 18);
    rabbitTemplate.convertAndSend("hmall.fanout", "", JSON.toJSONString(messageMap));
    java
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
    // 消费者
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(value = "hmall.fanout", type = ExchangeTypes.FANOUT, durable = "true")
    ))
    public void listenFanoutQueue1(Message message){
        Map<String, Object> messageMap = JSON.parseObject(message.getBody(), Map.class);
        System.out.println("name: " + messageMap.get("name"));
        System.out.println("age: " + messageMap.get("age"));
    }

生产者可靠性

生产者重连:解决连接MQ失败

  • 启用连接MQ失败的重试机制(生产者配置文件)

    yaml
    spring:
      rabbitmq:
        connection-timeout: 1s # 设置MQ的连接超时时间
        template:
          retry:
            enabled: true # 开启超时重试机制
            initial-interval: 1000ms # 失败后的初始等待时间
            multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
            max-attempts: 3 # 最大重试次数
  • 注意: SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码

生产者确认:解决消息发送失败

  • 说明:RabbitMQ提供了生产者消息确认机制,包括 Publisher ConfirmPublisher Return

    image-20260315144003236

    机制Publisher ConfirmPublisher Return
    触发条件消息成功到达交换机(无论是否路由到队列)消息到达交换机,但是无法路由到任何队列
    关注点消息是否成功投递到交换机消息是否从交换机正确路由到队列
    网络阶段生产者 → 交换机交换机 → 队列
    数据完整性确保消息不丢失在网络传输中防止消息因路由错误而静默丢失
    典型场景网络闪断导致消息未到达MQ路由键拼写错误或队列未绑定
  • 代码实现

    1. 启用消息确认机制(生产者配置文件)

      yaml
      spring:
        rabbitmq:
          publisher-confirm-type: correlated  # 启用 Confirm(旧版用 publisher-confirms: true)
          publisher-returns: true             # 启用 Return 机制
          template:
            mandatory: true                   # 强制触发Return(当路由失败时)

      publisher-confirm-type有三种模式可选:

      • none:关闭confirm机制
      • simple:同步阻塞等待MQ的回执消息
      • correlated:MQ异步回调方式返回回执消息(常用)
    2. 编写ConfirmCallback(在每一次消息发送时单独指定

      java
      // 1. 创建消息内容
      String messageContent = "hello";
      MessageProperties messageProperties = new MessageProperties();
      messageProperties.setMessageId(UUID.randomUUID().toString());
      Message message = MessageBuilder.withBody(messageContent.getBytes())
          .andProperties(messageProperties)
          .build();
      
      // 2. 创建CorrelationData并保存必要信息
      CorrelationData cd = new CorrelationData(messageProperties.getMessageId());
      cd.setExchange("hmall.direct");
      cd.setRoutingKey("q");
      cd.setMessage(message); // 保存消息用于重发
      
      // 3.给Future添加ConfirmCallback
      cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
          @Override
          public void onFailure(Throwable ex) {
              // 3.1.Future发生异常时的处理逻辑,基本不会触发
              log.error("send message fail", ex);
          }
      
          @Override
          public void onSuccess(CorrelationData.Confirm result) {
              // 3.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
              if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                  log.debug("发送消息成功,收到 ack!");
              }else{ // result.getReason(),String类型,返回nack时的异常描述
                  log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
                  int retryCount = correlationData.getRetryCount() == null ? 0 : correlationData.getRetryCount();
                  if (retryCount < 3) {
                      correlationData.setRetryCount(retryCount + 1);
                      // 重发消息(使用保存的消息内容)
                      rabbitTemplate.send(
                          correlationData.getExchange(), 
                          correlationData.getRoutingKey(), 
                          correlationData.getMessage(),  // 使用保存的Message对象
                          correlationData               // 使用同一个CorrelationData
                      );
                  } else {
                      log.error("消息重试超过上限: {}", correlationData);
                  }
              }
          }
      });
      
      // 4.发送消息
      rabbitTemplate.convertAndSend("hmall.direct", "q", message, cd);
    3. 编写ReturnCallback(每个RabbitTemplate只能配置一个ReturnCallback

      java
      package com.itheima.publisher.config;
      
      import lombok.AllArgsConstructor;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.ReturnedMessage;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.context.annotation.Configuration;
      
      import javax.annotation.PostConstruct;
      
      @Slf4j
      @AllArgsConstructor
      @Configuration
      public class MqConfig {
          private final RabbitTemplate rabbitTemplate;
      
          @PostConstruct
          public void init(){
              rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                  @Override
                  public void returnedMessage(ReturnedMessage returned) {
                      log.error("触发return callback,");
                      log.debug("exchange: {}", returned.getExchange());
                      log.debug("routingKey: {}", returned.getRoutingKey());
                      log.debug("message: {}", returned.getMessage());
                      log.debug("replyCode: {}", returned.getReplyCode());
                      log.debug("replyText: {}", returned.getReplyText());
                  }
              });
          }
      }

MQ可靠性

数据持久化:保证数据可靠性

  • 说明:为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

    • 交换机持久化

      image.png

      java
      // 代码实现,指定durable="true"
      @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "fanout.queue1"),
          exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT, durable = "true")
      ))
      public void listenFanoutQueue1(String msg){
          System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
      }
    • 队列持久化

      image.png

      java
      // 代码实现,指定durable="true"
      @RabbitListener(bindings = @QueueBinding(
          value = @Queue(name = "fanout.queue1", durable = "true"),
          exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
      ))
      public void listenFanoutQueue1(String msg){
          System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
      }
    • 消息持久化:Spring默认的消息就是持久化的

      image.png

LazyQueue:消息直接入磁盘

  • 说明:从RabbitMQ 3.6.0 版本开始,增加了LazyQueue模式,特征如下:

    • 接收到消息后直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
    • 支持数百万条的消息存储

    而在 3.12 版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式

  • 配置方式

    image.png

    java
    // 代码实现,arguments = @Argument(name = "x-queue-mode", value = "lazy")
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
    ))
    public void listenLazyQueue(String msg){
        log.info("接收到 lazy.queue的消息:{}", msg);
    }

消费者可靠性

消费者确认

  • 说明:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

    • ack:成功处理消息,RabbitMQ从队列中删除该消息
    • nack:消息处理失败,RabbitMQ需要再次投递消息
    • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(一般reject用的较少,除非是消息格式有问题,那就是开发问题了)
  • 消息回执的三种处理模式

    • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除(非常不安全,不建议使用)

      yaml
      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: none # 不做处理
    • manual:手动模式。需要自己在业务代码中调用api,发送acknackreject,存在业务入侵,但更灵活

      yaml
      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: manual # 手动处理
      java
      @RabbitListener(queues = "simple.queue")
      public void listenSimpleQueueMessage(Message message, Channel channel) throws InterruptedException {
          log.info("spring 消费者接收到消息:【" + JSON.parseObject(message.getBody(), String.class) + "】");
          if (true) { // 手动返回ack
              channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
          }else { // 手动返回nack
              channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
          }
      }
    • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

      • 如果是业务异常,会自动返回nack

      • 如果是消息处理或校验异常,自动返回reject。返回Reject的常见异常有:

        Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:

        • o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
        • o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
        • o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
        • o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
        • java.lang.NoSuchMethodException: Added in version 1.6.3.
        • java.lang.ClassCastException: Added in version 1.6.3.
      yaml
      spring:
        rabbitmq:
          listener:
            simple:
              acknowledge-mode: auto # 自动处理
      java
      @RabbitListener(queues = "simple.queue")
      public void listenSimpleQueueMessage(Message message, Channel channel) throws InterruptedException {
          log.info("spring 消费者接收到消息:【" + JSON.parseObject(message.getBody(), String.class) + "】");
          if (true) {
              // throw new RuntimeException("故意的"); // 返回nack
              throw new MessageConversionException("故意的"); // 返回reject
          }
          log.info("消息处理完成");
      }

失败重试

  • 说明:当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。Spring提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

  • 代码实现

    • 启用消费者失败重试(消费者配置文件)

      yaml
      spring:
        rabbitmq:
          listener:
            simple:
              retry:
                enabled: true # 开启消费者失败重试
                initial-interval: 1000ms # 初识的失败等待时长为1秒
                multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
                max-attempts: 3 # 最大重试次数
                stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 达到重试次数后依旧失败的处理策略

    • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

    • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

    • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(比较优雅的一种处理方案,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理)

      java
      package com.itheima.consumer.config;
      
      import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.DirectExchange;
      import org.springframework.amqp.core.Queue;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.amqp.rabbit.retry.MessageRecoverer;
      import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
      import org.springframework.context.annotation.Bean;
      
      @Configuration
      @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
      public class ErrorMessageConfig {
      
          // 定义处理失败消息的交换机
          @Bean
          public DirectExchange errorMessageExchange(){
              return new DirectExchange("error.direct");
          }
      
          // 定义处理失败消息的队列
          @Bean
          public Queue errorQueue(){
              return new Queue("error.queue", true);
          }
      
          // 绑定交换机和队列
          @Bean
          public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
              return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
          }
      
          //  定义RepublishMessageRecoverer
          @Bean
          public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
              return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
          }
      }

业务幂等性

  • 说明:为了防止消息被消费多次导致出现业务故障,必须想办法保证消息处理的幂等性

  • 实现方案:唯一消息ID

    • 每一条消息都生成一个唯一的ID,与消息一起投递给消费者
    • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
    • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
  • 开启唯一ID功能

    1. 以消息转换器为例

      java
      @Bean
      public MessageConverter messageConverter(){
          // 1.定义消息转换器
          Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
          // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
          jackson2JsonMessageConverter.setCreateMessageIds(true);
          return jackson2JsonMessageConverter;
      }
    2. 发送消息后,消息ID就随着消息一起投递到了MQ中

      image-20260315164032661

    3. 消费者代码示例

      java
      @RabbitListener(queues = "your.queue.name")
      @Transactional
      public void handleMessage(Message amqpMessage) {
          // 获取消息ID
          String messageId = amqpMessage.getMessageProperties().getMessageId();
      
          if (messageId == null) {
              // 如果消息没有ID,可以选择拒绝或生成一个ID
              throw new IllegalArgumentException("Message must have an ID");
          }
      
          // 查询数据库是否有记录检查消息是否已处理
          if () {
              System.out.println("消息已处理过,跳过: " + messageId);
              return;
          }
      
          try {
              // 业务处理...
      
              // 保存消息ID到数据库...
      
              System.out.println("消息处理成功: " + messageId);
      
          } catch (Exception e) {
              throw e; // 抛出异常触发重试机制
          }
      }

延迟消息

  • 定义:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息

死信交换机

  • 死信的定义:当一个队列中的消息满足下列情况之一时,可以成为死信

    • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
    • 消息是一个过期消息,超时无人消费
    • 要投递的队列消息满了,无法投递
  • 死信交换机的定义:如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange**属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机

  • 死信交换机实现延迟队列的原理

    image-20260315165823455

    1. 队列simple.queue不绑定消费者
    2. 队列simple.queue通过dead-letter-exchange属性指定了dlx.direct交换机
    3. publisher发送消息到simple.queue队列,超时后,由于消息过期且无人消费,成为死信,被投递到dlx.direct这个交换机中
    4. dlx.direct交换机接收到消息后正常路由到队列,然后被消费者消费,进而实现了延迟某一段时间后被消费者消费的效果

MIT版权,未经许可禁止任何形式的转载