Share some Kafka Streams learning resources
Posted: 2023-03-31 08:00:30
You can refer to the following resources:
1. Kafka Streams Documentation: https://kafka.apache.org/documentation/streams/
2. Kafka Streams in Action book: https://www.manning.com/books/kafka-streams-in-action
3. Kafka Streams Tutorial: https://www.baeldung.com/kafka-streams
4. Kafka Streams Course on Udemy: https://www.udemy.com/course/kafka-streams/
Hope this helps.
Related questions
Generate Java code that calls Kafka using Kafka Streams
The following is a simple Kafka Streams application. It reads string messages from a Kafka topic named "input_topic", converts them to uppercase, and writes them to another Kafka topic named "output_topic".
```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class MyKafkaStreamsApp {
    public static void main(String[] args) {
        // Define Kafka Streams properties
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-kafka-streams-app");

        // Build the processing topology
        StreamsBuilder builder = new StreamsBuilder();

        // Create a KStream by reading from the input topic with String serdes
        KStream<String, String> inputStream = builder.stream(
                "input_topic",
                Consumed.with(Serdes.String(), Serdes.String())
        );

        // Transform each message value to uppercase (keys are left unchanged)
        KStream<String, String> outputStream = inputStream.mapValues(
                value -> value.toUpperCase()
        );

        // Write the transformed messages to the output topic
        outputStream.to(
                "output_topic",
                Produced.with(Serdes.String(), Serdes.String())
        );

        // Create a KafkaStreams object and start the application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams client cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```
In this example, we first define a few Kafka Streams properties and create a StreamsBuilder object. We then use the builder to read messages from "input_topic", convert each value to uppercase with mapValues(), and write the transformed messages to "output_topic" with to().
Finally, we create a KafkaStreams object and start the application. Note that Kafka Streams automatically creates and manages only its own internal topics (repartition and state-changelog topics); the "input_topic" and "output_topic" themselves must already exist on the broker unless automatic topic creation is enabled.
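Because mapValues() applies a plain function to each record value, the transformation logic can be sketched and checked without a running broker (for full topology tests, Kafka also ships a TopologyTestDriver in the kafka-streams-test-utils artifact). The class below is a hypothetical, broker-free sketch of the same value transformation, not part of the application above:

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class UppercaseMapperSketch {
    // Mirrors the mapValues() lambda in the topology:
    // only the value is transformed, the record key is untouched.
    static String mapValue(String value) {
        return value.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        List<String> input = List.of("bid:1.23", "ask:1.25");
        List<String> output = input.stream()
                .map(UppercaseMapperSketch::mapValue)
                .collect(Collectors.toList());
        System.out.println(output); // prints [BID:1.23, ASK:1.25]
    }
}
```

Extracting the per-record logic into a named method like this keeps it unit-testable independently of Kafka itself.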
C++ Kafka Streams example for processing real-time market data
Kafka Streams itself is a Java/Scala library with no native C++ version. The closest C++ equivalent is a consume-process-produce loop built on librdkafka, Kafka's C/C++ client. The following example uses its C++ API (RdKafka):
```c++
#include <csignal>
#include <iostream>
#include <string>
#include <vector>
#include <librdkafka/rdkafkacpp.h>

using std::string;
using std::vector;

static volatile sig_atomic_t run = 1;
static void sigterm(int) { run = 0; }

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& message) {
        if (message.err()) {
            std::cerr << "Failed to deliver message: " << message.errstr() << std::endl;
        } else {
            std::cout << "Message delivered to " << message.topic_name()
                      << " [" << message.partition() << "]" << std::endl;
        }
    }
};

class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) {
        switch (event.type()) {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "
                      << event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
                run = 0;
            }
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            std::cerr << "LOG-" << event.severity() << "-" << event.fac()
                      << ": " << event.str() << std::endl;
            break;
        default:
            std::cerr << "EVENT " << event.type() << " ("
                      << RdKafka::err2str(event.err()) << "): "
                      << event.str() << std::endl;
            break;
        }
    }
};

int main(int argc, char* argv[]) {
    if (argc != 4) {
        std::cerr << "Usage: " << argv[0] << " <broker> <input-topic> <output-topic>" << std::endl;
        return 1;
    }
    string brokers = argv[1];
    string input_topic = argv[2];
    string output_topic = argv[3];
    string errstr;

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    ExampleEventCb ex_event_cb;
    ExampleDeliveryReportCb ex_dr_cb;

    // Producer configuration
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("event_cb", &ex_event_cb, errstr);
    conf->set("dr_cb", &ex_dr_cb, errstr);

    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return 1;
    }

    // Consumer configuration: the high-level KafkaConsumer (which supports
    // subscribe()) requires a group.id
    RdKafka::Conf* conf_consumer = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf_consumer->set("bootstrap.servers", brokers, errstr);
    conf_consumer->set("group.id", "market-data-processor", errstr);
    conf_consumer->set("event_cb", &ex_event_cb, errstr);

    RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf_consumer, errstr);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        return 1;
    }

    vector<string> topics;
    topics.push_back(input_topic);
    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "Failed to subscribe to " << input_topic << ": "
                  << RdKafka::err2str(err) << std::endl;
        return 1;
    }

    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            string data(static_cast<const char*>(msg->payload()), msg->len());
            // TODO: process data
            RdKafka::ErrorCode resp = producer->produce(
                output_topic, RdKafka::Topic::PARTITION_UA,
                RdKafka::Producer::RK_MSG_COPY,
                const_cast<char*>(data.c_str()), data.size(),
                NULL, 0,   // no key
                0,         // 0 = let librdkafka assign the current timestamp
                NULL);     // no per-message opaque
            if (resp != RdKafka::ERR_NO_ERROR) {
                std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl;
            }
            producer->poll(0);  // serve delivery-report callbacks
        } else if (msg->err() != RdKafka::ERR__TIMED_OUT) {
            std::cerr << "Failed to consume message: " << RdKafka::err2str(msg->err()) << std::endl;
        }
        delete msg;
    }

    consumer->close();
    producer->flush(10000);  // wait up to 10 s for outstanding deliveries
    delete consumer;
    delete producer;
    return 0;
}
```
In the example above we create a producer and a high-level consumer: the consumer reads messages from the input topic, and after the processing step the producer writes the results to the output topic. You can adapt this skeleton to your actual requirements and plug your own market-data processing logic in at the TODO step.