实际开发中 如何设置rabbitmq的消息发送确认回调?
时间: 2024-03-21 11:42:39 浏览: 138
springboot整合rabbitmq,开启手工确认。保证消息100%投递
在 RabbitMQ 中,可以通过设置消息确认机制来保证消息的可靠性。消息确认机制主要包括两个方面:生产者消息确认和消费者消息确认。
生产者消息确认是指生产者发送消息到 Broker 后,通过消息确认机制获取消息是否成功发送到 Broker 的确认信息。在实际开发中,可以通过设置 ConfirmCallback 回调函数来处理生产者消息确认的结果。
下面是一个示例代码:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String QUEUE_NAME = "my_queue";
private static Connection connection;
private static Channel channel;
public static void main(String[] args) throws IOException, TimeoutException {
// 连接 RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启消息确认模式
channel.confirmSelect();
// 设置消息确认回调
channel.addConfirmListener(new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("Messages have been confirmed up to " + deliveryTag);
} else {
System.out.println("Message has been confirmed: " + deliveryTag);
}
}
});
// 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Sent message: " + message);
// 关闭连接
channel.close();
connection.close();
}
}
```
在上述代码中,我们使用了 channel.confirmSelect() 方法开启消息确认模式,并通过 channel.addConfirmListener() 方法设置了消息确认回调函数 ConfirmCallback。当消息被确认发送到 Broker 后,ConfirmCallback 回调函数会被调用,可以处理消息发送结果。在回调函数中,deliveryTag 表示消息的唯一标识,multiple 表示是否是批量确认。
希望这个回答能够帮到您!
阅读全文