请写出flink统计每个人的总成绩样例可打包执行代码
时间: 2024-03-16 07:47:20 浏览: 20
好的,以下是一个使用 Flink 统计每个人的总成绩的样例代码,可打包执行:
```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class ScoreSum {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 读取成绩数据
DataSet<String> scores = env.readTextFile("path/to/scores.txt");
// 将每一行数据转换为 Tuple2<姓名, 成绩>
DataSet<Tuple2<String, Integer>> scoresTuple = scores.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] fields = value.split(",");
String name = fields[0];
int score = Integer.parseInt(fields[1]);
out.collect(new Tuple2<>(name, score));
}
});
// 按姓名分组,计算每个人的总成绩
DataSet<Tuple2<String, Integer>> sumScores = scoresTuple.groupBy(0)
.aggregate(new AggregateFunction<Tuple2<String, Integer>, Integer, Tuple2<String, Integer>>() {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
return accumulator + value.f1;
}
@Override
public Tuple2<String, Integer> getResult(Integer accumulator) {
return new Tuple2<>("总成绩", accumulator);
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
});
// 打印结果
sumScores.print();
}
}
```
需要注意的是,这里假设成绩数据的格式为 `姓名,成绩`,并且每行数据以逗号分隔。在实际使用中,你需要根据自己的数据格式进行相应的修改。