java代码 判断kafka消费完了
时间: 2023-08-02 21:32:48 浏览: 49
Java实现Kafka生产者消费者代码实例
5星 · 资源好评率100%
可以使用KafkaConsumer的`poll`方法来拉取消息,处理完后再手动提交offset,如果提交的offset等于分区最新的offset,则说明这个分区的消息已经全部消费完了。以下是Java代码示例:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 手动提交offset
consumer.commitSync();
// 获取已提交的offset
long committedOffset = consumer.committed(new TopicPartition("test-topic", 0)).offset();
// 获取分区最新的offset
long endOffset = consumer.endOffsets(Arrays.asList(new TopicPartition("test-topic", 0))).get(new TopicPartition("test-topic", 0));
// 判断是否消费完
if (committedOffset >= endOffset) {
System.out.println("Partition test-topic-0 consumed all messages");
break;
}
}
consumer.close();
}
}
```
在代码中,使用`consumer.poll`方法拉取消息,处理完后调用`consumer.commitSync`方法手动提交offset。然后使用`consumer.committed`方法获取已提交的offset,使用`consumer.endOffsets`方法获取分区最新的offset,最后通过比较这两个offset来判断分区的消息是否已经全部消费完了。
阅读全文