Scala ratings.csv文件 文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字 编程实现RDD转DataFrame,并选择一个合理的SQL分析。
时间: 2024-02-12 22:07:54 浏览: 224
ratings.csv
以下是Scala代码将ratings.csv文件转换为DataFrame,并对数据进行简单的SQL分析:
```scala
import org.apache.spark.sql.SparkSession
// 创建SparkSession
val spark = SparkSession.builder()
.appName("ratings.csv Analysis")
.master("local[*]")
.getOrCreate()
// 读取ratings.csv文件
val ratingsRDD = spark.sparkContext.textFile("ratings.csv")
// 将每行数据按逗号分隔,并转换为(用户ID, 电影ID, 评分)的元组
val ratings = ratingsRDD.map(line => {
val fields = line.split(",")
(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
})
// 将元组转换为DataFrame,并为列命名
import spark.implicits._
val ratingsDF = ratings.toDF("userId", "movieId", "rating")
// 创建movies.csv文件的DataFrame
val moviesDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("movies.csv")
// 使用SQL分析,找出每部电影的平均评分
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")
val avgRatings = spark.sql("SELECT title, AVG(rating) AS avg_rating FROM ratings JOIN movies ON ratings.movieId = movies.movieId GROUP BY title ORDER BY avg_rating DESC")
// 显示结果
avgRatings.show()
```
在上面的代码中,我们首先使用SparkSession创建一个名为“ratings.csv Analysis”的应用程序,并读取ratings.csv文件。我们将每行数据按逗号分隔,并将其转换为(用户ID, 电影ID, 评分)的元组。然后,我们将元组转换为DataFrame,并为列命名。接下来,我们读取movies.csv文件,并将其转换为DataFrame。我们使用SQL分析找出每部电影的平均评分,并将结果按平均评分降序排列。最后,我们显示结果。
阅读全文