kafka stream流式处理案例代码
时间: 2024-10-23 13:17:46 浏览: 25
Kafka Streams是一个用于处理Apache Kafka主题数据的实时应用程序框架,它允许你在Kafka之上构建复杂的流处理管道。以下是一个简单的Java代码示例,展示了如何使用Kafka Streams创建一个从输入主题到输出主题的文本计数器应用:
```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
public class SimpleCountingApp {
public static void main(String[] args) {
Properties props = new Properties();
// 设置属性,如bootstrap servers、application ID等
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
// 创建一个Stream实例
StreamsBuilder builder = new StreamsBuilder();
// 读取输入主题,这里假设输入主题是"input"
KStream<String, String> text = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));
// 对每个单词进行计数
KGroupedStream<String, String> grouped = text.groupByKey();
KTable<String, Long> counts = grouped.count();
// 将结果写入输出主题,这里假设输出主题是"output"
counts.toStream().to("output", Produced.with(Serdes.String(), Serdes.Long()));
// 构建并启动Kafka Streams应用
KafkaStreams streams = new KafkaStreams(builder.build(), props);
try {
streams.start();
// 监控应用,直到关闭
System.out.println("Starting application...");
streams.cleanUp(); // 关闭时清理临时状态
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
阅读全文