Implement in Scala: given the files rating.csv and movies.csv, convert RDDs to DataFrames and perform a reasonable SQL analysis
Posted: 2023-11-22 14:56:24
In Scala, an RDD can be converted to a DataFrame through the Spark SQL programming interface. The steps are as follows:
1. Create a SparkSession
```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("RDD to DataFrame")
.master("local[*]")
.getOrCreate()
```
2. Read rating.csv and movies.csv as RDDs
```scala
val ratingRDD = spark.sparkContext.textFile("path/to/rating.csv")
val moviesRDD = spark.sparkContext.textFile("path/to/movies.csv")
```
3. Define the schema for the data in rating.csv and movies.csv
```scala
import org.apache.spark.sql.types._
val ratingSchema = StructType(Seq(
StructField("userId", IntegerType, nullable = false),
StructField("movieId", IntegerType, nullable = false),
StructField("rating", DoubleType, nullable = false),
StructField("timestamp", LongType, nullable = false)
))
val moviesSchema = StructType(Seq(
StructField("movieId", IntegerType, nullable = false),
StructField("title", StringType, nullable = false),
StructField("genres", StringType, nullable = false)
))
```
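As a side note, when the schema is known at compile time, a common alternative (a sketch, not part of the original answer) is to define case classes and let Spark infer the schema through `spark.implicits._`:

```scala
// Hypothetical case classes mirroring the two schemas above
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Long)
case class Movie(movieId: Int, title: String, genres: String)

import spark.implicits._  // enables rdd.toDF / rdd.toDS

// Parse each line into a case class, then convert directly
val ratingDFInferred = ratingRDD
  .filter(!_.startsWith("userId"))  // skip the CSV header line, if present
  .map(_.split(","))
  .map(f => Rating(f(0).toInt, f(1).toInt, f(2).toDouble, f(3).toLong))
  .toDF()
```

This trades the explicit `StructType` for compile-time field types; both routes produce the same DataFrame.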
4. Convert ratingRDD and moviesRDD to DataFrames
```scala
import org.apache.spark.sql.Row
// Drop the header line and cast each field to the type declared in the schema;
// passing raw strings into a typed schema fails at runtime
val ratingRowRDD = ratingRDD.filter(!_.startsWith("userId"))
  .map(_.split(",")).map(f => Row(f(0).toInt, f(1).toInt, f(2).toDouble, f(3).toLong))
val ratingDF = spark.createDataFrame(ratingRowRDD, ratingSchema)
// Note: split(",") breaks on quoted titles that contain commas; spark.read.csv is more robust
val moviesRowRDD = moviesRDD.filter(!_.startsWith("movieId"))
  .map(_.split(",")).map(f => Row(f(0).toInt, f(1), f(2)))
val moviesDF = spark.createDataFrame(moviesRowRDD, moviesSchema)
```
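For comparison only (the exercise asks for the RDD route), the same DataFrames can be built without a manual RDD step by letting Spark's CSV reader apply the schema directly; it also handles the header row and quoted fields:

```scala
// DataFrameReader parses the header and quoted, comma-containing titles for us
val ratingDFDirect = spark.read
  .option("header", "true")
  .schema(ratingSchema)
  .csv("path/to/rating.csv")
```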
5. Analyze the data with Spark SQL
```scala
import org.apache.spark.sql.functions._
// Compute each movie's average rating
val movieAvgRatingDF = ratingDF.groupBy("movieId")
.agg(avg("rating").as("avgRating"))
// Find the most popular movies (highest average rating; note that without a
// minimum-vote threshold, a movie with a single 5-star rating can top this list)
val mostPopularMovieDF = movieAvgRatingDF.join(moviesDF, Seq("movieId"))
.orderBy(desc("avgRating"))
.limit(10)
mostPopularMovieDF.show()
```
The code above converts the data in rating.csv and movies.csv into DataFrames, uses Spark SQL to compute each movie's average rating, and returns the ten movies with the highest average rating.
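Since the exercise asks for a SQL analysis, the same query can also be written as literal SQL by registering the DataFrames as temporary views (a sketch; the count threshold of 100 is an assumption added to keep single-vote movies from dominating the ranking):

```scala
ratingDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")

// Top 10 movies by average rating among movies with at least 100 ratings
spark.sql("""
  SELECT m.title, AVG(r.rating) AS avgRating, COUNT(*) AS numRatings
  FROM ratings r
  JOIN movies m ON r.movieId = m.movieId
  GROUP BY m.title
  HAVING COUNT(*) >= 100
  ORDER BY avgRating DESC
  LIMIT 10
""").show(truncate = false)
```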