在scala中,有ratings.csv和movies.csv,编程实现RDD转DataFrame,选择一个合理的SQL分析
时间: 2024-02-01 22:13:37 浏览: 99
首先,我们可以通过以下代码将ratings.csv和movies.csv文件加载为RDD:
```scala
val spark = SparkSession.builder().appName("rating").getOrCreate()
val sc = spark.sparkContext
val ratingsRdd = sc.textFile("ratings.csv")
val moviesRdd = sc.textFile("movies.csv")
```
然后,我们可以使用case class定义ratings和movies的模式,并将RDD转换为DataFrame:
```scala
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Long)
case class Movie(movieId: Int, title: String, genres: String)
val ratings = ratingsRdd.map(_.split(",")).map(r => Rating(r(0).toInt, r(1).toInt, r(2).toDouble, r(3).toLong))
val movies = moviesRdd.map(_.split(",")).map(m => Movie(m(0).toInt, m(1), m(2)))
import spark.implicits._
val ratingsDf = ratings.toDF()
val moviesDf = movies.toDF()
```
接下来,我们可以将ratings和movies表注册为临时表(temp table)并执行SQL查询,例如:
```scala
ratingsDf.createOrReplaceTempView("ratings")
moviesDf.createOrReplaceTempView("movies")
val topRatedMovies = spark.sql("SELECT m.title, avg(r.rating) as avg_rating FROM ratings r JOIN movies m ON r.movieId = m.movieId GROUP BY m.title ORDER BY avg_rating DESC LIMIT 10")
topRatedMovies.show()
```
该查询将返回平均评分最高的前10部电影的标题和平均评分。
阅读全文