RocketMQ 编程简单示例
时间: 2023-10-30 18:06:17 浏览: 47
以下是一个简单的 RocketMQ 编程示例,在 Java 中实现:
1. 首先,需要引入 RocketMQ 的依赖:
```xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
```
2. 然后,创建一个生产者并发送消息:
```java
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建一个生产者,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息对象,指定主题、标签和消息内容
Message message = new Message("test_topic", "test_tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
producer.send(message);
// 关闭生产者
producer.shutdown();
}
}
```
3. 最后,创建一个消费者并接收消息:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建一个消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
// 指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题和标签
consumer.subscribe("test_topic", "test_tag");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
// 处理消息
for (MessageExt message : messages) {
System.out.printf("Received message: %s%n", new String(message.getBody()));
}
// 返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
```
以上就是一个简单的 RocketMQ 编程示例,可以进一步扩展和优化。