使用Spark SQL分析,找出每种类型的评分最高10部电影
时间: 2024-03-23 19:40:17 浏览: 59
Atlas Spark SQL血缘分析,HBASE
好的,以下是使用Spark SQL分析找出每种类型的评分最高10部电影的Scala代码实现:
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
object MovieAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("MovieAnalysis")
.master("local")
.getOrCreate()
// 加载movies.csv文件为DataFrame
val moviesDF = spark.read.format("csv")
.option("header", "true")
.load("movies.csv")
// 加载ratings.csv文件为DataFrame
val ratingsDF = spark.read.format("csv")
.option("header", "true")
.load("ratings.csv")
// 注册movies表
moviesDF.createOrReplaceTempView("movies")
// 注册ratings表
ratingsDF.createOrReplaceTempView("ratings")
// 执行SQL查询,找出每种类型的评分最高10部电影
val resultDF = spark.sql(
"""
|SELECT movieId, title, genres, avg(rating) as avg_rating
|FROM movies
|JOIN ratings ON movies.movieId = ratings.movieId
|GROUP BY movieId, title, genres
|ORDER BY genres, avg_rating DESC
|""".stripMargin)
// 输出结果
resultDF.show(false)
spark.stop()
}
}
```
解释一下上面的代码:
1. 首先创建SparkSession对象,设置应用名和运行模式。
2. 使用read函数加载movies.csv和ratings.csv文件,生成DataFrame。
3. 使用createOrReplaceTempView函数注册movies表和ratings表,方便后续的SQL查询。
4. 执行SQL查询,使用JOIN操作连接movies表和ratings表,按照电影类型和评分进行分组和排序,找出每种类型的评分最高10部电影。
5. 输出结果,关闭SparkSession。
注意:在执行该程序之前需要先安装Spark并配置环境变量,同时需要将movies.csv和ratings.csv文件放在同一目录下。
阅读全文