Scala编程实现RDD转DataFrame,并选择一个合理的SQL分析。 ratings.csv文件和movies.csv文件 ratings.csv文件里面的内容包含了每一个用户对于每一部电影的评分。数据格式如下: userId, movieId, rating, timestamp userId: 每个用户的id movieId: 每部电影的id rating: 用户评分,是5星制 timestamp: 自1970年1月1日零点后到用户提交评价的时间的秒数 movies.csv文件 movieId: 每部电影的Id title:每部电影的名字 import org.apache.spark.sql.Row import org.apache.spark.sql.types._
时间: 2024-02-11 14:08:16 浏览: 115
首先,我们需要导入必要的包并读取两个CSV文件为RDD对象:
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val ratingsRDD = sc.textFile("ratings.csv")
.map(line => line.split(","))
.map(cols => Row(cols(0).toInt, cols(1).toInt, cols(2).toDouble))
val moviesRDD = sc.textFile("movies.csv")
.map(line => line.split(","))
.map(cols => Row(cols(0).toInt, cols(1), cols(2)))
```
接下来,我们需要定义ratings.csv和movies.csv文件的Schema:
```scala
val ratingsSchema = StructType(
Array(
StructField("userId", IntegerType, true),
StructField("movieId", IntegerType, true),
StructField("rating", DoubleType, true)
)
)
val moviesSchema = StructType(
Array(
StructField("movieId", IntegerType, true),
StructField("title", StringType, true),
StructField("genres", StringType, true)
)
)
```
然后,我们可以使用SparkSession对象将RDD对象转换为DataFrame对象:
```scala
val spark = SparkSession.builder().appName("RDD to DataFrame").getOrCreate()
val ratingsDF = spark.createDataFrame(ratingsRDD, ratingsSchema)
val moviesDF = spark.createDataFrame(moviesRDD, moviesSchema)
```
现在,我们已经成功将两个RDD对象转换为DataFrame对象。
接下来,我们可以使用Spark SQL对这些数据进行分析,例如,我们可以使用以下代码来计算每部电影的平均评分:
```scala
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")
val avgRatings = spark.sql("SELECT movies.title, AVG(ratings.rating) as avg_rating FROM ratings JOIN movies ON ratings.movieId = movies.movieId GROUP BY movies.title ORDER BY avg_rating DESC")
avgRatings.show()
```
这将输出每部电影的平均评分,按照评分从高到低排序。
阅读全文