java消费kafka代码
时间: 2023-02-21 18:35:35 浏览: 134
下面是一个简单的 Java 代码,用于从 Apache Kafka 中消费数据:
```
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Key: " + record.key() + ", Value: " + record.value());
}
}
}
}
```
在这段代码中,我们创建了一个 `KafkaConsumer` 对象,并使用 `subscribe` 方法订阅了一个名为 "test-topic" 的主题。然后,我们在一个死循环中不断调用 `poll` 方法,以从 Kafka 服务器中获取消息。每当收到消息时,我们都会打印出消息的键和值。
请注意,上面的代码仅仅是一个简单的示例,它不适用于生产环境。在实际的应用程序中,您可能需要处理更多的错误处理和异常处理
阅读全文