写一个类消费rocketmq的消息
时间: 2024-12-14 19:17:17 浏览: 2
在Java中,如果你想要创建一个类来消费RocketMQ消息,你可以使用Apache RocketMQ提供的Java客户端API。以下是一个简单的示例,展示了一个`MessageConsumer`类的大概结构:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.common.message.Message;
public class RocketMQMessageConsumer {
private DefaultMQPushConsumer consumer;
// 初始化消费者
public RocketMQMessageConsumer(String groupName, String consumerName) {
consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr("localhost:9876"); // 设置NMS地址,实际生产环境中替换为你的真实地址
consumer.subscribe("topicTest", "*", new MessageSelector() { // 订阅主题,这里订阅所有消息
@Override
public boolean select(Message msg) {
return true; // 简单的选择策略,实际可以根据需要编写条件
}
});
}
// 消费消息的方法
public void consumeMessage() {
consumer.start(); // 启动消费者
while (true) {
Message message = consumer.poll(500); // 每隔500毫秒拉取消息,可根据业务调整时间间隔
if (message != null) {
System.out.printf("Received message from topic %s: %s%n", message.getTopic(), message.getBody().toString());
// 对收到的消息进行处理...
} else {
break;
}
}
consumer.shutdown(); // 关闭消费者
}
// 示例结束,记得关闭消费者资源
}
阅读全文