在scala中实现,有rating.csv和moives.csv两个文件,编程实现RDD转DataFrame,并选择一个合理的SQL分析
时间: 2023-11-27 19:49:41 浏览: 89
pyspark 读取csv文件创建DataFrame的两种方法
首先,我们需要导入Spark SQL和DataFrame相关的库:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
```
接着,我们可以创建一个SparkSession:
```scala
val spark = SparkSession.builder()
.appName("RDD to DataFrame")
.master("local[*]")
.getOrCreate()
```
然后,我们可以读取rating.csv和movies.csv文件,并创建对应的DataFrame:
```scala
val ratingSchema = StructType(
StructField("userId", IntegerType, true) ::
StructField("movieId", IntegerType, true) ::
StructField("rating", DoubleType, true) ::
StructField("timestamp", StringType, true) :: Nil)
val movieSchema = StructType(
StructField("movieId", IntegerType, true) ::
StructField("title", StringType, true) ::
StructField("genres", StringType, true) :: Nil)
val ratingDF = spark.read
.option("header", true)
.option("inferSchema", false)
.schema(ratingSchema)
.csv("path/to/rating.csv")
val movieDF = spark.read
.option("header", true)
.option("inferSchema", false)
.schema(movieSchema)
.csv("path/to/movies.csv")
```
接下来,我们可以将两个DataFrame合并,并使用Spark SQL进行数据分析。例如,我们可以统计每个电影的平均评分:
```scala
ratingDF.createOrReplaceTempView("ratings")
movieDF.createOrReplaceTempView("movies")
val resultDF = spark.sql("SELECT title, AVG(rating) as avg_rating FROM ratings JOIN movies ON ratings.movieId = movies.movieId GROUP BY title ORDER BY avg_rating DESC")
resultDF.show()
```
这里我们首先将ratingDF和movieDF注册成临时表,然后使用JOIN操作将它们合并。最后,我们使用GROUP BY和AVG函数来计算每个电影的平均评分,并按照平均评分排序输出结果。
注意,上面的代码中的路径需要替换成实际文件路径。
阅读全文