RabbitMQ架构示意图

- publisher:消息的生产者,也就是发送消息的一方
- consumer:消息的消费者,也就是消费消息的一方
- exchange:交换机,生产者发送的消息由交换机决定投递到哪个或哪几个队列,只负责路由消息到队列,没有存储消息的能力
- queue:队列,生产者发送的消息的存储位置,等待消费者处理
- virtual host:虚拟主机,起到数据隔离的作用,不同的虚拟主机的交换机、队列互不干扰,通常情况下,给所有使用同一个RabbitMQ的项目创建他们自己的用户,在用户下再创建属于各自的虚拟主机
配置文件详解
生产者
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务主机地址
port: 5672 # RabbitMQ 服务端口,默认为5672
username: admin # 连接用户名
password: 你的密码 # 连接密码
virtual-host: / # 虚拟主机,类似于命名空间,默认为/
# 开启生产者确认回调
publisher-confirm-type: correlated
# 开启消息路由失败回调
publisher-returns: true
template:
# 定义消息路由失败时的策略。true: 调用ReturnCallback;false: 直接丢弃消息
mandatory: true消费者
spring:
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务主机地址
port: 5672 # RabbitMQ 服务端口,默认为5672
username: admin # 连接用户名
password: 你的密码 # 连接密码
virtual-host: / # 虚拟主机,类似于命名空间,默认为/
listener:
simple:
# 消费者确认模式
acknowledge-mode: auto
# 并发消费者数量配置
concurrency: 5
max-concurrency: 10
# 预取消息数量
prefetch: 10
# 消费失败重试机制
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
multiplier: 2.0基础消息模型
定义:一个生产者将消息发送到一个交换机,然后交换机将消息路由到一个队列。一个消费者从队列中获取并消费消息。这种模型适用于单个消费者处理消息的情况

代码示例
java// 生产者 package com.itheima.publisher.amqp; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, spring amqp!"; // 发送消息 rabbitTemplate.convertAndSend(queueName, message); } }java// 消费者 package com.itheima.consumer.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
工作队列模型(Work Queue)
定义:当一个消费者的处理速度慢于生产者时,如果使用基础消息模型,会导致消息堆积在队列中。Work Queue模型通过增加消费者数量来解决这个问题。

