使用kafka发送消息的C++代码
时间: 2024-06-10 17:04:40 浏览: 137
好的,关于使用kafka发送消息的C代码,你可以参考以下部分示例代码:
```c
#include <librdkafka/rdkafka.h>
int main(int argc, char **argv) {
rd_kafka_t *rk; // kafka实例
rd_kafka_topic_t *rkt; // kafka主题
char errstr[512]; // 错误信息
// kafka配置
rd_kafka_conf_t *conf = rd_kafka_conf_new();
// 设置bootstrap.servers为kafka地址,多个地址以逗号隔开
if (rd_kafka_conf_set(conf, "bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Failed to set config: %s\n", errstr);
return 1;
}
// 创建kafka实例
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "Failed to create producer: %s\n", errstr);
return 1;
}
// 创建kafka主题
rkt = rd_kafka_topic_new(rk, "test-topic", NULL);
if (!rkt) {
fprintf(stderr, "Failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
return 1;
}
// 发送消息
const char *msg = "hello, kafka";
if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *) msg, strlen(msg), NULL, 0, NULL) != 0) {
fprintf(stderr, "Failed to send message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
} else {
printf("Message sent: %s\n", msg);
}
// 销毁kafka实例和主题
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
return 0;
}
```
以上代码使用了librdkafka开源库,具体使用可参考官方文档,链接为:https://docs.confluent.io/platform/current/clients/librdkafka/rdkafka_8h.html
阅读全文