package anaylse import org.apache.spark.sql.SparkSession object BasicAnaylse { val spark=SparkSession.builder().appName("BasicAnalyse") .master("local[*]") 注释其中重要部分的意思 .enableHiveSupport() .getOrCreate() // spark.sparkContext.setLogLevel("WARN") def main(args: Array[String]): Unit = { //探索每个表中的重复记录表和空值记录数 val tableName = Array("media_index","mediamatch_userevent","mediamatch_usermsg","mmconsume_billevents","order_index") var i = ""; for(i<-tableName){ Analyse(i) } // val mediamatch_userevent = spark.table("user_project.mediamatch_userevent") // mediamatch_userevent.show(false) } def Analyse(tableName:String): Unit ={ val data = spark.table("user_project."+tableName) print(tableName+"表数据:"+data.count()) print(tableName+"表phone_no字段为空数:"+(data.count()-data.select("phone_no").na.drop().count)) } }
时间: 2024-04-28 08:26:12 浏览: 161
这段代码是一个基本的数据分析程序,使用 Apache Spark SQL 的 SparkSession 类来创建一个 Spark 应用程序。它的主要作用是对指定的表进行分析,包括查找重复记录表和空值记录数。其中重要的部分是:
- `appName("BasicAnalyse")`:设置应用程序的名称为 "BasicAnalyse"。
- `master("local[*]")`:设置应用程序在本地运行,使用所有可用的 CPU 核心。
- `enableHiveSupport()`:启用 Hive 支持,可以在 Spark SQL 中访问 Hive 表。
- `getOrCreate()`:获取或创建 SparkSession 实例。
- `spark.table("user_project."+tableName)`:通过 SparkSession 实例获取指定表的 DataFrame。
在 `main` 函数中,使用一个循环语句遍历指定的表,并调用 `Analyse` 函数对每个表进行分析。在 `Analyse` 函数中,使用 `count()` 函数获取表的总记录数,使用 `na.drop()` 函数删除空值记录,计算出表中 `phone_no` 字段的空值记录数。
相关问题
分析代码package analyse import org.apache.spark.sql.SparkSession object BasicAnaylse { val spark=SparkSession.builder() .appName("BasicAnalyse") .master("local[*]") .enableHiveSupport() .getOrCreate() spark.sparkContext.setLogLevel("WARN") def main(args: Array[String]): Unit = { val tableName = Array("media_index","mediamatch_userevent","mediamatch_usermsg","mmconsume_billevents","order_index") var i = ""; for ( i<-tableName){ Analyse(i) } } def Analyse(tableName:String):Unit={ val data = spark.table(tableName = "user_project."+tableName) println(tableName+"表数据:"+data.count()) println(tableName+"表字段为空数:"+(data.count()-data.select("phone_no").na.drop().count)) } }
根据你提供的代码,我们可以看出该代码的功能是对指定的5个表进行数据分析,输出表数据的总量和字段为空的数量。具体分析如下:
1. 导入了SparkSession类,用于连接Spark集群。
2. 创建了一个BasicAnalyse对象,并定义了SparkSession对象spark,用于连接Spark集群和设置日志级别为WARN。
3. 在main函数中,定义了一个包含5个表名的数组tableName,并使用for循环遍历该数组,依次调用Analyse函数进行分析。
4. Analyse函数中,首先通过SparkSession对象spark的table方法获取指定表名的数据。
5. 然后使用count方法计算数据总量,并使用na.drop().count方法计算字段为空的数量。
6. 最后输出表数据的总量和字段为空的数量。
需要注意的是,该代码使用了Spark SQL来操作数据,因此需要确保Spark集群已经启动并且可以访问到指定的表。同时,该代码中使用了变量i来遍历表名数组tableName,可以使用更加直观的方式来定义变量名。例如,可以使用tableName.foreach(tableName => Analyse(tableName))来遍历表名数组并调用Analyse函数进行分析。
解释下列代码package sparkML import org.apache.spark.ml.clustering.{KMeans, KMeansModel} import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer} import org.apache.spark.sql.{DataFrame, SparkSession} object Kmeans { case class Product(id: String, wd: String, jd: String, sjc: String, jyje: String) def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("TFIDF@2") .master("local[3]") .getOrCreate() import spark.implicits._ val product = spark.sparkContext.textFile("data/data.txt").map { x => val data = x.split(",") Product(data(0), data(1), data(2), data(3), data(4)) }.toDS().cache() product.createOrReplaceTempView("data") val products: DataFrame = spark.sql("select id, concat_ws(',',collect_set(jyje)) as jyje from data group by id") val productData = new Tokenizer().setInputCol("jyje").setOutputCol("productWords").transform(products) val tfData = new HashingTF().setNumFeatures(20).setInputCol("productWords").setOutputCol("productFeatures").transform(productData) val idfModel = new IDF().setInputCol("productFeatures").setOutputCol("features").fit(tfData) val idfData = idfModel.transform(tfData) val trainingData = idfData.select("id", "features") val kmeans = new KMeans() .setK(4) .setMaxIter(5) .setFeaturesCol("features") .setPredictionCol("prediction") val kmeansModel = kmeans.fit(trainingData) val kmeansData = kmeansModel.transform(trainingData) kmeansData.show() } }
这段代码是一个使用Spark ML库实现KMeans聚类算法的示例。代码中定义了一个名为Product的case class,表示一个产品的属性集合。代码中首先使用SparkContext的textFile方法读取数据文件,然后使用map方法将每行数据转换为Product对象,并转换成DataFrame格式。接着,使用Spark SQL对DataFrame进行分组和聚合操作,得到每个产品的交易金额数据。然后,使用Tokenizer和HashingTF对文本进行分词和特征提取,使用IDF对特征进行加权,得到最终的训练数据。最后,使用KMeans算法对训练数据进行聚类,并输出聚类结果。
阅读全文