相关代码怎么写
时间: 2023-07-10 17:19:03 浏览: 82
好的,以下是一个简单的示例代码,仅供参考。该代码实现了同时消费多个topic,并且使用线程池来实现并发消费。
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiTopicConsumer {
private final int numThreads; // 消费线程数
private final String groupId; // 消费组ID
private final List<String> topics; // 需要消费的topic
private ExecutorService executor; // 线程池
private KafkaConsumer<String, String> consumer; // 消费者实例
public MultiTopicConsumer(int numThreads, String groupId, List<String> topics, Properties props) {
this.numThreads = numThreads;
this.groupId = groupId;
this.topics = topics;
this.executor = Executors.newFixedThreadPool(numThreads);
this.consumer = new KafkaConsumer<>(props);
}
public void consume() {
try {
consumer.subscribe(topics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 在Rebalance之前提交offset
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 从上次提交的offset开始消费
consumer.seekToEnd(partitions);
}
});
for (int i = 0; i < numThreads; i++) {
executor.submit(new ConsumerTask(consumer));
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static class ConsumerTask implements Runnable {
private final KafkaConsumer<String, String> consumer;
public ConsumerTask(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
// 同步提交offset
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
MultiTopicConsumer consumer = new MultiTopicConsumer(3, "test-group", topics, props);
consumer.consume();
}
}
```
在这个示例代码中,我们使用了Kafka的Java客户端API来连接Kafka,创建消费者并订阅多个topic。我们使用了线程池技术来实现并发消费。每个消费线程都运行在一个ConsumerTask实例中,该实例使用KafkaConsumer.poll()方法获取消息并处理消息。在处理完消息后,我们使用KafkaConsumer.commitSync()方法同步提交offset。
另外需要注意的是,我们在Rebalance之前提交offset,避免在Rebalance之后消费者重复消费消息。在Rebalance之后,我们从上次提交的offset开始消费。这样可以保证消费者不会漏掉消息。
希望这个示例代码能够帮助您理解如何使用Java多线程消费Kafka多个topic。
阅读全文