rabbitmq消费端怎么监听myQueue队列 并且做ack消息确认机制 java代码描述
时间: 2023-07-31 13:10:51 浏览: 101
在Java中使用RabbitMQ的消费者端监听队列并实现消息确认机制的代码示例如下:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class RabbitMQConsumer {
private final static String QUEUE_NAME = "myQueue";
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); // RabbitMQ服务器地址
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义消息接收回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received message: " + message);
// 手动确认消息已被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 监听队列并消费消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
// 等待消息处理完成后关闭连接
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}));
}
}
```
这段代码中,首先创建了一个连接工厂(ConnectionFactory),设置RabbitMQ服务器地址,然后创建连接(Connection)和信道(Channel)。接下来,声明了要监听的队列(QUEUE_NAME)。然后,定义了消息接收的回调函数(DeliverCallback),在回调函数中处理接收到的消息,并通过`channel.basicAck`手动确认消息已被消费。最后,通过`channel.basicConsume`方法启动监听队列并消费消息的过程。
请注意,以上代码仅提供了基本的示例,实际应用中还需要考虑异常处理、连接关闭等情况。
阅读全文