rocketmq 的demo示例
时间: 2023-07-21 10:59:50 浏览: 113
RocketMQ是一个分布式消息中间件,它提供可靠的消息传递和高性能的消息处理能力。下面是一个简单的RocketMQ的demo示例,用于发送和接收消息:
1. 首先,你需要下载并安装RocketMQ,可以从官方网站获取安装包。
2. 启动RocketMQ服务端。进入RocketMQ的安装目录,执行以下命令启动NameServer和Broker:
```
# 启动NameServer
sh mqnamesrv
# 启动Broker
sh mqbroker -n localhost:9876
```
3. 在Java项目中引入RocketMQ的依赖。在项目的pom.xml文件中添加以下依赖:
```xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
```
4. 发送消息的示例代码:
```java
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class RocketMQProducer {
public static void main(String[] args) throws MQClientException {
// 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息实例,指定Topic、Tag和消息内容
Message msg = new Message("topic", "tag", "Hello RocketMQ".getBytes());
// 发送消息并获取发送结果
SendResult sendResult = producer.send(msg);
System.out.println("发送结果:" + sendResult);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
```
5. 接收消息的示例代码:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic和Tag
consumer.subscribe("topic", "tag");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("接收到消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
}
}
```
以上示例代码演示了如何使用RocketMQ发送和接收消息。你可以根据自己的需求进行修改和扩展。希望对你有所帮助!
阅读全文