java多线程消费kafka
时间: 2023-06-30 11:18:33 浏览: 113
kafka-example:卡夫卡的例子
好的,针对你的问题,我可以给出一些思路和代码示例。使用Java多线程消费Kafka需要借助Kafka的Java客户端API和Java多线程编程的知识。
首先,你需要引入Kafka的Java客户端API依赖,可以在Maven或Gradle中添加如下依赖:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
```
然后,可以编写一个Kafka消费者类,继承自Thread类,用于启动多个线程同时消费Kafka消息。示例代码如下:
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumerThread extends Thread {
private final KafkaConsumer<String, String> consumer;
private final String topic;
public KafkaConsumerThread(String topic, String groupId) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
@Override
public void run() {
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Thread %s, partition %d, offset %d, key %s, value %s%n",
Thread.currentThread().getName(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
public void shutdown() {
consumer.close();
}
}
```
在上述代码中,KafkaConsumerThread类继承自Thread类,用于启动一个消费者线程。在类的构造函数中,我们传入了topic和groupId,用于指定消费的主题和消费者组。在run()方法中,我们使用consumer.poll()方法从Kafka获取消息,并遍历消息进行消费。在消费完成后,我们使用consumer.commitSync()方法手动提交消费位移。在shutdown()方法中,我们关闭了消费者对象。
最后,我们可以在应用程序中启动多个KafkaConsumerThread线程,用于同时消费Kafka消息。示例代码如下:
```java
public static void main(String[] args) {
int numThreads = 3;
String groupId = "test-group";
String topic = "test-topic";
List<KafkaConsumerThread> consumers = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
KafkaConsumerThread consumer = new KafkaConsumerThread(topic, groupId);
consumer.start();
consumers.add(consumer);
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (KafkaConsumerThread consumer : consumers) {
consumer.shutdown();
}
}));
}
```
在上述代码中,我们启动了3个KafkaConsumerThread线程,用于同时消费测试主题test-topic中的消息。我们还注册了一个JVM关闭钩子,用于在应用程序关闭时优雅地关闭消费者线程。
阅读全文