用sparksql进行编程,完成下列需求:(1)读取restaurant.csv 数据,删除最后为空值的两列,再删除含有空值的行。(2)筛选出口味评分大于7分的数据(3)统计各类别餐饮店点评数,并按降序排列(4)将步骤(2)和(3)的结果保存在HDFS上
时间: 2024-02-13 19:06:35 浏览: 150
python 对csv数据的读取、空值、重复的删除、数据的保存为csv
好的,以下是对应的代码实现,需要注意的是,由于我无法直接读取你的本地文件,以下代码中的路径需要自行修改。
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
object RestaurantAnalysis {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("Restaurant Analysis")
.master("local[*]")
.getOrCreate()
// 读取restaurant.csv 数据,删除最后为空值的两列,再删除含有空值的行
val restaurantDf = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("file:///path/to/restaurant.csv")
.drop("last_col1", "last_col2")
.na.drop()
// 筛选出口味评分大于7分的数据
val highScoreDf = restaurantDf.filter("taste > 7")
// 统计各类别餐饮店点评数,并按降序排列
val categoryCountDf = restaurantDf.groupBy("category").count().orderBy("count")
// 将步骤(2)和(3)的结果保存在HDFS上
highScoreDf.write.mode("overwrite").csv("hdfs://path/to/highScoreDf.csv")
categoryCountDf.write.mode("overwrite").csv("hdfs://path/to/categoryCountDf.csv")
// 关闭SparkSession
spark.stop()
}
}
```
需要注意的是,上述代码中的HDFS路径需要根据实际情况进行修改,同时需要保证HDFS已经正确配置,并且可以在本机上访问。
阅读全文