flink对文档一列统计前10top打包执行代码

时间: 2024-01-16 14:02:49 浏览: 26
以下是使用Flink对文档中的一列进行统计,并输出前10个最大值的打包执行代码: ```java import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import java.util.Collections; public class Top10Words { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.readTextFile("path/to/input/file"); DataSet<Tuple2<String, Integer>> counts = text.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> { for (String word : line.split(" ")) { if (word.length() > 0) { out.collect(new Tuple2<>(word, 1)); } } }) .groupBy(0) .reduce((ReduceFunction<Tuple2<String, Integer>>) (t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1)); DataSet<Tuple2<String, Integer>> topCounts = counts .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception { return value; } }) .sortPartition(1, org.apache.flink.api.common.operators.Order.DESCENDING) .setParallelism(1) .first(10); topCounts.print(); } } ``` 在这个示例中,我们使用了Flink的DataSet API来读取文本文件,将每个单词映射为一个键值对(单词, 1),并通过reduce操作将每个单词的计数相加。然后我们将结果按照计数排序,并输出前10个最大值。在执行时,我们需要将代码打包成一个jar文件,并在集群上执行。

相关推荐

最新推荐

recommend-type

Flink实战:用户行为分析之热门商品TopN统计

按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window) 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品 实现 创建maven项目,命名UserBehaviorAnalysis,其pom内容如下: 4.0.0 ...
recommend-type

Flink一线公司经验实战

该资料收集了国内外一线公司使用flink的一些实战经验,包括了为什么使用flink,以及在使用flink后遇到的一些技术难点是怎么去解决的。具有非常高的参考价值。
recommend-type

Flink +hudi+presto 流程图.docx

Flink +hudi+presto 流程图.docx 自己实现后画的一个流程图,便于理解
recommend-type

Flink实用教程_预览版_v1.pdf

最新Flink教程,基于Flink 1.13.2。书中所有示例和案例代码均为双语。这是预览版。 目录 第1 章Flink 架构与集群安装..............................................................................................
recommend-type

《剑指大数据——Flink学习精要(Java版)》(最终修订版).pdf

《剑指大数据——Flink学习精要(Java版)》(最终修订版).pdf
recommend-type

STC89C51 简单时钟

STC89C51 简单时钟,叫你从基础开始学习单片机,
recommend-type

管理建模和仿真的文件

管理Boualem Benatallah引用此版本:布阿利姆·贝纳塔拉。管理建模和仿真。约瑟夫-傅立叶大学-格勒诺布尔第一大学,1996年。法语。NNT:电话:00345357HAL ID:电话:00345357https://theses.hal.science/tel-003453572008年12月9日提交HAL是一个多学科的开放存取档案馆,用于存放和传播科学研究论文,无论它们是否被公开。论文可以来自法国或国外的教学和研究机构,也可以来自公共或私人研究中心。L’archive ouverte pluridisciplinaire
recommend-type

MATLAB归一化方法大比拼:5种方法,哪种最适合你的数据?

![MATLAB归一化方法大比拼:5种方法,哪种最适合你的数据?](https://www.finebi.com/wp-content/uploads/2019/11/FineBI%E8%A1%8C%E4%B8%9A%E9%A9%BE%E9%A9%B6%E8%88%B1-1024x510.png) # 1. MATLAB归一化概述 归一化是一种数据预处理技术,用于将数据缩放到特定范围内,从而消除不同特征之间的尺度差异。在MATLAB中,有各种归一化方法可用于不同类型的数据和应用程序。 归一化的主要目的是: - 提高模型的训练效率和准确性,通过消除特征之间的尺度差异,使模型能够更有效地学习
recommend-type

failed to register layer: ApplyLayer exit status 1 stdout: stderr: archive/tar: invalid tar header

很抱歉听到这个问题。这个错误通常是由于Docker镜像损坏或Docker存储库中缺少所需的图层引起的。您可以尝试以下几种方法来解决这个问题: 1. 清理Docker镜像和容器:使用命令“docker system prune”清理不再使用的Docker对象。 2. 检查Docker存储库:确保您正在使用可靠的Docker存储库,并检查是否存在所有必需的图层。 3. 重新下载Docker镜像:如果您确定问题是由于损坏的Docker镜像引起的,则可以尝试重新下载Docker镜像。 4. 更新Docker版本:如果您使用的是旧版Docker,则可能会出现此问题。尝试更新到最新版本的Docke
recommend-type

Linux系统常用操作命令大全手册

附件是Linux系统常用操作命令大全手册,是 markdown格式,其中覆盖了Linux系统管理、文件操作、网络配置等多个方面,都是日常工作中非常常用的命令,欢迎大家下载学习使用!