rocketmq pull 示例代码
时间: 2023-07-03 16:21:14 浏览: 115
rocketmq实例代码
4星 · 用户满意度95%
以下是 RocketMQ 的 pull 示例代码,供您参考:
```java
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class PullConsumer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
// 实例化消息消费者
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("example_group_name");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 启动消费者实例
consumer.start();
// 从指定队列中拉取消息
MessageQueue mq = new MessageQueue("TopicTest", "BrokerA", 1);
long offset = consumer.fetchConsumeOffset(mq, false);
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
// 处理消息
break;
case NO_MATCHED_MSG:
// 没有匹配的消息
break;
case NO_NEW_MSG:
// 没有新的消息
break;
case OFFSET_ILLEGAL:
// 非法的 offset
break;
default:
break;
}
// 关闭消费者实例
consumer.shutdown();
}
}
```
在这个示例中,我们创建了一个 DefaultMQPullConsumer 实例,并指定了 NameServer 地址。然后,我们从指定的消息队列中拉取消息,并处理它们。最后,我们关闭了消费者实例。
阅读全文