在scala中实现,有rating.csv和moives.csv两个文件,编程实现RDD转DataFrame,并选择一个合理的SQL分析
时间: 2023-11-22 10:56:11 浏览: 143
在 Scala 中,我们可以使用 Spark SQL 模块来实现将 RDD 转换为 DataFrame,并进行 SQL 分析。
以下是一个示例代码,假设 rating.csv 和 movies.csv 文件已经上传到了 HDFS 上:
```scala
import org.apache.spark.sql.SparkSession
object RDDToDataFrame {
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Long)
case class Movie(movieId: Int, title: String, genres: String)
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("RDDToDataFrame")
.master("local[*]")
.getOrCreate()
// 读取 rating.csv 文件
val ratingRDD = spark.sparkContext.textFile("hdfs://path/to/rating.csv")
.map(line => {
val fields = line.split(",")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble, fields(3).toLong)
})
// 读取 movies.csv 文件
val movieRDD = spark.sparkContext.textFile("hdfs://path/to/movies.csv")
.map(line => {
val fields = line.split(",")
Movie(fields(0).toInt, fields(1), fields(2))
})
// 将 RDD 转换为 DataFrame
import spark.implicits._
val ratingDF = ratingRDD.toDF()
val movieDF = movieRDD.toDF()
// 创建表
ratingDF.createOrReplaceTempView("ratings")
movieDF.createOrReplaceTempView("movies")
// 执行 SQL 查询
val result = spark.sql("SELECT title, AVG(rating) AS avg_rating FROM ratings JOIN movies ON ratings.movieId = movies.movieId GROUP BY title ORDER BY avg_rating DESC LIMIT 10")
// 展示结果
result.show()
// 关闭 SparkSession
spark.stop()
}
}
```
在上述代码中,我们首先定义了两个 case class,用于表示 rating 和 movie 两个数据集。接着,我们读取了 rating.csv 和 movies.csv 文件,并将它们转换为 RDD。然后,我们使用 `toDF()` 方法将 RDD 转换为 DataFrame,并对 DataFrame 创建了一个临时视图。最后,我们执行了一条 SQL 查询,计算出每个电影的平均评分,并按照平均评分从高到低进行排序,最后展示了前 10 条结果。
需要注意的是,在使用 Spark SQL 进行分析时,我们需要注意数据类型的正确性,例如上述代码中,我们将 timestamp 字段转换为了 Long 类型。同时,还需要注意表的创建和视图的命名,以便后续的查询和操作。