给我写一个kafka生产者代码,读取txt文件,以json格式发送到kafka集群
时间: 2023-03-29 20:04:42 浏览: 155
kafka生产者源码
好的,以下是一个简单的Kafka生产者代码,用于读取txt文件并将其以JSON格式发送到Kafka集群:
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
#define MAX_LINE_LEN 1024
int main(int argc, char **argv) {
if (argc != 4) {
fprintf(stderr, "Usage: %s <broker> <topic> <filename>\n", argv[]);
return 1;
}
char *broker = argv[1];
char *topic = argv[2];
char *filename = argv[3];
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
char errstr[512];
conf = rd_kafka_conf_new();
if (rd_kafka_conf_set(conf, "bootstrap.servers", broker, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Failed to set broker: %s\n", errstr);
return 1;
}
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "Failed to create producer: %s\n", errstr);
return 1;
}
FILE *fp = fopen(filename, "r");
if (!fp) {
fprintf(stderr, "Failed to open file: %s\n", filename);
return 1;
}
char line[MAX_LINE_LEN];
while (fgets(line, MAX_LINE_LEN, fp)) {
char *json = "{\"message\": \"%s\"}";
char *msg = malloc(strlen(json) + strlen(line) + 1);
sprintf(msg, json, line);
rd_kafka_produce(
rk,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
msg,
strlen(msg),
NULL,
,
NULL
);
rd_kafka_poll(rk, );
free(msg);
}
fclose(fp);
rd_kafka_flush(rk, 10000);
rd_kafka_destroy(rk);
return ;
}
```
请注意,此代码仅用于演示目的,实际使用时需要进行适当的错误处理和参数验证。
阅读全文