编写flink的流处理程序,统计流中单词出现的次数
时间: 2023-11-28 19:08:46 浏览: 69
Python统计单词出现的次数
5星 · 资源好评率100%
可以使用Flink的DataStream API来实现流处理程序,以下是一个简单的代码示例:
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从socket读取数据流
DataStream<String> stream = env.socketTextStream("localhost", 9999);
// 将每行数据按空格拆分成单词
DataStream<String> words = stream.flatMap((String line, Collector<String> out) -> {
for (String word : line.split(" ")) {
out.collect(word);
}
});
// 对每个单词进行计数,生成(word, count)格式的二元组
DataStream<Tuple2<String, Integer>> counts = words.keyBy((KeySelector<String, String>) value -> value)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new WindowFunction<String, Tuple2<String, Integer>, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<String> input, Collector<Tuple2<String, Integer>> out) {
int count = 0;
for (String word : input) {
count++;
}
out.collect(new Tuple2<>(key, count));
}
});
// 打印计数结果
counts.print();
env.execute("Word Count");
}
}
```
上述代码中,首先从socket读取数据流,然后将每行数据按空格拆分成单词。接着对每个单词进行计数,生成二元组。最后打印计数结果。这里使用了Flink的窗口操作,每隔5秒钟计算一次窗口内单词出现次数的总和。
阅读全文