启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “itcast-default ”,routing key
为 “#”。
所有发送 exchange 为“itcast-default ” 的MQ消息都会被投递到这个临时队列,并且触发上述的方
法。
1.4 自定义消息通道
Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出
流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。
一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个
订单输入,和订单输出两个 binding。
使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER
如上配置,指定了 destination 为 mqTestOrder 的输入输出流。
1.5 消息分组
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例
的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一
条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理,但是有些业务场景之
下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来
实现这样的功能。
interface OrderProcessor {
String INPUT_ORDER = "inputOrder";
String OUTPUT_ORDER = "outputOrder";
@Input(INPUT_ORDER)
SubscribableChannel inputOrder();
@Output(OUTPUT_ORDER)
MessageChannel outputOrder();
}
spring:
cloud:
stream:
defaultBinder: defaultRabbit
bindings:
inputOrder:
destination: mqTestOrder
outputOrder:
destination: mqTestOrder
评论0