RabbitMQ-高级篇.pdf
### RabbitMQ 高级篇知识点解析 #### 一、消息可靠性 消息从生产者发出到被消费者接收的过程中,会经历多个阶段,其中任一环节都可能导致消息丢失。常见的情况包括: 1. **发送时丢失**:生产者发送的消息未能到达交换机(Exchange)。 2. **到达交换机后未到达队列**:即使消息到达了交换机,也可能未能成功存储到队列(Queue)中。 3. **MQ服务宕机**:队列中的消息可能因MQ服务故障而丢失。 4. **Consumer端故障**:消费者接收到消息但未能成功处理消息即发生故障。 针对以上问题,RabbitMQ提供了以下解决方案: 1. **生产者确认机制**(Producer Confirm):确保消息能够成功到达交换机。 2. **消息持久化**:使消息能够在队列中持久保存,避免服务重启导致数据丢失。 3. **消费者确认机制**(Consumer Acknowledgment):确认消息已成功处理。 4. **失败重试机制**:对于处理失败的消息进行重试。 #### 二、生产者确认机制详解 RabbitMQ 提供了两种确认机制,分别为`publisher-confirm`和`publisher-return`。 ##### 1.1.1 修改配置 为了启用生产者确认机制,需要在`application.yml`配置文件中添加以下内容: ```yaml spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true ``` - `publisher-confirm-type`: 开启确认模式,支持两种类型: - `simple`: 同步等待确认结果,直到超时。 - `correlated`: 异步回调,定义`ConfirmCallback`接口,在消息处理完成时调用此接口。 - `publisher-returns`: 开启`publisher-return`功能,同样是基于回调机制,定义`ReturnCallback`接口,用于处理消息路由失败的情况。 - `template.mandatory`: 定义消息路由失败时的策略。如果设置为`true`,则会调用`ReturnCallback`接口;如果设置为`false`,则直接丢弃消息。 ##### 1.1.2 定义Return回调 由于每个`RabbitTemplate`实例只能配置一个`ReturnCallback`,因此需要在项目启动时配置: ```java import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.ApplicationContext; public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.error("消息投递失败,replyCode: {}, replyText: {}, exchange: {}, routingKey: {}", replyCode, replyText, exchange, routingKey); }); } } ``` ##### 1.1.3 定义ConfirmCallback 在`SpringAmqpTest`类中定义一个单元测试方法,用于实现`ConfirmCallback`接口,该接口允许在发送消息时指定,以便根据不同业务场景处理确认成功或失败的情况: ```java import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @SpringJUnitConfig(CommonConfig.class) @TestPropertySource(locations = "classpath:application.yml") public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testConfirm() { rabbitTemplate.setConfirmCallback(new ConfirmCallback() { @Override public void handle(Long deliveryTag, boolean multiple, boolean ack) { if (ack) { log.info("消息处理成功,deliveryTag: {}", deliveryTag); } else { log.warn("消息处理失败,deliveryTag: {}", deliveryTag); } } }); rabbitTemplate.convertAndSend("exchange", "routingKey", "test message"); } } ``` #### 三、总结 本文主要介绍了RabbitMQ高级篇中的关键概念之一——消息可靠性及其解决方案。通过详细解析生产者确认机制的配置与实现,展示了如何确保消息在从生产者到消费者的整个传递过程中不会丢失。这对于构建可靠的消息中间件系统至关重要。此外,还探讨了消息持久化、消费者确认机制以及失败重试机制等其他重要的可靠性保障措施。这些技术点对于实际应用中的系统设计与开发具有重要意义。