这貌似还是使用了 kafkaconsumer函数 kafkaconsumer函数在不存在topic时 会创建对应的topic 我看官网文档并没有看见相关的配置 可以不创建topic
时间: 2024-05-20 09:14:30 浏览: 10
你说的没错,`kafkaconsumer`函数在不存在topic时会创建对应的topic。如果你不想让`kafkaconsumer`函数自动创建topic,可以通过以下方式进行配置:
1. 在kafka的配置文件(`server.properties`)中,将`auto.create.topics.enable`设置为`false`,这样kafka就不会自动创建不存在的topic了。
2. 如果你使用的是KafkaConsumer API,可以在创建KafkaConsumer实例时,将`auto.offset.reset`设置为`none`,这样当KafkaConsumer订阅不存在的topic时,就会抛出异常而不是自动创建topic。
但是需要注意的是,如果你不想让`kafkaconsumer`函数自动创建topic,并且确保你的程序不会因为订阅不存在的topic而出现异常,那么建议在程序中先手动创建所有需要使用的topic。
相关问题
Python 通过kazoo函数实现连接至kafka获取topic消费的信息 并且在不存在topic时抛出异常
可以使用 `kazoo` 库来连接至 `kafka` 并获取 `topic` 的消费信息。以下是示例代码:
```python
from kazoo.client import KazooClient
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
zk_host = 'localhost:2181'
kafka_host = 'localhost:9092'
topic = 'my-topic'
# Connect to ZooKeeper
zk = KazooClient(hosts=zk_host)
zk.start()
# Check if the topic exists
if not zk.exists('/brokers/topics/{}'.format(topic)):
raise Exception('Topic {} does not exist.'.format(topic))
# Connect to Kafka
try:
consumer = KafkaConsumer(
topic,
bootstrap_servers=kafka_host,
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group'
)
except NoBrokersAvailable:
raise Exception('No brokers available at {}.'.format(kafka_host))
# Consume messages from Kafka
for message in consumer:
print(message.value)
```
在上面的代码中,我们首先连接到 `ZooKeeper`,然后检查是否存在指定的 `topic`。如果 `topic` 不存在,我们就抛出一个异常。接下来,我们尝试连接到 `Kafka`,如果连接失败,同样抛出一个异常。最后,我们使用 `KafkaConsumer` 对象消费消息。
请注意,以上代码仅供参考,实际实现可能因环境和需求而异。
我在用librdkafka的C API消费数据。rd_kafka_topic_partition_list_set_offset 该如何使用,有例子不
很高兴为您解答这个问题!rd_kafka_topic_partition_list_set_offset 函数可以用于设置消费者的偏移量,具体使用方法如下:
```c
rd_kafka_topic_partition_list_t *topics;
rd_kafka_topic_partition_list_new(&topics, 1);
rd_kafka_topic_partition_list_add(topics, "test_topic", 0);
// 设置偏移量为100
rd_kafka_topic_partition_list_set_offset(topics, "test_topic", 0, 100);
// 使用设置的偏移量消费数据
rd_kafka_consume_start(topic, partition, offset);
```
在上面的代码中,我们首先创建了一个包含一个主题和一个分区的 rd_kafka_topic_partition_list_t 对象。然后,我们使用 rd_kafka_topic_partition_list_set_offset 函数将偏移量设置为100。最后,我们使用设置的偏移量消费数据。
希望这个回答能够帮助您解决问题!
相关推荐
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![rar](https://img-home.csdnimg.cn/images/20210720083606.png)
![pdf](https://img-home.csdnimg.cn/images/20210720083512.png)
![-](https://csdnimg.cn/download_wenku/file_type_column_c1.png)
![-](https://csdnimg.cn/download_wenku/file_type_column_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)