rdkafka 库的使用
时间: 2023-07-11 20:37:18 浏览: 150
rdkafka(librdkafka)是一个用C语言实现的高性能Kafka客户端库,并附带C++封装接口(头文件为 librdkafka/rdkafkacpp.h)。下面是rdkafka库的使用方法:
1. 安装rdkafka
可以通过源码安装或通过包管理器来安装rdkafka。如果是在Debian/Ubuntu等使用apt的Linux发行版中,可以使用以下命令来安装:
```
sudo apt-get install librdkafka-dev
```
2. 创建Kafka生产者
```c++
#include <iostream>
#include <string>
#include <cstdio>
#include <csignal>
#include <cstdlib>
#include <librdkafka/rdkafkacpp.h>
using namespace std;
// Delivery-report callback that logs the outcome carried by each message:
// the error string goes to stderr on failure, otherwise the topic name,
// partition and offset go to stdout.
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    // Prints one line describing whether `msg` was delivered.
    void dr_cb(RdKafka::Message &msg) {
        const bool failed = (msg.err() != RdKafka::ERR_NO_ERROR);
        if (failed) {
            std::cerr << "Message delivery failed: " << msg.errstr() << std::endl;
            return;
        }

        std::cout << "Message delivered to topic " << msg.topic_name()
                  << " [" << msg.partition() << "] at offset " << msg.offset()
                  << std::endl;
    }
};
int main() {
string brokers = "localhost:9092";
string topic_str = "test";
string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("dr_cb", &ex_dr_cb, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
cerr << "Failed to create producer: " << errstr << endl;
exit(1);
}
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, NULL, errstr);
if (!topic) {
cerr << "Failed to create topic: " << errstr << endl;
exit(1);
}
string line;
while (getline(cin, line)) {
RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY /* Copy payload */, const_cast<char *>(line.c_str()), line.size(), NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR) {
cerr << "Failed to produce message: " << RdKafka::err2str(resp) << endl;
} else {
cout << "Produced message (" << line.size() << " bytes)" << endl;
}
producer->poll(0);
}
delete topic;
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
```
3. 创建Kafka消费者
```c++
#include <iostream>
#include <string>
#include <cstdio>
#include <csignal>
#include <cstdlib>
#include <librdkafka/rdkafkacpp.h>
using namespace std;
// Loop flag for the consume loop; cleared by the signal handler.
// volatile sig_atomic_t is the only object type a signal handler may
// portably write to, so a plain bool is not safe here.
static volatile std::sig_atomic_t run = 1;

// Signal handler: requests termination of the consume loop.
static void sigterm(int sig) {
    (void)sig;  // the signal number is not needed
    run = 0;
}
// Consume callback that prints each received message to stdout, ignores
// consume timeouts, and reports any other error to stderr.
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
    // Handles one consume result.
    //   message - the message or error event to handle.
    //   opaque  - not used by this implementation.
    void consume_cb(RdKafka::Message &message, void *opaque) {
        (void)opaque;
        switch (message.err()) {
        case RdKafka::ERR__TIMED_OUT:
            // No message arrived within the timeout; nothing to report.
            break;
        case RdKafka::ERR_NO_ERROR: {
            // The payload is a length-delimited byte buffer, not a
            // NUL-terminated C string, so build the string from
            // payload()/len(). A NULL payload is printed as empty.
            const char *payload = static_cast<const char *>(message.payload());
            const string text = payload ? string(payload, message.len()) : string();
            cout << "Message consumed: " << text << endl;
            break;
        }
        default:
            cerr << "Error while consuming message: " << message.errstr() << endl;
            break;
        }
    }
};
int main() {
string brokers = "localhost:9092";
string group_id = "test-group";
string topic_str = "test";
string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
conf->set("group.id", group_id, errstr);
ExampleConsumeCb ex_consume_cb;
conf->set("consume_cb", &ex_consume_cb, errstr);
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer) {
cerr << "Failed to create consumer: " << errstr << endl;
exit(1);
}
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, NULL, errstr);
if (!topic) {
cerr << "Failed to create topic: " << errstr << endl;
exit(1);
}
RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_BEGINNING);
if (resp != RdKafka::ERR_NO_ERROR) {
cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << endl;
exit(1);
}
signal(SIGINT, sigterm);
while (run) {
RdKafka::Message *message = consumer->consume(1000);
ex_consume_cb.consume_cb(*message, NULL);
delete message;
}
consumer->stop(topic, 0);
consumer->wait_destroyed(5000);
delete topic;
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
```
以上是创建Kafka生产者和消费者的代码示例,需要在程序中修改Kafka Broker的地址和端口,以及topic的名称和其他配置。
阅读全文