实际上是生产者调用远程接口,然后把消息存储,然后消费者才获取
时间: 2024-03-21 10:42:28 浏览: 54
dubbox生产者和消费者示例代码(含依赖包).zip
非常抱歉,我理解错了,确实是生产者调用远程接口将消息发送到RocketMQ的Broker节点,然后消费者通过调用远程接口来消费这些消息。下面是一个基于Java语言的RocketMQ生产者、消费者的例子:
1. 生产者调用远程接口发送消息
```java
public class RocketMQProducer {
private static final String NAMESRV_ADDR = "127.0.0.1:9876";
private static final String TOPIC_NAME = "test_topic";
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置Namesrv地址
producer.setNamesrvAddr(NAMESRV_ADDR);
// 启动生产者实例
producer.start();
// 创建消息实例,指定Topic、Tag和消息内容
Message message = new Message(TOPIC_NAME, "TagA", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 调用远程接口发送消息
SendResult sendResult = producer.send(message);
// 输出消息发送结果
System.out.printf("SendResult status:%s, msgId:%s%n", sendResult.getSendStatus(), sendResult.getMsgId());
// 关闭生产者实例
producer.shutdown();
}
}
```
2. 消费者调用远程接口消费消息
```java
public class RocketMQConsumer {
private static final String NAMESRV_ADDR = "127.0.0.1:9876";
private static final String TOPIC_NAME = "test_topic";
private static final String CONSUMER_GROUP = "consumer_group";
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 设置Namesrv地址
consumer.setNamesrvAddr(NAMESRV_ADDR);
// 订阅Topic和Tag
consumer.subscribe(TOPIC_NAME, "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
// 处理消息
for (MessageExt message : messages) {
System.out.printf("Thread Name:%s, Receive New Messages:%s %n", Thread.currentThread().getName(), new String(message.getBody()));
}
// 返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
```
在上面的例子中,生产者调用远程接口发送消息,消费者调用远程接口消费消息。当生产者发送消息时,消息首先被RocketMQ存储在Broker节点上,然后消费者通过调用远程接口来消费这些消息。
阅读全文