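Kafka consumer: code for consuming messages from multiple topics
Date: 2023-03-29 15:03:18
You can consume messages from multiple topics with the Kafka Consumer API. The following C implementation uses librdkafka: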
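```c
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

/* Cleared by the signal handler so the poll loop can exit cleanly. */
static volatile sig_atomic_t run = 1;

static void stop(int sig) {
    (void)sig;
    run = 0;
}

int main(int argc, char **argv) {
    rd_kafka_t *rk;                              /* Kafka consumer instance handle */
    rd_kafka_conf_t *conf;                       /* Temporary configuration object */
    char errstr[512];                            /* librdkafka API error reporting buffer */
    char *brokers;                               /* Kafka broker(s) */
    char *topics;                                /* Comma-separated topics to consume from */
    char *topic;                                 /* Current topic name while parsing */
    rd_kafka_topic_partition_list_t *topic_list; /* List of topics to subscribe to */
    rd_kafka_resp_err_t err;                     /* librdkafka API error code */

    /* Check arguments */
    if (argc != 3) {
        fprintf(stderr, "Usage: %s <broker> <topic1,topic2,...>\n", argv[0]);
        exit(1);
    }
    brokers = argv[1];
    topics = argv[2];

    /* Create Kafka client configuration place-holder */
    conf = rd_kafka_conf_new();

    /* Set bootstrap broker(s) as a comma-separated list of
     * host or host:port (default port is 9092).
     * librdkafka will use the bootstrap brokers to acquire the full
     * set of brokers from the cluster. */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        exit(1);
    }

    /* rd_kafka_subscribe() requires a consumer group. The group name below
     * is only a placeholder; replace it with your application's group. */
    if (rd_kafka_conf_set(conf, "group.id", "example-group", errstr,
                          sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        exit(1);
    }

    /* Create Kafka consumer instance (takes ownership of conf on success) */
    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "Failed to create Kafka consumer: %s\n", errstr);
        exit(1);
    }

    /* Forward all events to the consumer queue served by rd_kafka_consumer_poll() */
    rd_kafka_poll_set_consumer(rk);

    /* Create topic list (it grows automatically as topics are added) */
    topic_list = rd_kafka_topic_partition_list_new(1);
    if (!topic_list) {
        fprintf(stderr, "Failed to create topic list\n");
        exit(1);
    }

    /* Parse topic list: split on commas and add each topic separately.
     * RD_KAFKA_PARTITION_UA lets the group coordinator assign partitions. */
    for (topic = strtok(topics, ","); topic; topic = strtok(NULL, ","))
        rd_kafka_topic_partition_list_add(topic_list, topic, RD_KAFKA_PARTITION_UA);
    if (topic_list->cnt == 0) {
        fprintf(stderr, "Failed to parse topic list: no topics given\n");
        exit(1);
    }

    /* Subscribe to topic list */
    err = rd_kafka_subscribe(rk, topic_list);
    if (err) {
        fprintf(stderr, "Failed to subscribe to topic list: %s\n", rd_kafka_err2str(err));
        exit(1);
    }

    /* Stop consuming on Ctrl-C so the cleanup code below is reached */
    signal(SIGINT, stop);

    /* Consume messages */
    while (run) {
        /* Poll for new messages, waiting up to 100 ms */
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
        if (!msg)
            continue; /* Timed out: nothing to report */

        if (msg->err) {
            /* The returned object carries an error or event, not a message */
            fprintf(stderr, "Consumer error: %s\n", rd_kafka_message_errstr(msg));
            rd_kafka_message_destroy(msg);
            continue;
        }

        /* Print message */
        printf("Received message on topic %s (partition %" PRId32 ") at offset %" PRId64 ":\n",
               rd_kafka_topic_name(msg->rkt), msg->partition, msg->offset);
        printf("%.*s\n", (int)msg->len, (const char *)msg->payload);

        /* Free message */
        rd_kafka_message_destroy(msg);
    }

    /* Leave the consumer group and commit final offsets */
    rd_kafka_consumer_close(rk);
    /* Destroy topic list */
    rd_kafka_topic_partition_list_destroy(topic_list);
    /* Destroy Kafka consumer instance */
    rd_kafka_destroy(rk);
    return 0;
}
```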
The code above consumes messages from multiple topics. It works as follows:
1. Create the Kafka client configuration object.
2. Set the bootstrap broker(s).
3. Create the Kafka consumer instance.
4. Create the topic list.
5. Parse the topic list.
6. Subscribe to the topic list.
7. Consume messages.
8. Destroy the topic list and the Kafka consumer instance.
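Step 5 is the easiest one to get wrong, because the topics arrive as a single comma-separated string while a subscription list expects one entry per topic. The following is a minimal sketch of that splitting on its own, without any librdkafka dependency. `split_topics` is a hypothetical helper name introduced here for illustration, not a librdkafka function.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper (not part of librdkafka): splits a comma-separated
 * topic string in place and stores a pointer to each non-empty name in
 * out[]. Returns the number of names stored, which is at most max.
 * The input buffer is modified: each comma is overwritten with '\0'. */
size_t split_topics(char *buf, char **out, size_t max) {
    size_t n = 0;
    char *tok;

    assert(buf != NULL && out != NULL);

    /* strtok() skips empty fields, so "a,,b" yields "a" and "b". */
    for (tok = strtok(buf, ","); tok != NULL && n < max; tok = strtok(NULL, ","))
        out[n++] = tok;
    return n;
}
```

Each name returned this way could then be passed to `rd_kafka_topic_partition_list_add()` with `RD_KAFKA_PARTITION_UA` before subscribing. Because `strtok()` writes into the buffer, pass a modifiable string rather than a string literal.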
Note: the code above is for reference only; adapt it to your own requirements before using it in practice.