我要Java版本的 轮询和非轮询
时间: 2023-10-15 15:06:55 浏览: 90
好的,下面分别给出 Java 版本的轮询和非轮询消费 RabbitMQ 广播消息的示例代码。
轮询消费:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "logs";
private static final String[] QUEUE_NAMES = {"queue1", "queue2", "queue3"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (String queue : QUEUE_NAMES) {
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, EXCHANGE_NAME, "");
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
for (String queue : QUEUE_NAMES) {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queue, true, deliverCallback, consumerTag -> {});
}
}
}
```
非轮询消费:
```java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "logs";
private static final String[] QUEUE_NAMES = {"queue1", "queue2", "queue3"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (String queue : QUEUE_NAMES) {
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, EXCHANGE_NAME, "");
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
for (String queue : QUEUE_NAMES) {
channel.basicConsume(queue, true, deliverCallback, consumerTag -> {});
}
}
}
```
这两份代码的主要区别在于轮询消费的代码使用了多个消费者去监听不同的队列,而非轮询消费的代码只使用了一个消费者,同时监听多个队列。其他部分都是一样的,都是创建连接、声明 exchange 和 queue,然后使用 `basicConsume` 方法注册一个消费者,指定回调函数来处理收到的消息。
阅读全文