springboot kafka 消费者代码
时间: 2023-07-23 12:39:31 浏览: 127
以下是一个简单的 Spring Boot Kafka 消费者代码示例:
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "myTopic")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
```
在上述代码中,`@KafkaListener` 注解指定了要监听的 Kafka 主题名为 "myTopic",当消费者收到消息时,会自动调用 `consume` 方法,将收到的消息作为参数传入。您可以根据自己的需求修改方法的参数类型和逻辑。
相关问题
springboot kafka 消费者
如何实现自动提交偏移量?
在 Spring Boot 中,可以通过配置属性来实现自动提交偏移量。具体来说,可以设置属性 spring.kafka.consumer.enable-auto-commit 为 true,表示开启自动提交偏移量。同时,还需要设置属性 spring.kafka.consumer.auto-commit-interval-ms 来指定自动提交的时间间隔。例如,可以将该属性设置为 5000,表示每隔 5 秒自动提交一次偏移量。当然,如果你想手动提交偏移量,也可以将该属性设置为 false,然后在代码中手动提交偏移量。
springboot kafkalistener 批量消费消息代码实例
Spring Boot 和 Kafka Listeners 配合可以方便地处理批量消费消息。以下是一个简单的示例代码片段,它使用 Spring Cloud Stream 的 `KafkaTemplate` 实现批量消费:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class BatchMessageConsumer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 这里的 topic 可以替换为你实际的消息主题名
private static final String TOPIC = "your-topic";
@KafkaListener(topics = "${kafka.topic}", groupId = "group-id", consumerProperties = {
"spring.kafka.batch.size=100" // 设置批处理大小,默认为1
})
public void consumeMessages(String[] messages) {
if (messages != null && !messages.isEmpty()) {
for (String message : messages) {
System.out.println("Received message: " + message);
// 这里你可以进一步处理消息,例如存储到数据库、更新状态等
}
} else {
System.out.println("No messages available in this batch.");
}
// 执行完批处理后,自动发送确认,以便Kafka处理队列
kafkaTemplate.send(TOPIC, "Acknowledged");
}
}
```
在这个例子中,`@KafkaListener`注解配置了监听的主题 (`topics`)、消费者组 (`groupId`) 和设置批处理大小 (`batch.size`)。当有消息到达指定主题时,`consumeMessages`方法会被调用,并接收一个包含多个消息的字符串数组。每批接收到的消息都会逐条处理并发送确认。
阅读全文