没有合适的资源?快使用搜索试试~ 我知道了~
首页springBoot整合RocketMQ及坑的示例代码
springBoot整合RocketMQ及坑的示例代码
763 浏览量
更新于2023-05-23
评论
收藏 443KB PDF 举报
主要介绍了springBoot整合RocketMQ及坑的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
资源详情
资源评论
资源推荐

springBoot整合整合RocketMQ及坑的示例代码及坑的示例代码
主要介绍了springBoot整合RocketMQ及坑的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
版本:
JDK:1.8
springBoot:1.5.10
rocketMQ:4.2.0
pom 配置:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
</parent>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
application.properties 配置:
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876
java代码:
生产者
package test.config.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;
import javax.annotation.PostConstruct;
@Component
public class RocketMQClient {
/**
* 生产者的组名
*/
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQProducer() {
//生产者的组名
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
try {
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
//创建一个消息实例,包含 topic、tag 和 消息体
//如下:topic 为 "TopicTest",tag 为 "push"
Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
StopWatch stop = new StopWatch();
stop.start();
for (int i = 0; i < 1; i++) {
SendResult result = producer.send(message);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
stop.stop();
System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
消费者:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
















安全验证
文档复制为VIP权益,开通VIP直接复制

评论0