在不同方法中kafka消费不同topic
时间: 2023-05-30 16:06:07 浏览: 136
可以使用多线程的方式,在每个线程中消费不同的topic。具体做法如下:
1. 创建多个消费者对象,每个消费者对象消费不同的topic。
2. 将每个消费者对象放入一个线程中,并启动线程。
3. 在每个线程中,使用消费者对象消费对应的topic。
示例代码:
```java
public class KafkaConsumerDemo {
private static final String TOPIC_1 = "topic1";
private static final String TOPIC_2 = "topic2";
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(new ConsumerThread(TOPIC_1));
executor.submit(new ConsumerThread(TOPIC_2));
}
private static class ConsumerThread implements Runnable {
private final String topic;
public ConsumerThread(String topic) {
this.topic = topic;
}
@Override
public void run() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
}
```
上面的代码中,创建了两个消费者线程,一个消费topic1,一个消费topic2。在每个线程中,创建了一个KafkaConsumer对象,用于消费对应的topic。在循环中,使用poll方法获取消息,并处理消息。
阅读全文