kafka+c++实现
时间: 2023-12-12 11:36:07 浏览: 36
Kafka 是一个基于发布/订阅模式的分布式消息系统,早期版本使用 ZooKeeper 来协调多个 Broker 之间的通信(较新的版本也可以使用 KRaft 模式,不再依赖 ZooKeeper)。C++ 客户端可以使用 librdkafka 库来与 Kafka 进行交互。下面是一个简单的示例代码,演示如何使用 librdkafka 的 C++ 接口在 C++ 中生产和消费消息:
1. 生产消息
```c++
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>
int main() {
std::string brokers = "localhost:9092";
std::string topic_str = "test_topic";
std::string errstr;
// 创建配置对象
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
// 创建生产者对象
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
delete conf;
return 1;
}
// 创建消息对象
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, NULL, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
delete producer;
delete conf;
return 1;
}
// 生产消息
std::string message_str = "Hello, Kafka!";
RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char *>(message_str.c_str()), message_str.size(), NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl;
} else {
std::cout << "Produced message: " << message_str << std::endl;
}
// 清理资源
producer->flush(1000);
delete topic;
delete producer;
delete conf;
return 0;
}
```
2. 消费消息
```c++
#include <iostream>
#include <string>
#include <librdkafka/rdkafkacpp.h>
// Delivery-report handler implementing RdKafka::DeliveryReportCb.
// dr_cb() writes the message's error string to stderr when the message carries
// an error; otherwise it prints the payload bytes to stdout.
// NOTE(review): nothing in this snippet registers or uses this class.
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        // Failure path first: report and bail out.
        if (message.err() != RdKafka::ERR_NO_ERROR) {
            std::cerr << "Failed to deliver message: " << message.errstr() << std::endl;
            return;
        }

        // Build the text from pointer + length; the payload is not assumed to
        // be NUL-terminated.
        const char *payload = static_cast<const char *>(message.payload());
        const std::string text(payload, message.len());
        std::cout << "Delivered message: " << text << std::endl;
    }
};
// Consume handler implementing RdKafka::ConsumeCb.
// consume_cb() writes the message's error string to stderr when the message
// carries an error; otherwise it prints the payload bytes to stdout.
// The opaque pointer is accepted to match the callback signature and is unused.
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
    void consume_cb(RdKafka::Message &message, void *opaque) {
        const bool failed = message.err() != RdKafka::ERR_NO_ERROR;
        if (failed) {
            std::cerr << "Failed to consume message: " << message.errstr() << std::endl;
            return;
        }

        // Build the text from pointer + length; the payload is not assumed to
        // be NUL-terminated.
        const char *bytes = static_cast<const char *>(message.payload());
        const std::string body(bytes, message.len());
        std::cout << "Consumed message: " << body << std::endl;
    }
};
int main() {
std::string brokers = "localhost:9092";
std::string topic_str = "test_topic";
std::string errstr;
// 创建配置对象
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
// 创建消费者对象
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
delete conf;
return 1;
}
// 订阅主题
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, NULL, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
delete consumer;
delete conf;
return 1;
}
RdKafka::ErrorCode resp = consumer->start(topic, 0, RdKafka::Topic::OFFSET_BEGINNING);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
delete topic;
delete consumer;
delete conf;
return 1;
}
// 设置回调函数
ExampleConsumeCb consume_cb;
consumer->set_consume_callback(&consume_cb);
// 消费消息
while (true) {
RdKafka::Message *message = consumer->consume(topic, 0, 1000);
if (message->err() == RdKafka::ERR_NO_ERROR) {
std::cout << "Consuming message..." << std::endl;
} else if (message->err() == RdKafka::ERR__TIMED_OUT) {
// 超时
} else {
std::cerr << "Failed to consume message: " << message->errstr() << std::endl;
break;
}
delete message;
}
// 清理资源
consumer->stop(topic, 0);
consumer->poll(1000);
delete topic;
delete consumer;
delete conf;
return 0;
}
```