在scala中实现,有rating.csv和moives.csv两个文件,编程实现RDD转DataFrame,并选择一个合理的SQL分析
时间: 2023-11-22 10:56:07 浏览: 86
pyspark 读取csv文件创建DataFrame的两种方法
在 Scala 中,我们可以使用 Spark SQL 的 API 将 RDD 转换为 DataFrame。以下是一个基本示例,假设你已经将两个 CSV 文件加载到了 RDD 中。
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
// 创建 SparkSession
val spark = SparkSession.builder().appName("RDD to DataFrame").master("local").getOrCreate()
// 读取 rating.csv 文件并创建 RDD
val ratingRDD = spark.sparkContext.textFile("path/to/rating.csv")
// 将 RDD 转换为 DataFrame
val ratingDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(ratingRDD)
// 读取 movies.csv 文件并创建 RDD
val moviesRDD = spark.sparkContext.textFile("path/to/movies.csv")
// 将 RDD 转换为 DataFrame
val moviesDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(moviesRDD)
```
在这里,我们使用 `read` 方法将 RDD 转换为 DataFrame。`option("header", "true")` 表示 CSV 文件中包含列名,`option("inferSchema", "true")` 表示让 Spark 推断列的数据类型。
接下来,我们可以使用 Spark SQL 的 API 执行 SQL 分析。以下是一个简单的示例,使用内置的 `count` 函数统计每个电影的评级数:
```scala
import org.apache.spark.sql.functions._
// 将 ratingDF 和 moviesDF 进行关联,得到每个电影的评级数
val joinedDF = ratingDF.join(moviesDF, "movieId")
val ratingCountDF = joinedDF.groupBy("title").agg(count("rating").as("rating_count"))
// 打印前 10 行
ratingCountDF.show(10)
```
在这个例子中,我们首先将 `ratingDF` 和 `moviesDF` 进行关联,得到每个电影的评级信息和电影名称。然后,我们使用 `groupBy` 方法按电影名称分组,并使用 `count` 函数统计每个电影的评级数。最后,我们使用 `show` 方法打印前 10 行结果。
注意,这只是一个简单的示例。你可以根据实际需求编写更复杂的 SQL 分析。
阅读全文