rabbitmq 避免重复消费的具体java代码实现
时间: 2023-07-20 17:05:52 浏览: 125
在 RabbitMQ 中,避免重复消费的一种常用方式是使用消息的唯一标识符(message ID)来判断是否已经消费过该消息。具体实现可以参考以下 Java 代码:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
public class Consumer {
private static final String QUEUE_NAME = "my_queue";
private static final Set<String> PROCESSED_MESSAGES = new HashSet<>();
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
String messageId = delivery.getProperties().getMessageId();
if (!PROCESSED_MESSAGES.contains(messageId)) {
// 处理消息
System.out.println("Received message: " + message);
// 将消息 ID 添加到已处理的集合
PROCESSED_MESSAGES.add(messageId);
} else {
System.out.println("Skipping message with ID " + messageId + " because it was already processed");
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
```
在上面的代码中,我们创建了一个集合 `PROCESSED_MESSAGES` 来存储已经处理过的消息的 ID。在消息到达时,我们首先通过 `delivery.getProperties().getMessageId()` 获取消息的 ID,然后检查该 ID 是否已经存在于集合中。如果不存在,则说明该消息还没有被处理过,我们就可以对其进行处理,并将其 ID 添加到集合中。如果已经存在,则说明该消息已经被处理过,我们就可以跳过该消息。
值得注意的是,上面的代码中只是简单地将消息 ID 添加到集合中,而没有考虑如何清理集合。在实际应用中,为了避免集合过大导致内存占用过高,我们需要定期清理集合,例如每天或每周清理一次。
阅读全文