kafka消费者的完整代码
时间: 2023-07-06 15:38:22 浏览: 92
以下是Kafka消费者的完整代码示例,使用Java语言编写:
```java
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.errors.*;
public class KafkaConsumerExample {
private static Scanner in;
private static boolean stop = false;
public static void main(String[] argv)throws Exception{
if (argv.length != 2) {
System.err.printf("Usage: %s <topicName> <groupId>\n",
KafkaConsumerExample.class.getSimpleName());
System.exit(-1);
}
in = new Scanner(System.in);
String topicName = argv[0];
String groupId = argv[1];
ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
consumerRunnable.start();
String line = "";
while (!line.equals("exit")) {
line = in.next();
}
consumerRunnable.getKafkaConsumer().wakeup();
System.out.println("Stopping consumer .....");
consumerRunnable.join();
}
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private KafkaConsumer<String,String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId){
this.topicName = topicName;
this.groupId = groupId;
}
public void run() {
Properties configProperties = new Properties();
configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Collections.singletonList(topicName));
try {
while (!stop) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.value());
}
} catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
} finally{
kafkaConsumer.close();
System.out.println("After closing KafkaConsumer");
}
}
public KafkaConsumer<String,String> getKafkaConsumer(){
return this.kafkaConsumer;
}
}
}
```
请注意,上面的示例代码中使用了Java的多线程技术,因为Kafka消费者需要在后台运行以接收Kafka集群中的消息。
阅读全文