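In Scala, given ratings.csv and movies.csv, write a program that converts RDDs to DataFrames and runs a reasonable SQL analysis
Posted: 2024-02-03 14:11:49 Views: 94
In Scala, Spark SQL can be used to convert an RDD into a DataFrame. We start by reading ratings.csv and movies.csv into RDDs of text lines: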
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructType, StringType}
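// Local session for experimentation; use a cluster master URL in production
val spark = SparkSession.builder()
  .appName("RDD to DataFrame")
  .master("local[*]")
  .getOrCreate()
// Each RDD element is one raw line of the CSV file
val ratingsRdd = spark.sparkContext.textFile("ratings.csv")
val moviesRdd = spark.sparkContext.textFile("movies.csv")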
```
Next, we define a schema for ratings.csv and for movies.csv, then convert each RDD into a DataFrame:
```scala
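import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, LongType}
// Assumption: MovieLens-style files with a header row and fractional ratings (e.g. 3.5)
val ratingsSchema = new StructType().add("user_id", IntegerType, true)
  .add("movie_id", IntegerType, true)
  .add("rating", DoubleType, true)
  .add("timestamp", LongType, true)
val ratingsHeader = ratingsRdd.first()
val ratingsRows = ratingsRdd.filter(_ != ratingsHeader).map(_.split(","))
  .map(x => Row(x(0).toInt, x(1).toInt, x(2).toDouble, x(3).toLong))
// Passing the schema explicitly is what makes the StructType above take effect
val ratingsDf = spark.createDataFrame(ratingsRows, ratingsSchema)
val moviesSchema = new StructType().add("movie_id", IntegerType, true)
  .add("title", StringType, true)
  .add("genres", StringType, true)
val moviesHeader = moviesRdd.first()
// Titles may contain commas, so split only on the first and last comma of each line
val moviesRows = moviesRdd.filter(_ != moviesHeader).map { line =>
  val (first, last) = (line.indexOf(','), line.lastIndexOf(','))
  Row(line.substring(0, first).toInt,
    line.substring(first + 1, last).stripPrefix("\"").stripSuffix("\""), line.substring(last + 1))
}
val moviesDf = spark.createDataFrame(moviesRows, moviesSchema)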
```
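Real CSV files often contain a header row or a few malformed lines, and a bare `toInt` throws on them. The sketch below shows one possible way to make the parsing step tolerant. It is plain Scala with no Spark dependency; the `Rating` case class and the `parseRating` helper are hypothetical names introduced for illustration, and the sample lines are made up.

```scala
import scala.util.Try

// Hypothetical record type; field names mirror the DataFrame column names
case class Rating(user_id: Int, movie_id: Int, rating: Double, timestamp: Long)

// Returns None for a header row or any line that does not parse,
// so that flatMap drops such lines instead of failing the job
def parseRating(line: String): Option[Rating] = {
  val f = line.split(",")
  if (f.length != 4) None
  else Try(Rating(f(0).trim.toInt, f(1).trim.toInt, f(2).trim.toDouble, f(3).trim.toLong)).toOption
}

// Made-up sample input: a header, one valid line, one malformed line
val sample = Seq("userId,movieId,rating,timestamp", "1,31,2.5,1260759144", "bad,line")
val parsed = sample.flatMap(line => parseRating(line))
parsed.foreach(println)
```

With `import spark.implicits._` in scope, the same helper could be applied to the RDD as `ratingsRdd.flatMap(line => parseRating(line)).toDF()`, which would name the columns after the case-class fields instead of requiring an explicit schema.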
The RDDs are now DataFrames, so we can analyze the data with Spark SQL. For example, we can compute the average rating of each movie:
```scala
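// Register both DataFrames as temporary views so SQL can refer to them by name
ratingsDf.createOrReplaceTempView("ratings")
moviesDf.createOrReplaceTempView("movies")
// Group by movie_id as well as title so distinct movies sharing a title are not merged
val avgRatingsDf = spark.sql("SELECT m.title, AVG(r.rating) AS avg_rating " +
  "FROM movies m " +
  "JOIN ratings r ON m.movie_id = r.movie_id " +
  "GROUP BY m.movie_id, m.title " +
  "ORDER BY avg_rating DESC")
// show() prints the first 20 rows by default
avgRatingsDf.show()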
```
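This code first registers ratingsDf and moviesDf as temporary views, then uses a SQL query to compute the average rating of each movie. The result is sorted by average rating from highest to lowest and printed.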
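The same analysis can also be written with the DataFrame API instead of a SQL string. The following is a minimal sketch under stated assumptions: it builds two tiny in-memory DataFrames with made-up rows (so it runs without the CSV files), and it adds a `num_ratings` column, which is not part of the original query, because an average based on very few ratings is easy to misread.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, desc}

val spark = SparkSession.builder()
  .appName("avg-rating-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Made-up sample rows standing in for ratings.csv and movies.csv
val ratingsDf = Seq(
  (1, 10, 4.0, 964982703L),
  (2, 10, 5.0, 964982704L),
  (1, 20, 3.0, 964982705L)
).toDF("user_id", "movie_id", "rating", "timestamp")
val moviesDf = Seq(
  (10, "Movie A", "Drama"),
  (20, "Movie B", "Comedy")
).toDF("movie_id", "title", "genres")

// Join on movie_id, then aggregate per movie and sort by average rating
val result = moviesDf.join(ratingsDf, "movie_id")
  .groupBy("movie_id", "title")
  .agg(avg("rating").as("avg_rating"), count("rating").as("num_ratings"))
  .orderBy(desc("avg_rating"))

result.show()
```

On real data, a filter such as `.filter($"num_ratings" >= 50)` before the sort would keep rarely rated movies from dominating the top of the list; the threshold of 50 is an arbitrary illustration, not a recommendation from the original answer.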