rdkafka_example
时间: 2023-07-21 09:34:01 浏览: 154
kafka示例
rdkafka 是一款 C/C++ 实现的高性能消息队列系统,支持多种协议和编程语言。以下是一个使用 rdkafka 的简单示例:
```c++
#include <iostream>
#include <string>
#include <csignal>
#include <librdkafka/rdkafkacpp.h>
static bool run = true;
void sigterm_handler(int sig) {
run = false;
}
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) {
if (message.err()) {
std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
} else {
std::cout << "Message delivered to " << message.topic_name() << " [" << message.partition() << "] at offset " << message.offset() << std::endl;
}
}
};
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test";
std::string errstr;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
tconf->set("auto.offset.reset", "earliest", errstr);
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
RdKafka::Topic *top = RdKafka::Topic::create(consumer, topic, tconf, errstr);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("dr_cb", &ex_dr_cb, errstr);
consumer->start(top, partition, start_offset);
signal(SIGINT, sigterm_handler);
signal(SIGTERM, sigterm_handler);
while (run) {
RdKafka::Message *msg = consumer->consume(top, partition, 1000);
if (msg) {
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received message: " << std::string(static_cast<char*>(msg->payload()), msg->len()) << std::endl;
consumer->commitAsync(msg);
} else {
std::cerr << "Error while consuming message: " << msg->errstr() << std::endl;
}
delete msg;
}
}
consumer->stop(top, partition);
consumer->close();
delete top;
delete consumer;
delete tconf;
delete conf;
return 0;
}
```
这个示例是一个 Kafka 消费者,它从指定的 Kafka 集群中的指定主题消费消息,并将消费到的消息输出到控制台。在程序运行期间,如果接收到 SIGINT 或 SIGTERM 信号,程序会停止消费并退出。
阅读全文