怎么实现rocketmq生产不同的主题,然后消费
时间: 2023-07-07 09:06:12 浏览: 87
RocketMQ是一个分布式消息中间件,可以通过创建不同的主题(Topic)来实现消息的生产和消费。
首先,要创建一个主题,可以使用RocketMQ提供的命令行工具或者API来创建。以下是使用命令行工具创建主题的示例:
```shell
sh bin/mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t myTopic
```
这将在RocketMQ中创建一个名为“myTopic”的主题。
接下来,就可以使用RocketMQ提供的生产者API来向该主题发送消息,示例如下:
```java
DefaultMQProducer producer = new DefaultMQProducer("example_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("myTopic", "tagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);
System.out.println(result);
producer.shutdown();
```
以上代码将向名为“myTopic”的主题发送一条消息,“tagA”是消息的标签,可以用来过滤消息。在发送消息后,将会得到一个发送结果对象。
最后,可以使用RocketMQ提供的消费者API来从该主题消费消息,示例如下:
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("myTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
```
以上代码将创建一个消费者,并订阅名为“myTopic”的主题。消费者收到消息后,将通过注册的消息监听器来处理消息。在处理完成后,需要返回一个消费状态,表示消息是否被成功消费。
阅读全文