创建多个rocketmq消费者 java实现
时间: 2023-08-28 10:06:15 浏览: 107
可以通过创建多个消费者来实现RocketMQ消息的并发处理,以下是Java代码示例:
```java
public class ConsumerGroupExample {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("group1");
consumer1.setNamesrvAddr("localhost:9876");
consumer1.subscribe("Topic1", "Tag1");
DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("group1");
consumer2.setNamesrvAddr("localhost:9876");
consumer2.subscribe("Topic1", "Tag1");
consumer1.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer2.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer1.start();
consumer2.start();
System.out.printf("Consumer Started.%n");
}
}
```
这里创建了两个消费者,并使用相同的消费者组名(group1),它们都订阅了Topic1和Tag1的消息。每个消费者都注册了一个消息监听器,用于处理收到的消息。最后,通过调用start()方法启动消费者。由于两个消费者都属于同一个消费者组,它们将共同消费Topic1和Tag1的消息,并且每个消息只会被一个消费者处理。
阅读全文