kafka的topic在代码中如何定义
时间: 2024-12-24 11:34:36 浏览: 4
kafka监控获取指定topic的消息总量示例
3星 · 编辑精心推荐
在Kafka中,Topic是一种消息队列的概念,用于存储和组织生产者发送的数据流。在Java中,通常通过`org.apache.kafka.clients.producer.KafkaProducer`或者`org.apache.kafka.clients.consumer.ConsumerRecord`等API来操作Topic。以下是一个简单的例子:
**生产者创建Topic:**
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", Serdes.String().serializer());
props.put("value.serializer", Serdes.String().serializer());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 定义Topic名
String topicName = "my-topic";
producer.send(new ProducerRecord<>(topicName, "key", "value")); // 生产一条消息
producer.close(); // 关闭连接
```
**消费者订阅Topic:**
```java
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group"); // 消费组标识
consumerProps.put("key.deserializer", Serdes.String().deserializer());
consumerProps.put("value.deserializer", Serdes.String().deserializer());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList(topicName)); // 订阅Topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.close();
```
这里我们通过配置属性来指定Topic名称,然后创建生产者或消费者的实例,并分别调用对应的方法来定义或订阅Topic。
阅读全文