Rocketmq消费者代码实现
时间: 2023-09-17 07:10:57 浏览: 173
RocketMQ消费者可以使用Java客户端来实现,以下是一个简单的示例代码:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMQConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 设置消费者从哪里开始消费消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅主题和标签
consumer.subscribe("test_topic", "test_tag");
// 注册消息监听器
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}
```
在此示例代码中,我们创建了一个名为`consumer_group`的消费者实例,并设置了NameServer地址为`localhost:9876`。然后,我们订阅了主题为`test_topic`、标签为`test_tag`的消息,并注册了消息监听器。当消费者接收到消息时,消息监听器会对消息进行处理。
最后,我们启动了消费者并输出了一条日志消息。注意,在实际生产环境中,您需要根据实际情况配置消费者参数,例如重试次数、批量消费等。
阅读全文