帮我写一个java版本的rabbitmq多线程的消费端处理
时间: 2024-05-05 22:17:07 浏览: 227
以下是一个简单的Java版本的RabbitMQ多线程消费端处理的代码示例:
```java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
public class RabbitmqConsumerMultiThread {
private static final String QUEUE_NAME = "my_queue";
private static final String EXCHANGE_NAME = "my_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
int numThreads = 5; //设置线程数
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
final int threadNum = i;
Runnable task = new Runnable() {
@Override
public void run() {
try {
Channel channel = connection.createChannel();
channel.basicQos(1); //一次只处理一条消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Thread " + threadNum + " received message: " + message);
try {
Thread.sleep(1000); //模拟处理消息的时间
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.basicAck(envelope.getDeliveryTag(), false); //确认消息已处理
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}
};
executorService.submit(task);
}
}
}
```
这个代码示例中,我们首先创建了一个RabbitMQ的连接和通道,然后声明了一个队列和一个交换机,并将队列绑定到交换机上。
接着,我们使用Java的Executor框架创建了一个线程池,其中包含了5个线程。每个线程都会创建一个新的通道,并使用basicConsume方法注册一个消费者。
消费者处理消息的方式与普通的消费者相同,但是我们在处理消息之前调用了basicQos方法,限制了每次只处理一条消息。这是为了避免某个线程一次性处理太多的消息,导致其他线程空闲的情况发生。
最后,我们将每个线程的任务提交到线程池中,并等待任务完成。每个线程会一直循环等待新的消息到达,直到程序被关闭。
阅读全文