In Scala, given ratings.csv and movies.csv, write a program that converts an RDD to a DataFrame and pick a reasonable SQL analysis to run on it
Time: 2024-02-01 16:13:17
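Spark SQL can do the conversion. Build the RDD, turn it into a DataFrame with `SparkSession.createDataFrame`, register it as a temporary view, and query it with SQL: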
```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType, LongType}

val spark = SparkSession.builder().appName("RDD to DataFrame").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Schema for ratings.csv: userId,movieId,rating,timestamp (timestamp is Unix seconds)
val ratingsSchema = StructType(Array(
  StructField("userId", IntegerType, true),
  StructField("movieId", IntegerType, true),
  StructField("rating", DoubleType, true),
  StructField("timestamp", LongType, true)
))
// Schema for movies.csv: movieId,title,genres
val moviesSchema = StructType(Array(
  StructField("movieId", IntegerType, true),
  StructField("title", StringType, true),
  StructField("genres", StringType, true)
))

// Read each file as RDD[String], drop the header line, and map every line to a Row
val ratingsRDD = sc.textFile("path/to/ratings.csv")
  .filter(!_.startsWith("userId"))
  .map(_.split(","))
  .map(f => Row(f(0).toInt, f(1).toInt, f(2).toDouble, f(3).toLong))
// Titles can contain commas inside quotes, e.g. "American President, The (1995)",
// so split only on the first and last comma (genres are '|'-separated, never commas)
val moviesRDD = sc.textFile("path/to/movies.csv")
  .filter(!_.startsWith("movieId"))
  .map { line =>
    val first = line.indexOf(',')
    val last = line.lastIndexOf(',')
    val title = line.substring(first + 1, last).stripPrefix("\"").stripSuffix("\"").replace("\"\"", "\"")
    Row(line.substring(0, first).toInt, title, line.substring(last + 1))
  }

// Convert RDD[Row] to DataFrame by applying the explicit schemas
val ratingsDF = spark.createDataFrame(ratingsRDD, ratingsSchema)
val moviesDF = spark.createDataFrame(moviesRDD, moviesSchema)

// Register as temporary views so they can be queried with SQL
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")

// Average rating per movie; group by movieId as well, since titles are not guaranteed unique
val result = spark.sql("""
  SELECT m.title, AVG(r.rating) AS avg_rating
  FROM movies m JOIN ratings r ON m.movieId = r.movieId
  GROUP BY m.movieId, m.title
  ORDER BY avg_rating DESC
  LIMIT 10""")
result.show(false)
```
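Splitting each line with `split(",")` breaks on quoted fields. In MovieLens-style data, a title such as `"American President, The (1995)"` contains a comma. The code below is a minimal sketch of a quote-aware line splitter in plain Scala. It assumes RFC 4180-style quoting, where fields are wrapped in double quotes and embedded quotes are doubled. `CsvLine.parse` is a hypothetical helper, not part of Spark.

```scala
// Hypothetical helper (not a Spark API): split one CSV line on commas outside double quotes.
// Inside a quoted field, a doubled quote ("") is read as one literal quote character.
object CsvLine {
  def parse(line: String): Vector[String] = {
    val fields = Vector.newBuilder[String]
    val current = new StringBuilder
    var inQuotes = false
    var i = 0
    while (i < line.length) {
      val c = line.charAt(i)
      if (inQuotes) {
        if (c == '"') {
          if (i + 1 < line.length && line.charAt(i + 1) == '"') {
            current += '"' // escaped quote inside a quoted field
            i += 1
          } else {
            inQuotes = false // closing quote
          }
        } else {
          current += c // commas are kept as data while inside quotes
        }
      } else if (c == '"') {
        inQuotes = true // opening quote
      } else if (c == ',') {
        fields += current.toString // field boundary
        current.clear()
      } else {
        current += c
      }
      i += 1
    }
    fields += current.toString // last field
    fields.result()
  }
}
```

For example, `CsvLine.parse("11,\"American President, The (1995)\",Comedy|Drama|Romance")` yields three fields, with the comma kept inside the title. In the RDD pipeline it could replace the naive split, as in `.map(CsvLine.parse)`. If going through an RDD is not a requirement, Spark's built-in reader (`spark.read.option("header", "true").csv(...)`) already handles quoted fields.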
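The SQL analysis chosen here computes each movie's average rating, sorts the movies by average rating in descending order, and returns the title and average rating of the top 10.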
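A plain average lets a movie with a single 5.0 rating beat movies that many users rated highly. The code below reproduces the query's logic on in-memory data in plain Scala (not Spark), with one addition: a minimum number of ratings per movie. `Rating`, `TopRated.topN`, and the `minCount` parameter are hypothetical names used only for illustration.

```scala
// Hypothetical plain-Scala sketch of the query (no Spark):
// JOIN on movieId, GROUP BY movie, keep movies with >= minCount ratings,
// ORDER BY average rating DESC, LIMIT n.
final case class Rating(userId: Int, movieId: Int, rating: Double)

object TopRated {
  def topN(ratings: Seq[Rating], titles: Map[Int, String], minCount: Int, n: Int): Seq[(String, Double)] =
    ratings
      .groupBy(_.movieId) // GROUP BY movieId
      .toSeq // keep a Seq so movies with equal titles are not merged
      .collect {
        // titles.contains mimics the inner JOIN; the size check is the HAVING clause
        case (movieId, rs) if rs.size >= minCount && titles.contains(movieId) =>
          (titles(movieId), rs.map(_.rating).sum / rs.size) // AVG(rating)
      }
      .sortWith(_._2 > _._2) // ORDER BY avg_rating DESC
      .take(n) // LIMIT n
}
```

With `minCount = 1` this matches the original query, and a movie rated once at 5.0 comes first. Raising `minCount` filters such movies out. In the Spark SQL query, the same filter would be a clause like `HAVING COUNT(r.rating) >= 50` placed after `GROUP BY`. The threshold of 50 is an arbitrary example value.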