代码示例
java// 生产者 // 向队列中不停发送消息,模拟消息堆积 @Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }java@RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }tex消费者1接收到消息:【hello, message_0】21:06:00.869555300 消费者2........接收到消息:【hello, message_1】21:06:00.884518 消费者1接收到消息:【hello, message_2】21:06:00.907454400 消费者1接收到消息:【hello, message_4】21:06:00.953332100 消费者1接收到消息:【hello, message_6】21:06:00.997867300 消费者1接收到消息:【hello, message_8】21:06:01.042178700 消费者2........接收到消息:【hello, message_3】21:06:01.086478800 消费者1接收到消息:【hello, message_10】21:06:01.087476600 消费者1接收到消息:【hello, message_12】21:06:01.132578300 消费者1接收到消息:【hello, message_14】21:06:01.175851200 消费者1接收到消息:【hello, message_16】21:06:01.218533400 消费者1接收到消息:【hello, message_18】21:06:01.261322900 消费者2........接收到消息:【hello, message_5】21:06:01.287003700 消费者1接收到消息:【hello, message_20】21:06:01.304412400 消费者1接收到消息:【hello, message_22】21:06:01.349950100 消费者1接收到消息:【hello, message_24】21:06:01.394533900 消费者1接收到消息:【hello, message_26】21:06:01.439876500 消费者1接收到消息:【hello, message_28】21:06:01.482937800 消费者2........接收到消息:【hello, message_7】21:06:01.488977100 消费者1接收到消息:【hello, message_30】21:06:01.526409300 消费者1接收到消息:【hello, message_32】21:06:01.572148 消费者1接收到消息:【hello, message_34】21:06:01.618264800 消费者1接收到消息:【hello, message_36】21:06:01.660780600 消费者2........接收到消息:【hello, message_9】21:06:01.689189300 消费者1接收到消息:【hello, message_38】21:06:01.705261 消费者1接收到消息:【hello, message_40】21:06:01.746927300 消费者1接收到消息:【hello, message_42】21:06:01.789835 消费者1接收到消息:【hello, message_44】21:06:01.834393100 消费者1接收到消息:【hello, message_46】21:06:01.875312100 消费者2........接收到消息:【hello, message_11】21:06:01.889969500 消费者1接收到消息:【hello, message_48】21:06:01.920702500 消费者2........接收到消息:【hello, message_13】21:06:02.090725900 消费者2........接收到消息:【hello, message_15】21:06:02.293060600 消费者2........接收到消息:【hello, message_17】21:06:02.493748 消费者2........接收到消息:【hello, message_19】21:06:02.696635100 消费者2........接收到消息:【hello, message_21】21:06:02.896809700 消费者2........接收到消息:【hello, message_23】21:06:03.099533400 消费者2........接收到消息:【hello, message_25】21:06:03.301446400 消费者2........接收到消息:【hello, message_27】21:06:03.504999100 消费者2........接收到消息:【hello, message_29】21:06:03.705702500 消费者2........接收到消息:【hello, message_31】21:06:03.906601200 消费者2........接收到消息:【hello, message_33】21:06:04.108118500 消费者2........接收到消息:【hello, message_35】21:06:04.308945400 消费者2........接收到消息:【hello, message_37】21:06:04.511547700 消费者2........接收到消息:【hello, message_39】21:06:04.714038400 消费者2........接收到消息:【hello, message_41】21:06:04.916192700 消费者2........接收到消息:【hello, message_43】21:06:05.116286400 消费者2........接收到消息:【hello, message_45】21:06:05.318055100 消费者2........接收到消息:【hello, message_47】21:06:05.520656400 消费者2........接收到消息:【hello, message_49】21:06:05.723106700注意:默认情况下,如果多个消费者绑定到同一个queue且消费者的处理速度并不相同时,消息依旧会均分给每一个消费者,这就可能导致1个消费者空闲、1个消费者忙碌的情况,针对这种情况,可以修改消费者服务的application.yml文件,添加下面的配置
ymlspring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息tex消费者1接收到消息:【hello, message_0】21:12:51.659664200 消费者2........接收到消息:【hello, message_1】21:12:51.680610 消费者1接收到消息:【hello, message_2】21:12:51.703625 消费者1接收到消息:【hello, message_3】21:12:51.724330100 消费者1接收到消息:【hello, message_4】21:12:51.746651100 消费者1接收到消息:【hello, message_5】21:12:51.768401400 消费者1接收到消息:【hello, message_6】21:12:51.790511400 消费者1接收到消息:【hello, message_7】21:12:51.812559800 消费者1接收到消息:【hello, message_8】21:12:51.834500600 消费者1接收到消息:【hello, message_9】21:12:51.857438800 消费者1接收到消息:【hello, message_10】21:12:51.880379600 消费者2........接收到消息:【hello, message_11】21:12:51.899327100 消费者1接收到消息:【hello, message_12】21:12:51.922828400 消费者1接收到消息:【hello, message_13】21:12:51.945617400 消费者1接收到消息:【hello, message_14】21:12:51.968942500 消费者1接收到消息:【hello, message_15】21:12:51.992215400 消费者1接收到消息:【hello, message_16】21:12:52.013325600 消费者1接收到消息:【hello, message_17】21:12:52.035687100 消费者1接收到消息:【hello, message_18】21:12:52.058188 消费者1接收到消息:【hello, message_19】21:12:52.081208400 消费者2........接收到消息:【hello, message_20】21:12:52.103406200 消费者1接收到消息:【hello, message_21】21:12:52.123827300 消费者1接收到消息:【hello, message_22】21:12:52.146165100 消费者1接收到消息:【hello, message_23】21:12:52.168828300 消费者1接收到消息:【hello, message_24】21:12:52.191769500 消费者1接收到消息:【hello, message_25】21:12:52.214839100 消费者1接收到消息:【hello, message_26】21:12:52.238998700 消费者1接收到消息:【hello, message_27】21:12:52.259772600 消费者1接收到消息:【hello, message_28】21:12:52.284131800 消费者2........接收到消息:【hello, message_29】21:12:52.306190600 消费者1接收到消息:【hello, message_30】21:12:52.325315800 消费者1接收到消息:【hello, message_31】21:12:52.347012500 消费者1接收到消息:【hello, message_32】21:12:52.368508600 消费者1接收到消息:【hello, message_33】21:12:52.391785100 消费者1接收到消息:【hello, message_34】21:12:52.416383800 消费者1接收到消息:【hello, message_35】21:12:52.439019 消费者1接收到消息:【hello, message_36】21:12:52.461733900 消费者1接收到消息:【hello, message_37】21:12:52.485990 消费者1接收到消息:【hello, message_38】21:12:52.509219900 消费者2........接收到消息:【hello, message_39】21:12:52.523683400 消费者1接收到消息:【hello, message_40】21:12:52.547412100 消费者1接收到消息:【hello, message_41】21:12:52.571191800 消费者1接收到消息:【hello, message_42】21:12:52.593024600 消费者1接收到消息:【hello, message_43】21:12:52.616731800 消费者1接收到消息:【hello, message_44】21:12:52.640317 消费者1接收到消息:【hello, message_45】21:12:52.663111100 消费者1接收到消息:【hello, message_46】21:12:52.686727 消费者1接收到消息:【hello, message_47】21:12:52.709266500 消费者2........接收到消息:【hello, message_48】21:12:52.725884900 消费者1接收到消息:【hello, message_49】21:12:52.746299900
订阅模型-Fanout交换机
定义:在这种模型中,生产者将消息发送到一个交换机,然后交换机将消息路由到与交换机绑定每一个队列中。多个消费者订阅这些队列并消费消息。

