springboot kafkalistener 批量消费消息代码实例
时间: 2024-10-22 14:06:22 浏览: 31
Spring Boot 和 Kafka Listeners 配合可以方便地处理批量消费消息。以下是一个简单的示例代码片段,它使用 Spring Cloud Stream 的 `KafkaTemplate` 实现批量消费:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class BatchMessageConsumer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 这里的 topic 可以替换为你实际的消息主题名
private static final String TOPIC = "your-topic";
@KafkaListener(topics = "${kafka.topic}", groupId = "group-id", consumerProperties = {
"spring.kafka.batch.size=100" // 设置批处理大小,默认为1
})
public void consumeMessages(String[] messages) {
if (messages != null && !messages.isEmpty()) {
for (String message : messages) {
System.out.println("Received message: " + message);
// 这里你可以进一步处理消息,例如存储到数据库、更新状态等
}
} else {
System.out.println("No messages available in this batch.");
}
// 执行完批处理后,自动发送确认,以便Kafka处理队列
kafkaTemplate.send(TOPIC, "Acknowledged");
}
}
```
在这个例子中,`@KafkaListener`注解配置了监听的主题 (`topics`)、消费者组 (`groupId`) 和设置批处理大小 (`batch.size`)。当有消息到达指定主题时,`consumeMessages`方法会被调用,并接收一个包含多个消息的字符串数组。每批接收到的消息都会逐条处理并发送确认。
阅读全文