RocketMQ5.1.0消费者监听接口案例
时间: 2023-08-06 07:02:25 浏览: 97
以下是RocketMQ 5.1.0版本的消费者监听接口案例:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
// 指定NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
```
这个案例中,我们实例化了一个`DefaultMQPushConsumer`对象,并设置了它的名称和NameServer地址。我们还通过`subscribe`方法订阅了`TopicTest`主题和所有的标签。最后,我们通过`registerMessageListener`方法注册了一个实现了`MessageListenerConcurrently`接口的回调类,来处理从broker拉取回来的消息。在这个回调类中,我们打印了收到的消息,并返回了`ConsumeConcurrentlyStatus.CONSUME_SUCCESS`实例,以表示这个消息已经被成功消费。最后,我们启动了消费者实例并打印了一条成功启动的消息。
阅读全文