代码示例
java// 生产者 @Test public void testFanoutExchange() { // 交换机名称 String exchangeName = "hmall.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }java// 消费者 // 根据规则,下面两个都能收到消息 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
订阅模型-Direct交换机
定义:在这种模型中,队列和交换机的绑定不再是任意的了,而是相互之间指定一个RoutingKey(路由key),生产者向交换机发送消息时也必须指定RoutingKey,交换机收到消息后,根据RoutingKey将消息重定向到队列

代码示例
java// 生产者 @Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "hmall.direct"; // 消息 String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message); }java// 消费者 // 根据规则,下面两个都能收到消息 @RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
订阅模型-Topic
定义:和Direct一样,都是根据RoutingKey把消息路由到绑定的队列中,但是Topic交换机可以在与队列绑定RoutingKey时使用通配符

通配符规则:
- #:匹配一个或多个词
- *:只匹配一个词
举例:
- item.#:能够匹配 item.spu.insert 或者 item.spu
- item.*:只能匹配 item.spu
代码示例
java// 生产者 @Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "hmall.topic"; // 消息 String message = "喜报!孙悟空大战哥斯拉,胜!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }java// 消费者 // 根据规则,下面两个都能收到消息 @RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
Java代码声明队列和交换机
Fanout示例
java@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String msg){ System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue2"), exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue2(String msg){ System.out.println("消费者1接收到fanout.queue2的消息:【" + msg + "】"); }Direct示例
java@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }Topic示例
java@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
‘
消息转换器
定义:Spring消息发送的方法的消息类型是个Object,而在数据传输过程中,该方法会将发送的消息序列化为字节发送给MQ,接收消息时,会将字节反序列化为Java对象,只不过默认情况下使用的序列化方式是JDK序列化,存在以下问题:
- 数据体积过大
- 有安全漏洞
- 可读性差


