java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
时间: 2024-04-22 19:07:31 浏览: 15
这个异常通常是在 Kafka 消费者应用程序中出现的,它表示消费者没有订阅任何主题或分配任何分区就尝试消费消息。在使用 Kafka 消费者应用程序时,您需要确保在调用 `poll()` 方法之前已经订阅了至少一个主题或者分配了至少一个分区。
您可以使用 `subscribe()` 方法订阅一个主题列表,也可以使用 `assign()` 方法手动分配分区。如果您使用 `subscribe()` 方法,Kafka 会自动为您进行分区分配。以下是一个订阅主题的示例代码:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
```
在上面的代码中,我们使用 `subscribe()` 方法订阅了一个名为 "my-topic" 的主题,并在一个无限循环中调用 `poll()` 方法来获取消息。如果您想手动分配分区,可以使用 `assign()` 方法,如下所示:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Arrays.asList(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
```
在上面的代码中,我们使用 `assign()` 方法手动分配了名为 "my-topic" 的主题的第 0 个分区,并在一个无限循环中调用 `poll()` 方法来获取消息。