使用consumer.poll写一段在java中kafka消费代码,要避免数据漏消费
时间: 2024-11-09 19:17:35 浏览: 7
Java实现Kafka生产者消费者代码实例
5星 · 资源好评率100%
在Java中使用Kafka消费者`Consumer.poll()`来处理消息,为了避免数据漏消费,你需要设置合适的消费组和超时时间,并且确保消费者的运行状态良好。下面是一个基本示例:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerExample {
private final String bootstrapServers = "localhost:9092"; // 替换为你服务器的实际地址
private final String groupId = "my-consumer-group"; // 定义消费组名
private final String topicName = "my-topic"; // 指定你要消费的主题
public void consumeMessages() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true"); // 自动提交偏移量
props.put("auto.commit.interval.ms", "5000"); // 提交间隔,单位毫秒
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); // 设置最大等待时间(这里是100ms)
for (ConsumerRecord<String, String> record : records) {
// 处理接收到的消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// 可能需要确认消费(这里仅作演示,实际生产环境中可能会有业务处理逻辑)
consumer.commitSync();
}
}
} finally {
consumer.close();
}
}
public static void main(String[] args) {
KafkaConsumerExample consumer = new KafkaConsumerExample();
consumer.consumeMessages();
}
}
```
在这个例子中,我们设置了自动提交偏移量并定期检查新消息。如果程序中断,Kafka会自动从上一次消费的位置继续。但是,如果你担心网络延迟或其他异常导致消息丢失,可以考虑增加重试机制或者监控系统的可靠性。
阅读全文