写一下kafka单个topic多张表多线程顺序消费代码
时间: 2023-08-13 19:21:05 浏览: 56
以下是一个简单的 Kafka 单个 topic 多张表多线程顺序消费的代码示例:
```java
public class KafkaConsumerThread implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
public KafkaConsumerThread(String groupId, List<String> topics) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
this.topics = topics;
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
// 处理消息
processMessage(record);
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
}
private void processMessage(ConsumerRecord<String, String> record) {
// 根据不同的 topic 和 partition 处理消息,可以通过 record.topic() 和 record.partition() 获取
// 如果存在多张表,可以根据消息的 key 或者 value 进行判断和处理
// 如果需要多线程顺序消费,可以将消息存入阻塞队列,让多个消费者线程从队列中取出消息并处理
}
}
```
使用方式:
```java
public static void main(String[] args) {
List<String> topics = Arrays.asList("topic1", "topic2", "topic3");
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 3; i++) {
threads.add(new Thread(new KafkaConsumerThread("group1", topics)));
}
for (Thread thread : threads) {
thread.start();
}
}
```
这里创建了 3 个消费者线程,每个线程都订阅了多个 topic,可以根据需要修改。在 `processMessage` 方法中可以根据消息的 key 或者 value 进行判断和处理,如果需要多线程顺序消费,可以将消息存入阻塞队列,让多个消费者线程从队列中取出消息并处理。