在scala中利用ratings.csv和movies.csv,编程实现RDD转Data f r a me
时间: 2024-02-01 10:16:42 浏览: 26
在Scala中,我们可以使用Spark SQL来实现RDD转Data Frame。以下是一份示例代码:
```scala
import org.apache.spark.sql.{SparkSession, Row, DataFrame}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
object RDDToDataFrameExample {
def main(args: Array[String]) {
// 创建 SparkSession 对象
val spark = SparkSession.builder()
.appName("RDDToDataFrameExample")
.master("local[*]")
.getOrCreate()
// 读取 movies.csv 数据
val moviesRDD = spark.sparkContext.textFile("movies.csv")
val moviesHeader = moviesRDD.first()
val moviesSchema = StructType(moviesHeader.split(",").map(field => StructField(field, StringType, nullable = true)))
val moviesRows = moviesRDD.filter(_ != moviesHeader).map(_.split(",")).map(row => Row.fromSeq(row))
val moviesDF = spark.createDataFrame(moviesRows, moviesSchema)
// 读取 ratings.csv 数据
val ratingsRDD = spark.sparkContext.textFile("ratings.csv")
val ratingsHeader = ratingsRDD.first()
val ratingsSchema = StructType(ratingsHeader.split(",").map(field => StructField(field, StringType, nullable = true)))
val ratingsRows = ratingsRDD.filter(_ != ratingsHeader).map(_.split(",")).map(row => Row.fromSeq(row))
val ratingsDF = spark.createDataFrame(ratingsRows, ratingsSchema)
// 将两个 DataFrame 进行 join 操作
val joinedDF = moviesDF.join(ratingsDF, Seq("movieId"), "inner")
// 打印结果
joinedDF.show()
// 关闭 SparkSession 对象
spark.stop()
}
}
```
在这个示例中,我们首先读取 movies.csv 和 ratings.csv 文件,并使用 StructType 定义它们的数据结构。然后,我们将每行数据转换为 Row 对象,并使用 createDataFrame 方法将它们转换为 DataFrame。最后,我们使用 join 操作将两个 DataFrame 进行连接,并使用 show 方法打印结果。
请注意,这只是一个示例代码,你需要根据实际情况进行相应的修改。