不是订阅mqtt,是要订阅kafka的topic
时间: 2023-05-30 14:06:16 浏览: 93
对不起,我理解有误。要订阅Kafka的topic,需要使用Kafka的消费者API。以下是一个示例代码,它使用Kafka的Java消费者API来订阅一个topic并消费消息。
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
String topic = "my-topic"; // 要订阅的topic名称
String group = "my-group"; // 消费者组名称
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka的broker地址
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic)); // 订阅topic
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); // 从Kafka中拉取消息
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
}
}
```
在这个示例中,我们使用了Kafka的Java消费者API来创建一个消费者,并订阅了一个名为"my-topic"的topic。然后我们通过轮询方式从Kafka中拉取消息,并遍历这些消息并打印出来。请注意,Kafka的消费者API还提供了其他的一些配置选项,例如自动提交偏移量、手动提交偏移量、消费者偏移量重置等。根据你的实际需求,可以选择相应的配置选项。
阅读全文