mq消费者的一直监听是怎么做的
时间: 2024-02-12 07:05:03 浏览: 86
mq使用入门案例demo
MQ 消费者的一直监听可以通过多线程以及阻塞式的 API 来实现。
一种常见的实现方式是使用线程池来创建多个消费者线程,每个线程都通过阻塞式的 API 从 MQ 中获取消息,并处理消息逻辑。这样,消费者就可以一直监听 MQ 中是否有消息到达,而不会阻塞主线程。
下面是一个示例代码,使用了 JMS API 来实现 MQ 消费者的一直监听:
```java
public class MessageListener implements Runnable {
private static final String QUEUE_NAME = "testQueue";
private static final int THREAD_POOL_SIZE = 10;
private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Destination destination;
private MessageConsumer consumer;
public MessageListener(ConnectionFactory connectionFactory) throws JMSException {
this.connectionFactory = connectionFactory;
this.connection = connectionFactory.createConnection();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.destination = session.createQueue(QUEUE_NAME);
this.consumer = session.createConsumer(destination);
}
@Override
public void run() {
try {
connection.start();
while (true) {
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message: " + textMessage.getText());
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Executor executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
for (int i = 0; i < THREAD_POOL_SIZE; i++) {
executor.execute(new MessageListener(connectionFactory));
}
}
}
```
在上面的代码中,使用了 ActiveMQ API 来实现了一个 MQ 消费者线程,通过多线程以及阻塞式的 API,消费者可以一直监听 MQ 中是否有消息到达。需要注意的是,上面的代码使用了线程池来创建多个消费者线程,以充分利用系统资源。
阅读全文