哪个版本的 Kafka C++ 库(librdkafka)支持 subscribe()?
时间: 2023-09-27 10:11:16 浏览: 198
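Kafka C++ 库(librdkafka 的 C++ 接口)0.11.0.0 及以上版本都支持 subscribe() 方法。该方法属于高级消费者 `RdKafka::KafkaConsumer`,调用时传入一个或多个主题名称;subscribe() 本身不接收回调函数,订阅后收到的新消息需要通过 `consume()` 取出再进行处理。需要注意的是,下面的示例代码使用的是旧版(legacy)`RdKafka::Consumer` 接口:它通过 `start()` 从指定主题的某个分区开始消费,并用回调函数处理接收到的消息,而不是调用 subscribe():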
```cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
// Global keep-running flag. It is written from a signal handler, so it uses
// `volatile std::sig_atomic_t`, the type the C++ standard allows a handler to
// modify. Non-zero means "keep running"; other code may still assign
// `true`/`false` to it and test it in a boolean context.
static volatile std::sig_atomic_t run = 1;

// Signal handler: requests shutdown by clearing the `run` flag.
// The signal number is not used.
void sigterm(int /*sig*/) {
    run = 0;
}
// Event callback that reports client events on stderr.
class ExampleEventCb : public RdKafka::EventCb {
public:
    // Prints one line per event, formatted according to the event type.
    // For error events whose code is ERR__ALL_BROKERS_DOWN the global `run`
    // flag is cleared as well.
    void event_cb(RdKafka::Event &event) {
        const auto kind = event.type();

        if (kind == RdKafka::Event::EVENT_ERROR) {
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
                run = false;
            }
            return;
        }

        if (kind == RdKafka::Event::EVENT_STATS) {
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            return;
        }

        if (kind == RdKafka::Event::EVENT_LOG) {
            std::cerr << "LOG-" << event.severity() << "-" << event.fac() << ": " << event.str() << std::endl;
            return;
        }

        // Any other event type: print the type together with its error text.
        std::cerr << "EVENT " << kind << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
    }
};
// Consume callback: prints every received message and stops the program on
// consume errors.
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
    // Handles one consume result.
    //   msg    - the message (or error/event placeholder) delivered by the consumer.
    //   opaque - caller-supplied pointer; not used here.
    // Timeouts and end-of-partition are not treated as fatal; every other
    // error clears the global `run` flag.
    void consume_cb(RdKafka::Message &msg, void *opaque) {
        (void)opaque;

        switch (msg.err()) {
        case RdKafka::ERR__TIMED_OUT:
            // No message arrived within the timeout; nothing to do.
            break;

        case RdKafka::ERR_NO_ERROR:
            std::cout << "Read msg at offset " << msg.offset() << std::endl;
            if (msg.key()) {
                std::cout << "Key: " << *msg.key() << std::endl;
            }
            std::cout << "Payload: ";
            // The payload is a length-delimited byte buffer (msg.len() bytes),
            // not a NUL-terminated C string, and it may be null, so write
            // exactly len() bytes instead of streaming a `const char *`.
            if (msg.payload() != nullptr && msg.len() > 0) {
                std::cout.write(static_cast<const char *>(msg.payload()),
                                static_cast<std::streamsize>(msg.len()));
            }
            std::cout << std::endl;
            break;

        case RdKafka::ERR__PARTITION_EOF:
            // Built from the message accessors; TopicPartition offers no
            // static formatting helper.
            std::cerr << "Reached end of partition " << msg.topic_name()
                      << " [" << msg.partition() << "] at offset " << msg.offset() << std::endl;
            break;

        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
        default:
            // Unknown topic/partition and all remaining errors are fatal.
            std::cerr << "Consume failed: " << msg.errstr() << std::endl;
            run = false;
            break;
        }
    }
};
int main(int argc, char **argv) {
std::string brokers = "localhost:9092";
std::string errstr;
std::string topic_str = "my_topic";
std::vector<std::string> topics = {topic_str};
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", brokers, errstr);
ExampleEventCb ex_event_cb;
conf->set("event_cb", &ex_event_cb, errstr);
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, nullptr, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
ExampleConsumeCb ex_consume_cb;
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset, &ex_consume_cb);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
exit(1);
}
std::cout << "Started consumer for topic " << topic_str << std::endl;
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
while (run) {
consumer->poll(1000);
}
consumer->stop(topic, partition);
consumer->close();
delete topic;
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
```
在上面的示例代码中,我们使用 `consumer->start()` 方法开始从指定主题的分区中消费消息,并在回调函数 `ExampleConsumeCb::consume_cb()` 中对接收到的每条消息进行处理。
阅读全文