RocketMQ5.1.0消费者监听注解
时间: 2023-08-05 20:08:57 浏览: 185
rocketmq-all-5.1.0-bin-release,rocketmq-console-ng-2.0.0.jar
在RocketMQ 5.1.0版本中,除了使用监听器类来处理消息外,还可以使用注解来标注消息处理方法。具体步骤如下:
1. 添加`@RocketMQMessageListener`注解到消费者类上,指定`consumerGroup`、`topic`和`messageModel`等属性。
2. 在消息处理方法上添加`@RocketMQMessageListener`注解,指定`consumerGroup`、`topic`、`messageModel`和`selectorExpression`等属性。
3. 在消息处理方法的参数中添加`List<MessageExt>`类型的参数,用于接收从broker拉取回来的消息。
下面是一个使用注解的示例:
```java
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueByConfig;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;
import java.util.List;
import com.alibaba.rocketmq.annotation.ConsumeMode;
import com.alibaba.rocketmq.annotation.MessageModel;
import com.alibaba.rocketmq.annotation.RocketMQMessageListener;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
@Service
@RocketMQMessageListener(consumerGroup = "test-group", topic = "test-topic", selectorExpression = "test-tag",
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 64,
consumeThreadMin = 20, messageQueueListener = AllocateMessageQueueAveragely.class, consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET)
public class MyMessageListener implements MessageListenerConcurrently {
@Override
@RocketMQMessageListener(consumerGroup = "test-group", topic = "test-topic", selectorExpression = "test-tag",
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
```
在这个示例中,我们使用了`@RocketMQMessageListener`注解来标注消费者类和消息处理方法。我们指定了消费者组、主题、标签、消息模式、消费模式、消费线程数量、队列分配策略、消费起始位置等属性。在消息处理方法中,我们使用了`ConsumeConcurrentlyContext`参数和`List<MessageExt>`参数来接收消息,并打印了收到的消息。最后,我们返回了`ConsumeConcurrentlyStatus.CONSUME_SUCCESS`实例,表示这个消息已经被成功消费。
阅读全文