c++ kafka消费消息
时间: 2023-08-05 20:10:15 浏览: 339
你可以使用librdkafka库来在C++中消费Kafka消息。以下是一个简单的示例代码:
```cpp
#include <iostream>
#include <cstdlib>
#include <csignal>
#include <librdkafka/rdkafkacpp.h>
static bool running = true;
// Kafka消息消费回调函数
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
void consume_cb(RdKafka::Message &msg, void *opaque) {
switch (msg.err()) {
case RdKafka::ERR_NO_ERROR:
// 处理接收到的消息
std::cout << "Received message: " << msg.payload() << std::endl;
break;
case RdKafka::ERR__TIMED_OUT:
// 超时错误
break;
default:
// 其他错误
std::cerr << "Error occurred: " << msg.errstr() << std::endl;
break;
}
}
};
// Ctrl+C信号处理函数
static void sigterm(int sig) {
running = false;
}
int main() {
std::string brokers = "localhost:9092"; // Kafka broker地址
std::string topic = "test_topic"; // 要消费的Kafka主题
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
// 订阅主题
RdKafka::ErrorCode err = consumer->subscribe({topic});
if (err) {
std::cerr << "Failed to subscribe to topic: " << RdKafka::err2str(err) << std::endl;
exit(1);
}
ExampleConsumeCb consume_cb;
consumer->poll(0); // 初始化消费者
signal(SIGINT, sigterm); // 注册Ctrl+C信号处理函数
while (running) {
// 消费消息
RdKafka::Message *msg = consumer->consume(1000);
consume_cb.consume_cb(*msg, nullptr);
delete msg;
}
// 关闭消费者
consumer->close();
delete consumer;
// 销毁配置对象
delete conf;
return 0;
}
```
上述代码通过librdkafka库创建一个Kafka消费者,并订阅指定的主题。在循环中,它会不断地消费消息,并使用`ExampleConsumeCb`类中定义的回调函数处理接收到的消息。通过注册Ctrl+C信号处理函数,你可以使用Ctrl+C来停止消费消息。
请注意,在使用此代码之前,你需要先安装librdkafka库,并将其链接到你的C++项目中。
希望对你有所帮助!如果有任何问题,请随时提问。
阅读全文