java监听kafka获取数据
时间: 2023-07-06 15:39:40 浏览: 121
2、java调用kafka api
要在Java中监听Kafka并获取数据,可以使用Kafka的Java客户端库。以下是一个基本的示例代码:
```java
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.*;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
String topicName = "my-topic";
String groupId = "my-group";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
```
在此示例中,我们使用了`KafkaConsumer`类来订阅`my-topic`主题,并使用`poll()`方法从Kafka集群获取消息。一旦收到记录,我们可以从`ConsumerRecord`对象中获取偏移量、键和值,并在控制台中打印它们。
注意,这只是一个基本示例。在生产环境中,您需要考虑许多其他因素,例如如何处理异常、如何将消费者组配置为适当的大小等。
阅读全文