flink 流处理 词频统计
时间: 2023-06-14 19:06:21 浏览: 197
flink介绍PPT
在 Flink 中进行词频统计,可以通过以下步骤来实现:
1. 读取数据:从 Kafka、文件或其他数据源中读取数据流。
2. 数据转换:将数据流中的数据进行转换,将每个单词拆分出来并转换成小写。
3. 分组聚合:将转换后的数据流按单词进行分组,然后对每个单词进行计数操作。
4. 结果输出:将计数结果输出到 Kafka、文件或其他数据源中。
以下是一个简单的 Flink 词频统计示例代码:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 获取执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从命令行参数中获取配置项
final ParameterTool params = ParameterTool.fromArgs(args);
// 从指定数据源读取数据
DataStream<String> text;
if (params.has("input")) {
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = env.fromElements("hello world", "hello flink", "hello world");
}
// 对数据流进行转换,将每个单词拆分出来并转换成小写
DataStream<WordWithCount> wordCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word.toLowerCase(), 1L));
}
}
})
// 对转换后的数据流按单词进行分组,然后对每个单词进行计数操作
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count");
// 输出计数结果
if (params.has("output")) {
wordCounts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
wordCounts.print();
}
// 执行程序
env.execute("Streaming WordCount");
}
// 存储单词和计数的 POJO 类
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
```
在以上代码中,我们首先获取执行环境,然后从指定数据源中读取数据流。接着,我们对数据流进行转换,将每个单词拆分出来并转换成小写。然后,我们对转换后的数据流按单词进行分组,然后对每个单词进行计数操作。最后,我们将计数结果输出到指定数据源中。
该示例代码中使用了 Flink 的时间窗口功能,将计数操作限制在 5 秒的时间窗口内。这样可以保证每个时间窗口内的计数结果都是独立的,并且不会受到前后时间窗口内的数据影响。
需要注意的是,该示例代码中的计数操作是在内存中进行的。如果要对大规模数据进行计数操作,可以考虑使用 Flink 的状态管理功能,将计数结果存储在状态中,然后通过定时器或其他方式对状态进行定期清理。
阅读全文