Scala Ratings.csv文件 文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字 编程实现RDD转DataFrame,并选择一个合理的SQL分析。
时间: 2024-02-11 22:07:41 浏览: 40
首先,我们需要读取两个文件,分别为Ratings.csv和movies.csv文件,并将它们转换为RDD。
```scala
val spark = SparkSession.builder().appName("RDD to DataFrame").master("local[*]").getOrCreate()
val ratingsRDD = spark.sparkContext.textFile("path/to/Ratings.csv")
val moviesRDD = spark.sparkContext.textFile("path/to/movies.csv")
```
接下来,我们需要解析每个文件中的数据,并将其转换为DataFrame。对于Ratings.csv文件,我们需要将其转换为包含四个字段的DataFrame:userId、movieId、rating和timestamp。对于movies.csv文件,我们需要将其转换为包含两个字段的DataFrame:movieId和title。
```scala
import org.apache.spark.sql.functions._
// 解析Ratings.csv文件,并将其转换为DataFrame
val ratingsDF = ratingsRDD.map(line => {
val fields = line.split(",")
(fields(0).toInt, fields(1).toInt, fields(2).toDouble, fields(3).toLong)
}).toDF("userId", "movieId", "rating", "timestamp")
// 解析movies.csv文件,并将其转换为DataFrame
val moviesDF = moviesRDD.map(line => {
val fields = line.split(",")
(fields(0).toInt, fields(1))
}).toDF("movieId", "title")
```
现在我们可以将这两个DataFrame进行连接,以便进行SQL分析。例如,我们可以计算每个电影的平均评分,并按照评分降序排列。
```scala
import spark.implicits._
// 将ratingsDF和moviesDF进行连接
val joinedDF = ratingsDF.join(moviesDF, Seq("movieId"), "inner")
// 计算每个电影的平均评分,并按照评分降序排列
val avgRatingsDF = joinedDF.groupBy($"movieId", $"title").agg(avg($"rating").as("avg_rating"))
avgRatingsDF.orderBy($"avg_rating".desc).show()
```
以上代码会输出每个电影的平均评分,并按照评分降序排列的结果。
相关推荐
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![xlsx](https://img-home.csdnimg.cn/images/20210720083732.png)