spring boot消息队列使用AmqpAdmin创建任意类型的交换器(订阅、通配符、路由),并绑定队列。向队列中发送以自己学号+姓名生成的User对象,以监听的方式接收消息并输出。完整代码 截图:组件定义类、发送消息类、监听类、运行结果
时间: 2025-01-06 13:50:35 浏览: 3
Spring Boot结合AMQP(Advanced Message Queuing Protocol)使用RabbitMQ作为消息队列时,可以利用`AmqpAdmin`来创建交换器和队列,并进行消息的发送和接收。下面是一个简单的示例,包括了创建交换器、绑定队列以及发送和监听用户对象:
首先,在`ApplicationConfig`或类似的配置类中,定义消息队列相关的bean:
```java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.support.Simple AmqpAdmin;
@EnableRabbit
public class RabbitConfig {
private static final String EXCHANGE_NAME = "userExchange";
private static final String QUEUE_NAME = "userQueue";
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
return factory;
}
@Bean
public SimpleAmqpAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {
return new SimpleAmqpAdmin(connectionFactory);
}
@Bean
public Exchange userExchange(SimpleAmqpAdmin admin) {
return admin.declareExchange(ExchangeBuilder.exchangeBuilder()
.withName(EXCHANGE_NAME)
.setType(ExchangeTypes.DIRECT)
.build());
}
@Bean
public Queue userQueue(SimpleAmqpAdmin admin) {
return admin.declareQueue(QUEUE_NAME);
}
@Bean
public Binding binding(UserExchange userExchange, UserQueue userQueue) {
return BindingBuilder.bind(userQueue).to(userExchange).with("#");
}
// 发送消息的 RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setDefaultSerializer(new Jackson2JsonMessageConverter());
return template;
}
// 听取消息的容器
@Bean
public MessageListenerContainer messageListenerContainer(
RabbitTemplate rabbitTemplate,
UserQueue userQueue,
UserMessageConsumer consumer) {
Map<String, Object> map = new HashMap<>();
map.put(MessageListenerContainer.EXCHANGE_NAMES_KEY, userExchange.getName());
map.put(MessageListenerContainer.QUEUE_NAMES_KEY, userQueue.getName());
return new DirectMessageListenerContainer(rabbitTemplate, consumer, map);
}
}
```
接下来,创建一个User类用于表示用户对象:
```java
public class User {
private String studentId;
private String name;
// getters and setters...
}
```
然后,我们创建一个发送消息的类(如`SendUserMessage`):
```java
public class SendUserMessage {
private RabbitTemplate rabbitTemplate;
public void sendMessage(String studentId, String name) {
User user = new User(studentId, name);
rabbitTemplate.convertAndSend(userExchange.getName(), "#" + studentId, user);
}
}
```
最后,创建一个监听类(如`UserMessageConsumer`),用于处理接收到的消息:
```java
@Component
public class UserMessageConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String id = message.getMessageProperties().getRoutingKey().split("#")[0];
User user = (User) message.getBody();
System.out.println("Received message from student " + id + ": " + user.getName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
运行这段代码,你可以通过`SendUserMessage`类实例发送以学生ID和姓名为内容的消息,`UserMessageConsumer`会监听并打印接收到的信息。
相关问题:
1. Spring Boot如何设置RabbitMQ的连接信息?
2. RabbitMQ的Direct、Fanout、Topic和Header四种交换模式分别是什么用途?
3. 如果需要同时处理多个队列,应该如何修改这个例子?
阅读全文