c++ kafka消费消息

时间: 2023-08-05 20:10:15 浏览: 339
你可以使用librdkafka库来在C++中消费Kafka消息。以下是一个简单的示例代码: ```cpp #include <iostream> #include <cstdlib> #include <csignal> #include <librdkafka/rdkafkacpp.h> static bool running = true; // Kafka消息消费回调函数 class ExampleConsumeCb : public RdKafka::ConsumeCb { public: void consume_cb(RdKafka::Message &msg, void *opaque) { switch (msg.err()) { case RdKafka::ERR_NO_ERROR: // 处理接收到的消息 std::cout << "Received message: " << msg.payload() << std::endl; break; case RdKafka::ERR__TIMED_OUT: // 超时错误 break; default: // 其他错误 std::cerr << "Error occurred: " << msg.errstr() << std::endl; break; } } }; // Ctrl+C信号处理函数 static void sigterm(int sig) { running = false; } int main() { std::string brokers = "localhost:9092"; // Kafka broker地址 std::string topic = "test_topic"; // 要消费的Kafka主题 std::string errstr; RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); conf->set("bootstrap.servers", brokers, errstr); RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr); if (!consumer) { std::cerr << "Failed to create consumer: " << errstr << std::endl; exit(1); } // 订阅主题 RdKafka::ErrorCode err = consumer->subscribe({topic}); if (err) { std::cerr << "Failed to subscribe to topic: " << RdKafka::err2str(err) << std::endl; exit(1); } ExampleConsumeCb consume_cb; consumer->poll(0); // 初始化消费者 signal(SIGINT, sigterm); // 注册Ctrl+C信号处理函数 while (running) { // 消费消息 RdKafka::Message *msg = consumer->consume(1000); consume_cb.consume_cb(*msg, nullptr); delete msg; } // 关闭消费者 consumer->close(); delete consumer; // 销毁配置对象 delete conf; return 0; } ``` 上述代码通过librdkafka库创建一个Kafka消费者,并订阅指定的主题。在循环中,它会不断地消费消息,并使用`ExampleConsumeCb`类中定义的回调函数处理接收到的消息。通过注册Ctrl+C信号处理函数,你可以使用Ctrl+C来停止消费消息。 请注意,在使用此代码之前,你需要先安装librdkafka库,并将其链接到你的C++项目中。 希望对你有所帮助!如果有任何问题,请随时提问。
阅读全文

相关推荐