帮我写一个rocketMQ消费者区分topic和group
时间: 2024-05-01 13:18:22 浏览: 163
好的,我来给你写一个简单的RocketMQ消费者示例代码,用于区分不同的Topic和Group:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public class RocketMQConsumer {
private static final String NAME_SERVER_ADDR = "localhost:9876"; // RocketMQ NameServer地址
private static final String CONSUMER_GROUP = "test_consumer_group"; // 消费者组名
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAME_SERVER_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 从最早的消息开始消费
consumer.setMessageModel(MessageModel.CLUSTERING); // 消息模式为集群模式
// 这里可以添加多个订阅的Topic和Group
try {
consumer.subscribe("Topic1", "Group1");
consumer.subscribe("Topic2", "Group2");
consumer.subscribe("Topic3", "Group3");
} catch (MQClientException e) {
e.printStackTrace();
}
// 设置消息监听器,处理消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
try {
consumer.start();
System.out.printf("Consumer Started.%n");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
```
上面的代码中启动了一个RocketMQ消费者,它可以订阅多个Topic和Group,并且可以处理不同的消息。你可以根据自己的实际需求修改代码中的订阅Topic和Group。
注意:在实际使用中,需要根据不同的情况对RocketMQ消费者做进一步的配置和优化。
阅读全文