java消费kafka程序实例
时间: 2023-12-13 21:34:27 浏览: 88
以下是Java实现Kafka消费者的代码示例:
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
String topicName = "test";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
System.out.println("Subscribed to topic " + topicName);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
}
```
上述代码中,我们使用KafkaConsumer类来创建一个消费者实例。我们需要指定Kafka集群的地址和端口,以及消费者组的ID。然后,我们订阅一个主题,并在一个无限循环中轮询新的消息。当我们收到新的消息时,我们可以从ConsumerRecord对象中获取消息的偏移量、键和值。
阅读全文