解决方式一:配置JSON转换器
在生产者和消费者的服务里引入依赖,如果项目中引入了
spring-boot-starter-web依赖,则无需引入下面依赖xml<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>在生产者和消费者中增加下面配置转换Bean
java@Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
解决方式二:可以每次发送消息时手动序列化对象,消费消息时再反序列化为对象,下面以使用 com.alibaba.fastjson.JSON 工具类为例
java// 生产者 Map messageMap = new HashMap()<String, Object>; messageMap.put("name", "张三"); messageMap.put("age", 18); rabbitTemplate.convertAndSend("hmall.fanout", "", JSON.toJSONString(messageMap));javaimport org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; // 消费者 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(value = "hmall.fanout", type = ExchangeTypes.FANOUT, durable = "true") )) public void listenFanoutQueue1(Message message){ Map<String, Object> messageMap = JSON.parseObject(message.getBody(), Map.class); System.out.println("name: " + messageMap.get("name")); System.out.println("age: " + messageMap.get("age")); }
生产者可靠性
生产者重连:解决连接MQ失败
启用连接MQ失败的重试机制(生产者配置文件)
yamlspring: rabbitmq: connection-timeout: 1s # 设置MQ的连接超时时间 template: retry: enabled: true # 开启超时重试机制 initial-interval: 1000ms # 失败后的初始等待时间 multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier max-attempts: 3 # 最大重试次数注意: SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码
生产者确认:解决消息发送失败
说明:RabbitMQ提供了生产者消息确认机制,包括
Publisher Confirm和Publisher Return
机制 Publisher Confirm Publisher Return 触发条件 消息成功到达交换机(无论是否路由到队列) 消息到达交换机,但是无法路由到任何队列 关注点 消息是否成功投递到交换机 消息是否从交换机正确路由到队列 网络阶段 生产者 → 交换机 交换机 → 队列 数据完整性 确保消息不丢失在网络传输中 防止消息因路由错误而静默丢失 典型场景 网络闪断导致消息未到达MQ 路由键拼写错误或队列未绑定 代码实现
启用消息确认机制(生产者配置文件)
yamlspring: rabbitmq: publisher-confirm-type: correlated # 启用 Confirm(旧版用 publisher-confirms: true) publisher-returns: true # 启用 Return 机制 template: mandatory: true # 强制触发Return(当路由失败时)publisher-confirm-type有三种模式可选:
- none:关闭confirm机制
- simple:同步阻塞等待MQ的回执消息
- correlated:MQ异步回调方式返回回执消息(常用)
编写ConfirmCallback(在每一次消息发送时单独指定)
java// 1. 创建消息内容 String messageContent = "hello"; MessageProperties messageProperties = new MessageProperties(); messageProperties.setMessageId(UUID.randomUUID().toString()); Message message = MessageBuilder.withBody(messageContent.getBytes()) .andProperties(messageProperties) .build(); // 2. 创建CorrelationData并保存必要信息 CorrelationData cd = new CorrelationData(messageProperties.getMessageId()); cd.setExchange("hmall.direct"); cd.setRoutingKey("q"); cd.setMessage(message); // 保存消息用于重发 // 3.给Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { // 3.1.Future发生异常时的处理逻辑,基本不会触发 log.error("send message fail", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { // 3.2.Future接收到回执的处理逻辑,参数中的result就是回执内容 if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执 log.debug("发送消息成功,收到 ack!"); }else{ // result.getReason(),String类型,返回nack时的异常描述 log.error("发送消息失败,收到 nack, reason : {}", result.getReason()); int retryCount = correlationData.getRetryCount() == null ? 0 : correlationData.getRetryCount(); if (retryCount < 3) { correlationData.setRetryCount(retryCount + 1); // 重发消息(使用保存的消息内容) rabbitTemplate.send( correlationData.getExchange(), correlationData.getRoutingKey(), correlationData.getMessage(), // 使用保存的Message对象 correlationData // 使用同一个CorrelationData ); } else { log.error("消息重试超过上限: {}", correlationData); } } } }); // 4.发送消息 rabbitTemplate.convertAndSend("hmall.direct", "q", message, cd);编写ReturnCallback(每个RabbitTemplate只能配置一个ReturnCallback)
javapackage com.itheima.publisher.config; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Slf4j @AllArgsConstructor @Configuration public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error("触发return callback,"); log.debug("exchange: {}", returned.getExchange()); log.debug("routingKey: {}", returned.getRoutingKey()); log.debug("message: {}", returned.getMessage()); log.debug("replyCode: {}", returned.getReplyCode()); log.debug("replyText: {}", returned.getReplyText()); } }); } }
MQ可靠性
数据持久化:保证数据可靠性
说明:为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
交换机持久化
java// 代码实现,指定durable="true" @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT, durable = "true") )) public void listenFanoutQueue1(String msg){ System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】"); }队列持久化
java// 代码实现,指定durable="true" @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1", durable = "true"), exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String msg){ System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】"); }消息持久化:Spring默认的消息就是持久化的

