rocketmq小案例
时间: 2024-12-27 10:17:59 浏览: 2
### RocketMQ 简单实例教程
#### 1. 准备工作
为了能够顺利运行 RocketMQ 实例,需确认已安装与 RocketMQ 版本相匹配的 JDK 并设置好 `JAVA_HOME` 环境变量[^3]。
#### 2. 下载并解压 RocketMQ
访问官方提供的下载页面获取最新版 RocketMQ 压缩包,并将其放置于服务器上任意位置进行解压缩操作。由于 Java 编程语言具备跨平台特性,因此无需担心操作系统差异带来的影响[^1]。
#### 3. 启动 NameServer 组件
进入 RocketMQ 解压后的根目录,在终端输入如下指令启动名称服务:
```bash
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
```
上述命令将会以后台进程的方式启动 NameServer,并实时监控其日志输出以便观察启动状态。
#### 4. 启动 Broker 组件
同样位于 RocketMQ 根目录下执行以下 Shell 脚本来开启消息代理节点:
```bash
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
```
此步骤中的 `-n` 参数指定了之前启动好的 NameServer 地址;而后面的命令用于追踪 Broker 日志变化情况以验证是否成功上线。
#### 5. 创建生产者应用程序
下面展示一段基于 Spring Boot 构建的消息发送端代码片段:
```java
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
@Service
public class ProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message){
this.rocketMQTemplate.convertAndSend(topic,message);
}
}
```
这段代码展示了如何利用 `RocketMQTemplate` 来封装消息发送逻辑,其中 `convertAndSend()` 方法负责将指定主题下的字符串形式的消息推送到目标队列中去[^2]。
#### 6. 创建消费者应用程序
接着编写接收方的服务类实现自动消费功能:
```java
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import java.util.List;
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer-group")
@Service
public class ConsumerService implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
```
此处定义了一个监听器接口的具体实现方式,每当有新数据到达时便会触发相应处理流程并将具体内容打印出来显示给用户查看[^4]。
阅读全文