java操作kafka
时间: 2023-11-08 07:02:58 浏览: 128
Java操作Kafka需要使用Kafka的Java客户端库,即Kafka Java API。使用该API,可以实现消息的生产和消费。具体步骤如下:
1. 引入Kafka Java API相关依赖
2. 创建Kafka生产者对象Producer和Kafka消费者对象Consumer
3. 配置Kafka Producer和Consumer的属性,如bootstrap.servers、key.serializer、value.serializer等
4. 使用Producer发送消息到指定的topic
5. 使用Consumer消费指定topic中的消息
示例代码:
```java
// 创建Kafka生产者对象
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到指定的topic
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
// 创建Kafka消费者对象
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅指定topic
consumer.subscribe(Collections.singletonList("my-topic"));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); }
}
// 关闭Kafka生产者和消费者
producer.close();
consumer.close();
```
阅读全文