kafka实时指定消费1分钟前的数据实现延迟消费 c++ 版本 完整程序
时间: 2024-05-16 07:13:04 浏览: 157
springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区
以下是一个简单的C++版本的Kafka消费者示例代码,可以实现实时消费1分钟前的数据:
```c++
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <csignal>
#include <unistd.h>
#include <librdkafka/rdkafkacpp.h>
using namespace std;
static bool run = true;
static void sigterm (int sig) {
run = false;
}
int main(int argc, char **argv) {
string brokers = "localhost:9092";
string errstr;
string topic_str = "test_topic";
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_END;
// 创建 Kafka 配置
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// 设置 Kafka 配置
if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK) {
cerr << "Failed to set broker list: " << errstr << endl;
exit(1);
}
// 创建 Kafka 消费者
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
// 订阅 Kafka 主题
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
if (resp != RdKafka::ERR_NO_ERROR) {
cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << endl;
exit(1);
}
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
while (run) {
// 消费 Kafka 消息
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
int64_t timestamp = msg->timestamp().timestamp;
int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
// 判断时间戳是否在1分钟之前
if (now - timestamp > 60000) {
const char *payload = static_cast<const char *>(msg->payload());
cout << "Received message: " << payload << endl;
}
} else if (msg->err() == RdKafka::ERR__TIMED_OUT) {
// 消息超时,重试
continue;
} else {
cerr << "Failed to consume message: " << RdKafka::err2str(msg->err()) << endl;
break;
}
// 释放消息资源
delete msg;
}
// 删除资源
delete topic;
delete consumer;
delete tconf;
delete conf;
return 0;
}
```
在上面的示例代码中,我们使用librdkafka库来连接Kafka,首先创建Kafka的配置对象和消费者对象,然后订阅指定的主题。在消费消息时,我们通过调用`msg->timestamp().timestamp`获取消息的时间戳,然后计算与当前时间的时间差,如果大于1分钟则进行消费。如果消息超时,则重试。最后释放消息资源并删除所有对象。
当然,这只是一个简单的示例代码,实际应用中可能需要考虑更多的情况,比如Kafka集群故障、消息重复等问题。
阅读全文