在scala中利用ratings.csv和movies.csv,编程实现RDD转Data f r a me
时间: 2024-02-01 14:16:11 浏览: 152
在 Scala 中,可以使用 Spark 中的 `SparkSession` 对象将 RDD 转换为 DataFrame。假设我们有以下的 `ratings.csv` 和 `movies.csv` 文件:
```
// ratings.csv
userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
2,1,3.0,964981247
2,3,1.0,964982224
2,6,2.0,964983815
2,47,4.0,964982931
2,50,3.0,964982703
// movies.csv
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
```
我们可以通过以下代码将它们转换为 DataFrame:
```scala
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
// 创建 SparkSession 对象
val spark = SparkSession.builder()
.appName("RDD to DataFrame")
.master("local[*]")
.getOrCreate()
// 读取 ratings.csv 文件
val ratingsRDD = spark.sparkContext.textFile("ratings.csv")
.map(_.split(","))
.map(row => Row(row(0).toInt, row(1).toInt, row(2).toDouble, row(3).toLong))
// 定义 ratings 的 schema
val ratingsSchema = StructType(
Seq(
StructField("userId", IntegerType, true),
StructField("movieId", IntegerType, true),
StructField("rating", DoubleType, true),
StructField("timestamp", LongType, true)
)
)
// 将 RDD 转换为 DataFrame
val ratingsDF = spark.createDataFrame(ratingsRDD, ratingsSchema)
// 读取 movies.csv 文件
val moviesRDD = spark.sparkContext.textFile("movies.csv")
.map(_.split(","))
.map(row => Row(row(0).toInt, row(1), row(2)))
// 定义 movies 的 schema
val moviesSchema = StructType(
Seq(
StructField("movieId", IntegerType, true),
StructField("title", StringType, true),
StructField("genres", StringType, true)
)
)
// 将 RDD 转换为 DataFrame
val moviesDF = spark.createDataFrame(moviesRDD, moviesSchema)
```
上述代码中,我们首先创建了一个 `SparkSession` 对象,然后使用 `textFile` 方法读取 `ratings.csv` 和 `movies.csv` 文件,并将每一行数据转换为 `Row` 对象。接着,我们定义了 `ratings` 和 `movies` 的 schema,然后使用 `createDataFrame` 方法将 RDD 转换为 DataFrame。最终,我们得到了 `ratingsDF` 和 `moviesDF` 两个 DataFrame 对象。
阅读全文