java 代码实现activemq接收消息
时间: 2024-09-17 14:03:29 浏览: 36
springboot集成activemq实现消息接收demo
在Java中使用ActiveMQ接收消息通常涉及以下几个步骤。首先,你需要添加相关的依赖,比如Apache ActiveMQ的AMQSTools库。以下是基本的消费者示例:
```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.ExecutorService;
public class AMQJMSConsumer {
private static final String QUEUE_NAME = "YOUR_QUEUE_NAME";
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
// 获取连接
Connection connection = connectionFactory.createConnection();
connection.start();
// 开启事务式Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建消息消费者
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
System.out.println("【消费者】开始监听:" + QUEUE_NAME);
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000); // 设置超时时间,单位毫秒
if (message != null) {
executorService.submit(() -> processMessage(message.getText()));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭资源
if (executorService != null) {
executorService.shutdownNow();
}
}
}
private static void processMessage(String message) {
// 处理接收到的消息
System.out.println("Received message: " + message);
}
}
```
这个例子中,创建了一个单线程的ExecutorService来异步处理接收到的消息。`processMessage`方法是你自定义的实际业务逻辑,将接收到的文本消息内容打印出来。
阅读全文