Flink: packaged, runnable code for counting word frequencies in one column of a file
Date: 2024-03-16 22:45:21
Below is packaged, runnable code that uses Flink to count word frequencies in one column of a file:
1. First, add the Flink dependencies to `pom.xml`:
```xml
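<!-- The cluster already ships the Flink core libraries, so mark them "provided" when submitting with flink run -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.2</version>
    <scope>provided</scope>
</dependency>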
```
2. Next, write the Java program:
```java
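import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the input file path and the (0-based) column to count from the command line
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String input = params.getRequired("input");
        final int column = params.getInt("column");

        DataStream<String> text = env.readTextFile(input);
        // Lowercase each line, split it on runs of non-word characters, keep the token in the requested column
        DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) {
                String[] tokens = value.toLowerCase().split("\\W+");
                if (tokens.length > column) {
                    out.collect(tokens[column]);
                }
            }
        });
        // Group by word, then keep a running count per word in keyed state
        DataStream<Tuple2<String, Long>> counts = words
                .keyBy(value -> value)
                .map(new WordCountFunction());
        counts.print();
        env.execute("Flink WordCount");
    }
}

// The Java DataStream API has no mapWithState (that exists only in the Scala API),
// so a RichMapFunction with a ValueState does the same per-key counting
class WordCountFunction extends RichMapFunction<String, Tuple2<String, Long>> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public Tuple2<String, Long> map(String value) throws Exception {
        Long state = count.value();
        if (state == null) {
            state = 0L;
        }
        state++;
        count.update(state);
        return Tuple2.of(value, state);
    }
}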
```
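The code above first reads the input file, then lowercases each line and splits it into tokens on runs of non-word characters (the regex `\\W+`), keeps only the token at the requested column, groups the resulting words with `keyBy`, and counts them with the stateful `WordCountFunction`. Finally, the results are printed. Because this is a streaming job, `print()` emits an updated `(word, count)` record every time a word arrives, so the last record printed for each word is its final count.
3. Package the program as a jar and submit it to the Flink cluster: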
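To check what the job should print before submitting it, the per-key counting can be reproduced in plain Java without Flink. The sketch below is hypothetical (`RunningCountSketch` and its sample lines are made up for illustration) and assumes a single parallel instance; a real Flink run with higher parallelism may interleave records from different subtasks and prefix each line with the subtask index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink needed) of the job for a single parallel instance.
// The HashMap stands in for the per-key ValueState kept by WordCountFunction.
public class RunningCountSketch {

    // Same rule as the job: lowercase, split on runs of non-word characters,
    // return the token at the 0-based column, or null if the line is too short
    static String columnToken(String line, int column) {
        String[] tokens = line.toLowerCase().split("\\W+");
        return tokens.length > column ? tokens[column] : null;
    }

    // Emits "(word,count)" every time a word arrives: running counts, not only final totals
    static List<String> runningCounts(List<String> lines, int column) {
        Map<String, Long> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String word = columnToken(line, column);
            if (word == null) {
                continue; // lines without that column are skipped, as in the job's flatMap
            }
            long count = state.getOrDefault(word, 0L) + 1;
            state.put(word, count);
            out.add("(" + word + "," + count + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1,alice,Apple", "2,bob,banana", "3,carol,apple", "4,dave");
        // prints [(apple,1), (banana,1), (apple,2)]
        System.out.println(runningCounts(lines, 2));
    }
}
```

The last line, `4,dave`, has only two tokens, so with column 2 it produces nothing, which matches the `tokens.length > column` check in the job.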
```bash
$ mvn clean package
$ flink run -c WordCount /path/to/wordcount.jar --input /path/to/input/file --column 2
```
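In the command above, `flink run` submits the jar to the Flink cluster. `-c` specifies the program's entry class, `--input` the input file path, and `--column` the column to count. Because the value is used directly as an array index, it is 0-based: `--column 2` selects the third token.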
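Note that `--column` indexes the tokens produced by `split("\\W+")`, not the fields of the file's own format, so a field containing a space or hyphen shifts every later column. The hypothetical sketch below shows this on a comma-separated line. `byDelimiter` is an illustrative alternative that assumes the file uses one known delimiter (a comma here); it is not part of the original program.

```java
import java.util.regex.Pattern;

// Sketch: which token does --column select? Compares the job's \W+ split with a
// hypothetical fixed-delimiter split (the comma is an assumption about the input format).
public class ColumnIndexSketch {

    // Same rule as the job: lowercase, split on runs of non-word characters, 0-based index
    static String byNonWord(String line, int column) {
        String[] tokens = line.toLowerCase().split("\\W+");
        return tokens.length > column ? tokens[column] : null;
    }

    // Alternative for delimited files: split only on the literal delimiter, keeping empty fields
    static String byDelimiter(String line, String delimiter, int column) {
        String[] fields = line.toLowerCase().split(Pattern.quote(delimiter), -1);
        return fields.length > column ? fields[column] : null;
    }

    public static void main(String[] args) {
        String line = "7,New York,NY";
        System.out.println(byNonWord(line, 2));        // york (tokens: 7, new, york, ny)
        System.out.println(byDelimiter(line, ",", 2)); // ny   (fields: 7, new york, ny)
    }
}
```

If the input really is delimited text with multi-word fields, splitting on the literal delimiter keeps column numbers aligned with the file's fields.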