接受消息的代码demo
时间: 2023-12-28 22:05:08 浏览: 25
当然,这里是一个简单的 RocketMQ Java 代码示例,用于接收消息:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) {
// 实例化消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("your_namesrv_address");
try {
// 订阅主题和标签
consumer.subscribe("your_topic", "your_tag");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
for (MessageExt message : list) {
// 处理接收到的消息
System.out.println("接收到消息:" + new String(message.getBody()));
}
// 返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动,等待接收消息...");
// 阻塞主线程,保持消费者运行
Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者
consumer.shutdown();
}
}
}
```
请根据实际情况替换代码中的 `your_consumer_group` 和 `your_namesrv_address`,以及根据需要修改订阅的主题和标签。这个示例展示了如何使用 RocketMQ 的 Java 客户端接收消息,并通过注册消息监听器来处理接收到的消息。