写一个rocketmq消费者代码
时间: 2023-07-07 16:16:04 浏览: 122
好的,以下是一个简单的 RocketMQ 消费者代码示例,供参考:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或多个主题,并指定tag来过滤需要消费的消息
consumer.subscribe("topic_name", "tag");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt message : msgs) {
System.out.println(new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.println("Consumer started.");
}
}
```
这个示例中,我们通过 DefaultMQPushConsumer 类创建了一个消费者实例,并设置了 NameServer 的地址、订阅了一个主题和指定的 tag,并注册了一个回调函数来处理消息。在回调函数中,我们遍历收到的消息,并将其输出到控制台。最后,我们启动了消费者实例。
阅读全文