Spring Boot RocketMQ integration
Date: 2023-04-24 14:00:18
Spring Boot can be integrated with RocketMQ by using RocketMQ's Java client directly. The steps are as follows:
1. Add the RocketMQ dependency
Add the RocketMQ client dependency to pom.xml:
```
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>
```
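2. Configure RocketMQ
Add the RocketMQ settings to application.properties. These are custom keys read by the classes in the following steps; rocketmq-client does not read them itself: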
```
rocketmq.name-server=127.0.0.1:9876
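rocketmq.producer.group=my-group
# Read by the consumer in step 4; the value is an example
rocketmq.consumer.group=my-consumer-group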
```
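RocketMQ accepts several name server addresses in a single setting, separated by semicolons. The following is a minimal sketch of checking such a value before passing it to the client. `NameServerAddresses` is a hypothetical helper written for this example; it is not part of RocketMQ or Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of RocketMQ or Spring): validates a name
// server list of the form "host:port;host:port".
final class NameServerAddresses {
    private NameServerAddresses() {}

    // Returns the trimmed host:port entries, or throws if any entry is malformed.
    static List<String> parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("name server address is empty");
        }
        List<String> result = new ArrayList<>();
        for (String entry : value.split(";")) {
            String address = entry.trim();
            int colon = address.lastIndexOf(':');
            if (colon <= 0 || colon == address.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + address);
            }
            int port;
            try {
                port = Integer.parseInt(address.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("invalid port in: " + address, e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in: " + address);
            }
            result.add(address);
        }
        return result;
    }
}
```

Calling `NameServerAddresses.parse(...)` on the configured value before `setNamesrvAddr` would turn a typo in the properties file into a startup error instead of a connection timeout later.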
3. Create the RocketMQ producer
Create a producer with RocketMQ's Java client:
```
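import javax.annotation.PostConstruct; // jakarta.annotation on Spring Boot 3+
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class RocketMQProducer {
    @Value("${rocketmq.producer.group}")
    private String producerGroup;
    // Name server address, read from application.properties
    @Value("${rocketmq.name-server}")
    private String nameServer;
    private DefaultMQProducer producer;

    // Create and start the producer once the bean has been initialized
    @PostConstruct
    public void init() throws MQClientException {
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(nameServer);
        producer.start();
    }

    // Send one message synchronously and print the send result
    public void sendMessage(String topic, String message) throws Exception {
        Message msg = new Message(topic, message.getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
    }

    // Release client resources when the application shuts down
    @PreDestroy
    public void destroy() {
        producer.shutdown();
    }
}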
```
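The producer sends the message body as raw bytes, so the producer and the consumer have to agree on a charset. The following is a minimal sketch of keeping that choice in one place, assuming UTF-8 on both sides. `MessageBodyCodec` is a hypothetical helper written for this example; it is not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of RocketMQ): one place that fixes the charset
// used for message bodies on both the sending and the receiving side.
final class MessageBodyCodec {
    private MessageBodyCodec() {}

    // Producer side: text to bytes for the message body.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: message body bytes back to text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

With such a helper, the producer could build its message as `new Message(topic, MessageBodyCodec.encode(message))` and the consumer could read it with `MessageBodyCodec.decode(msg.getBody())`, so neither side depends on the platform default charset.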
4. Create the RocketMQ consumer
Create a push consumer with RocketMQ's Java client:
```
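import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.annotation.PostConstruct; // jakarta.annotation on Spring Boot 3+
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class RocketMQConsumer {
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
    // Name server address, read from application.properties
    @Value("${rocketmq.name-server}")
    private String nameServer;
    private DefaultMQPushConsumer consumer;

    // Subscribe to the topic and start consuming once the bean has been initialized
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe("my-topic", "*"); // "*" matches every tag
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(),
                            new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    // Release client resources when the application shuts down
    @PreDestroy
    public void destroy() {
        consumer.shutdown();
    }
}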
```
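The listener above always returns `CONSUME_SUCCESS`, even if processing a message fails. RocketMQ's `ConsumeConcurrentlyStatus` also defines `RECONSUME_LATER`, which asks the broker to redeliver the batch. The following is a minimal sketch of that decision. So that it compiles without the RocketMQ client, it uses a stand-in `Status` enum and a hypothetical `BatchHandler` class; neither is part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch (not RocketMQ API): decides which status a listener
// should return for a batch of message bodies.
final class BatchHandler {
    // Stand-in for org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    private BatchHandler() {}

    // Runs businessLogic on every body; any runtime failure marks the whole batch for retry.
    static Status handle(List<byte[]> bodies, Consumer<String> businessLogic) {
        try {
            for (byte[] body : bodies) {
                businessLogic.accept(new String(body, StandardCharsets.UTF_8));
            }
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            // In a real listener, RECONSUME_LATER asks the broker to redeliver the batch.
            return Status.RECONSUME_LATER;
        }
    }
}
```

Because a retry redelivers messages that may already have been partly processed, the business logic should tolerate seeing the same message more than once.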
5. Send and receive messages
Wherever a message needs to be sent, inject RocketMQProducer and call its sendMessage method:
```
@Autowired
private RocketMQProducer producer;

public void send() throws Exception {
    producer.sendMessage("my-topic", "Hello, RocketMQ!");
}
```
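On the receiving side, RocketMQConsumer is a Spring bean, so its listener starts together with the application and does not need to be injected anywhere. Start the application (this assumes the two services live in or below the package of Application so that component scanning finds them):
```
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```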
This completes the integration of Spring Boot and RocketMQ.