springboot整合rocketmq,消费者接收到消息,怎么让消费者进行手动确认,而不让mq继续发送,请写出代码示例
时间: 2023-09-15 17:19:43 浏览: 444
好的,以下是一个简单的 Spring Boot 整合 RocketMQ 的消费者配置示例,其中包含了手动确认消息的代码:
```java
@Configuration
public class RocketMQConsumerConfig {
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.namesrvAddr}")
private String namesrvAddr;
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe("TopicTest", "*"); // 订阅主题和标签,*代表所有标签
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 从最开始的消息开始消费
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 消费消息
System.out.printf("Received message: %s %n", new String(msg.getBody()));
// 手动确认消息
consumer.sendMessageBack(msg, ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
} catch (Exception e) {
e.printStackTrace();
// 消费失败,进行重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
return consumer;
}
}
```
在上面的代码中,我们将消费者的消费方式设置为 `ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET`,从最开始的消息开始消费。然后在 `registerMessageListener` 方法中,我们可以获取到消息列表 `msgs`,对每一条消息进行消费处理,并且在消费成功后,使用 `consumer.sendMessageBack(msg, ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name())` 方法手动确认消息。如果消费失败,我们可以返回 `ConsumeConcurrentlyStatus.RECONSUME_LATER`,这样消息将会重新进入队列进行重新消费。
阅读全文