librdkafka 消费者类
时间: 2023-08-01 15:07:28 浏览: 119
librdkafka 是一个 C/C++ 语言编写的 Apache Kafka 客户端库,它提供了高级别和低级别的 API,使得 Kafka 的生产者和消费者的开发变得更加简单和灵活。下面介绍 librdkafka 中的消费者类:
librdkafka 中的消费者类是 `RdKafka::Consumer`,它封装了消费者的相关操作。使用该类需要先创建一个 `RdKafka::Conf` 对象来设置消费者的配置,然后创建一个 `RdKafka::KafkaConsumer` 对象来实例化一个消费者。
具体的代码如下:
```c++
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092"); // 设置 Kafka broker 地址
conf->set("group.id", "test-group"); // 设置消费者组 ID
std::string errstr;
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
if (consumer == nullptr) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
// 订阅主题
std::vector<std::string> topics = {"test-topic"};
RdKafka::ErrorCode err = consumer->subscribe(topics);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to subscribe topic: " << RdKafka::err2str(err) << std::endl;
exit(1);
}
// 开始消费消息
while (true) {
RdKafka::Message *msg = consumer->consume(1000); // 每隔 1 秒轮询一次
if (msg == nullptr) {
continue;
}
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received message: " << std::string(static_cast<char *>(msg->payload()), msg->len()) << std::endl;
} else {
std::cerr << "Failed to consume message: " << RdKafka::err2str(msg->err()) << std::endl;
}
delete msg;
}
// 清理资源
consumer->close();
delete consumer;
delete conf;
```
上述代码中,首先创建了一个 `RdKafka::Conf` 对象,并设置了 `bootstrap.servers` 和 `group.id` 两个配置项。然后使用该对象创建一个 `RdKafka::KafkaConsumer` 对象,并订阅了一个名为 `test-topic` 的主题。最后进入消息消费循环,每隔 1 秒钟轮询一次消息,如果有消息到来则打印消息内容。
需要注意的是,消费者在使用完毕后需要调用 `close` 方法来清理资源。
阅读全文