springboot 对接rocketmq
时间: 2023-11-05 15:04:23 浏览: 77
SpringBoot Dubbo RocketMQ订单支付系统.zip
Spring Boot对接RocketMQ可以使用RocketMQ官方提供的Java客户端,即Apache RocketMQ Java Client。以下是Spring Boot对接RocketMQ的步骤:
1. 添加依赖
在Spring Boot项目的pom.xml文件中添加以下依赖:
```xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
```
2. 配置RocketMQ属性
在application.properties文件中添加RocketMQ的配置属性,例如:
```properties
rocketmq.name-server=localhost:9876
```
除了name-server属性,还可以配置producer和consumer的相关属性。
3. 创建Producer
在Spring Boot项目中创建Producer,例如:
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class RocketMQProducer {
@Value("${rocketmq.name-server}")
private String nameServer;
private DefaultMQProducer producer;
@PostConstruct
public void init() throws MQClientException {
producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr(nameServer);
producer.start();
}
public void sendMessage(String topic, String message) throws Exception {
Message msg = new Message(topic, message.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("Send message success. Topic: %s, Message: %s%n", topic, message);
}
@PreDestroy
public void destroy() {
producer.shutdown();
}
}
```
其中,使用@Value注解注入了name-server配置属性,创建了DefaultMQProducer,并在init()方法中启动了Producer。
4. 创建Consumer
在Spring Boot项目中创建Consumer,例如:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class RocketMQConsumer {
@Value("${rocketmq.name-server}")
private String nameServer;
private DefaultMQPushConsumer consumer;
public RocketMQConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("test_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Consume message success. Topic: %s, Message: %s%n", msg.getTopic(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
public void shutdown() {
consumer.shutdown();
}
}
```
其中,使用@Value注解注入了name-server配置属性,创建了DefaultMQPushConsumer,并在构造方法中设置了Consumer的相关属性,包括订阅的topic、MessageListener等,并在最后启动了Consumer。
5. 发送和消费消息
在Spring Boot项目中可以调用RocketMQProducer的sendMessage()方法发送消息,例如:
```java
@Autowired
private RocketMQProducer rocketMQProducer;
public void send() throws Exception {
rocketMQProducer.sendMessage("test_topic", "Hello, RocketMQ!");
}
```
RocketMQConsumer会自动消费订阅的消息。
以上就是Spring Boot对接RocketMQ的基本步骤。需要注意的是,在使用完Producer和Consumer后需要关闭它们,以释放资源。在上面的例子中,使用了@PostConstruct和@PreDestroy注解分别在对象创建和销毁时调用init()和destroy()方法。
阅读全文