在scala中实现,有rating.csv和moives.csv两个文件,编程实现RDD转DataFrame,并选择一个合理的SQL分析
时间: 2023-11-22 16:56:17 浏览: 167
pyspark 读取csv文件创建DataFrame的两种方法
首先,我们需要将rating.csv和movies.csv两个文件读入到RDD中,并对数据进行处理,然后将其转换为DataFrame。
以下是Scala代码示例:
```scala
import org.apache.spark.sql.SparkSession
object RDDToDataFrameExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("RDDToDataFrameExample")
.master("local[*]")
.getOrCreate()
// 读取rating.csv文件并转换成RDD
val ratingRDD = spark.sparkContext.textFile("rating.csv")
.map(line => line.split(","))
.map(cols => (cols(0).toInt, cols(1).toInt, cols(2).toDouble, cols(3).toInt))
// 读取movies.csv文件并转换成RDD
val moviesRDD = spark.sparkContext.textFile("movies.csv")
.map(line => line.split(","))
.map(cols => (cols(0).toInt, cols(1), cols(2)))
// 将ratingRDD转换为DataFrame
val ratingDF = spark.createDataFrame(ratingRDD)
.toDF("userId", "movieId", "rating", "timestamp")
// 将moviesRDD转换为DataFrame
val moviesDF = spark.createDataFrame(moviesRDD)
.toDF("movieId", "title", "genres")
// 注册DataFrame为临时表
ratingDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")
// SQL查询语句
val sqlQuery = "SELECT title, AVG(rating) AS avg_rating FROM ratings JOIN movies ON ratings.movieId = movies.movieId GROUP BY title ORDER BY avg_rating DESC LIMIT 10"
// 执行SQL查询
val result = spark.sql(sqlQuery)
// 打印结果
result.show()
spark.stop()
}
}
```
上述代码实现了以下操作:
1. 读取rating.csv和movies.csv文件,并将它们转换为RDD。
2. 将ratingRDD和moviesRDD转换为DataFrame。
3. 注册DataFrame为临时表,以便进行SQL查询。
4. 编写SQL查询语句,查询每部电影的平均评分并按照平均评分排序。
5. 执行SQL查询并打印结果。
在这个例子中,我们选择了一个简单的SQL查询,查询每部电影的平均评分并按照平均评分排序。你可以根据自己的需求选择合适的SQL查询。
阅读全文