LazyQueue:消息直接入磁盘
说明:从RabbitMQ 3.6.0 版本开始,增加了LazyQueue模式,特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
而在 3.12 版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式
配置方式
java// 代码实现,arguments = @Argument(name = "x-queue-mode", value = "lazy") @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1", arguments = @Argument(name = "x-queue-mode", value = "lazy")), exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT) )) public void listenLazyQueue(String msg){ log.info("接收到 lazy.queue的消息:{}", msg); }
消费者可靠性
消费者确认
说明:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(一般reject用的较少,除非是消息格式有问题,那就是开发问题了)
消息回执的三种处理模式
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除(非常不安全,不建议使用)
yamlspring: rabbitmq: listener: simple: acknowledge-mode: none # 不做处理manual:手动模式。需要自己在业务代码中调用api,发送
ack或nack或reject,存在业务入侵,但更灵活yamlspring: rabbitmq: listener: simple: acknowledge-mode: manual # 手动处理java@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(Message message, Channel channel) throws InterruptedException { log.info("spring 消费者接收到消息:【" + JSON.parseObject(message.getBody(), String.class) + "】"); if (true) { // 手动返回ack channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); }else { // 手动返回nack channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true); } }auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回
ack. 当业务出现异常时,根据异常判断返回不同结果:如果是业务异常,会自动返回
nack;如果是消息处理或校验异常,自动返回
reject。返回Reject的常见异常有:Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:
- o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
- o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
- o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
- o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message<Foo> but Message<Bar> is received.
- java.lang.NoSuchMethodException: Added in version 1.6.3.
- java.lang.ClassCastException: Added in version 1.6.3.
yamlspring: rabbitmq: listener: simple: acknowledge-mode: auto # 自动处理java@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(Message message, Channel channel) throws InterruptedException { log.info("spring 消费者接收到消息:【" + JSON.parseObject(message.getBody(), String.class) + "】"); if (true) { // throw new RuntimeException("故意的"); // 返回nack throw new MessageConversionException("故意的"); // 返回reject } log.info("消息处理完成"); }
失败重试
说明:当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。Spring提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
代码实现
启用消费者失败重试(消费者配置文件)
yamlspring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000ms # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
达到重试次数后依旧失败的处理策略
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(比较优雅的一种处理方案,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理)javapackage com.itheima.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; @Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig { // 定义处理失败消息的交换机 @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } // 定义处理失败消息的队列 @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } // 绑定交换机和队列 @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); } // 定义RepublishMessageRecoverer @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); } }
业务幂等性
说明:为了防止消息被消费多次导致出现业务故障,必须想办法保证消息处理的幂等性
实现方案:唯一消息ID
- 每一条消息都生成一个唯一的ID,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
开启唯一ID功能
以消息转换器为例
java@Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }发送消息后,消息ID就随着消息一起投递到了MQ中

消费者代码示例
java@RabbitListener(queues = "your.queue.name") @Transactional public void handleMessage(Message amqpMessage) { // 获取消息ID String messageId = amqpMessage.getMessageProperties().getMessageId(); if (messageId == null) { // 如果消息没有ID,可以选择拒绝或生成一个ID throw new IllegalArgumentException("Message must have an ID"); } // 查询数据库是否有记录检查消息是否已处理 if () { System.out.println("消息已处理过,跳过: " + messageId); return; } try { // 业务处理... // 保存消息ID到数据库... System.out.println("消息处理成功: " + messageId); } catch (Exception e) { throw e; // 抛出异常触发重试机制 } }
延迟消息
- 定义:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息
死信交换机
死信的定义:当一个队列中的消息满足下列情况之一时,可以成为死信
- 消费者使用
basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
- 消费者使用
死信交换机的定义:如果一个队列中的消息已经成为死信,并且这个队列通过
**dead-letter-exchange**属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机死信交换机实现延迟队列的原理

- 队列simple.queue不绑定消费者
- 队列simple.queue通过dead-letter-exchange属性指定了dlx.direct交换机
- publisher发送消息到simple.queue队列,超时后,由于消息过期且无人消费,成为死信,被投递到dlx.direct这个交换机中
- dlx.direct交换机接收到消息后正常路由到队列,然后被消费者消费,进而实现了延迟某一段时间后被消费者消费的效果