kafka内置函数subscribe
时间: 2023-10-02 14:07:58 浏览: 134
Kafka内置函数subscribe用于订阅一个或多个主题(topic)的消息。它是Kafka消费者API中的一个方法,用于告诉消费者要从哪些主题中接收消息。
在Java客户端中,使用subscribe函数可以订阅一个或多个主题。例如:
```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅单个主题
consumer.subscribe(Collections.singleton("my-topic"));
// 订阅多个主题
// consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// 处理接收到的消息
}
}
}
```
在上面的例子中,我们使用了KafkaConsumer类的subscribe方法,将"my-topic"作为参数传递给subscribe函数,以订阅名为"my-topic"的主题的消息。
你也可以使用subscribe方法订阅多个主题,只需要将主题名以集合(List)的形式传递给subscribe函数即可。
需要注意的是,subscribe函数是非阻塞的,它会立即返回并将订阅请求发送到Kafka集群,然后消费者将开始接收订阅的主题上的消息。
阅读全文