flink wordcount
时间: 2023-06-14 15:06:26 浏览: 204
Flink WordCount 是 Flink 的一个经典示例,用于演示 Flink 流处理的基本功能。它是一个简单的单词计数程序,可以从输入文本中读取单词,并计算每个单词在文本中出现的次数。
以下是一个 Flink WordCount 的示例代码:
```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WordCount {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置数据源
DataStream<String> text;
if (params.has("input")) {
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = env.fromElements(WordCountData.WORDS);
}
// 转换数据流
DataStream<WordWithCount> counts =
text.flatMap(new Tokenizer())
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count");
// 输出结果
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
// 执行任务
env.execute("Streaming WordCount");
}
// 单词拆分函数
public static final class Tokenizer implements FlatMapFunction<String, WordWithCount> {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new WordWithCount(token, 1L));
}
}
}
}
// 单词计数类
public static final class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
```
该程序使用 Flink 流处理 API 来读取输入文本、拆分单词、计数单词并输出结果。程序的具体执行流程如下:
1. 读取命令行参数或默认数据源。
2. 创建 Flink 执行环境。
3. 读取数据源。
4. 转换数据流,拆分单词并计数。
5. 输出结果到文件或标准输出。
6. 执行任务。
如果你想要运行 Flink WordCount 示例程序,可以按照以下步骤进行:
1. 下载 Flink 并解压。
2. 打开终端并进入 Flink 的安装目录。
3. 运行 `./bin/start-cluster.sh` 启动 Flink 集群。
4. 运行 `./bin/flink run examples/streaming/WordCount.jar --input /path/to/input/file --output /path/to/output/file`。
5. 等待程序执行完成,查看输出结果。
阅读全文