Flink WordCount code
Time: 2023-10-10 07:11:55
Sure, here is a simple Flink WordCount example:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data: one String per line of the file
        DataStream<String> text = env.readTextFile("input.txt");

        // transform the data: tokenize, group by word, keep a running sum of the counts
        DataStream<WordWithCount> counts =
                text.flatMap(new Tokenizer())
                        .keyBy(wc -> wc.word)
                        .sum("count");

        // output the results
        counts.print();

        // execute program
        env.execute("Streaming WordCount");
    }

    // user-defined function that extracts words from a line of text
    public static final class Tokenizer implements FlatMapFunction<String, WordWithCount> {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            // normalize and split the line of text
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit one (word, 1) record per non-empty token
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new WordWithCount(token, 1));
                }
            }
        }
    }

    // user-defined POJO to represent a word and its count
    public static final class WordWithCount {
        public String word;
        public int count;

        // public no-argument constructor, needed for Flink to treat this class as a POJO
        public WordWithCount() {}

        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
```
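This program reads a text file named "input.txt", counts the occurrences of each word, and prints the results to standard output. In the default streaming execution mode it prints an updated running count each time a word appears, rather than a single final total per word. Note that you need to replace input.txt with the path to your own file.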
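To make the output format concrete, here is a minimal plain-Java sketch of the same tokenize-and-count logic. It does not use Flink at all: `RunningWordCountSketch` and `runningCounts` are hypothetical names introduced only for illustration, and the sketch assumes the job emits one updated count per incoming word.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: imitates tokenizing plus a keyed running sum.
class RunningWordCountSketch {

    // Returns one "word : count" entry per token, in arrival order,
    // mirroring a running sum that emits an updated total for every record.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // same normalization as the Tokenizer: lowercase, split on non-word characters
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    // merge returns the updated total for this word
                    int updated = totals.merge(token, 1, Integer::sum);
                    emitted.add(token + " : " + updated);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (String entry : runningCounts(List.of("To be, or not to be"))) {
            System.out.println(entry);
        }
    }
}
```

For the line "To be, or not to be" this prints `to : 1`, `be : 1`, `or : 1`, `not : 1`, `to : 2`, `be : 2`, one per line. A real Flink job running with parallelism greater than 1 may interleave records for different words in a different order, so treat this only as an illustration of the per-word running totals.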