kafka指定消费1分钟前的数据实现延迟消费 c++ 版本 完整程序
时间: 2024-05-06 09:14:54 浏览: 74
以下是一个使用 C++ 实现 Kafka 延迟消费的示例程序:
```cpp
#include <iostream>
#include <librdkafka/rdkafkacpp.h>
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) {
if (message.err()) {
std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
} else {
std::cout << "Message delivered to topic " << message.topic_name() << " [" << message.partition() << "] at offset " << message.offset() << std::endl;
}
}
};
int main() {
std::string brokers = "localhost:9092";
std::string topic = "test_topic";
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
tconf->set("auto.offset.reset", "earliest", errstr);
ExampleDeliveryReportCb ex_dr_cb;
conf->set("dr_cb", &ex_dr_cb, errstr);
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
RdKafka::Topic *rd_topic = RdKafka::Topic::create(consumer, topic, tconf, errstr);
int64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() - 60000;
RdKafka::ErrorCode resp = consumer->start(rd_topic, 0, timestamp);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
return 1;
}
while (true) {
RdKafka::Message *msg = consumer->consume(rd_topic, 0, 1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Received message: " << std::string((char *) msg->payload()) << std::endl;
} else if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
std::cerr << "Reached end of partition" << std::endl;
} else {
std::cerr << "Failed to consume message: " << msg->errstr() << std::endl;
}
delete msg;
}
return 0;
}
```
该程序使用 librdkafka 库连接到 Kafka,并创建一个消费者实例。然后,它使用 `consumer->start()` 函数指定从 1 分钟前开始消费数据。最后,它使用 `consumer->consume()` 函数接收消息并进行处理。
请注意,需要在编译时链接 librdkafka 库。
阅读全文