Converting an RDD to a DataFrame in Scala using ratings.csv and movies.csv
Reference: converting between RDD and DataFrame in Spark
In Scala, you can use the Spark SQL API to convert an RDD into a DataFrame. The example below reads both CSV files into RDDs, applies an explicit schema to each, converts them to DataFrames, and joins the results:
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object RDD2DataFrameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RDD to DataFrame Example")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Read ratings.csv into an RDD of Rows, dropping the header line
    val ratingsLines = sc.textFile("ratings.csv")
    val ratingsHeader = ratingsLines.first()
    val ratingsRDD: RDD[Row] = ratingsLines
      .filter(_ != ratingsHeader)
      .map(_.split(","))
      .map(tokens => Row(tokens(0).toInt, tokens(1).toInt, tokens(2).toDouble))

    // Schema for the ratings DataFrame (rating is a Double, not an Int)
    val ratingsSchema = StructType(
      StructField("userId", IntegerType, nullable = true) ::
      StructField("movieId", IntegerType, nullable = true) ::
      StructField("rating", DoubleType, nullable = true) :: Nil)

    // Convert the RDD[Row] to a DataFrame using the explicit schema
    val ratingsDF = spark.createDataFrame(ratingsRDD, ratingsSchema)

    // Read movies.csv into an RDD of Rows, dropping the header line.
    // A plain split(",") breaks on quoted titles that contain commas;
    // use a proper CSV parser for production data.
    val moviesLines = sc.textFile("movies.csv")
    val moviesHeader = moviesLines.first()
    val moviesRDD: RDD[Row] = moviesLines
      .filter(_ != moviesHeader)
      .map(_.split(","))
      .map(tokens => Row(tokens(0).toInt, tokens(1)))

    // Schema for the movies DataFrame
    val moviesSchema = StructType(
      StructField("movieId", IntegerType, nullable = true) ::
      StructField("title", StringType, nullable = true) :: Nil)

    // Convert the RDD[Row] to a DataFrame using the explicit schema
    val moviesDF = spark.createDataFrame(moviesRDD, moviesSchema)

    // Join the ratings and movies DataFrames on the movieId column
    val joinedDF = ratingsDF.join(moviesDF, "movieId")

    // Show the resulting DataFrame
    joinedDF.show()

    spark.stop()
  }
}
```
Note that the code above assumes ratings.csv and movies.csv are in the current working directory and that each file starts with a header line; adjust the file paths to match your environment.
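The cited reference also covers conversion in the other direction. Below is a minimal sketch of that round trip, using an illustrative Rating case class and sample rows that are assumptions for the example rather than data taken from the files above: an RDD of case-class instances becomes a DataFrame via toDF() (through the SparkSession implicits), and a DataFrame becomes an RDD of Rows via .rdd.
```scala
import org.apache.spark.sql.SparkSession

// Illustrative case class; its field names become the DataFrame column names.
case class Rating(userId: Int, movieId: Int, rating: Double)

object RDDDataFrameRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RDD <-> DataFrame round trip")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._ // enables .toDF() on RDDs of case classes and tuples

    // RDD -> DataFrame: toDF() infers the schema from the case class by reflection
    val ratingsRDD = spark.sparkContext.parallelize(Seq(
      Rating(1, 31, 2.5),
      Rating(1, 1029, 3.0)))
    val ratingsDF = ratingsRDD.toDF()
    ratingsDF.printSchema()

    // DataFrame -> RDD: .rdd yields an RDD[Row]; fields are read back by column name
    val backToRDD = ratingsDF.rdd.map(row =>
      (row.getAs[Int]("userId"), row.getAs[Int]("movieId"), row.getAs[Double]("rating")))
    backToRDD.collect().foreach(println)

    spark.stop()
  }
}
```
The toDF() route is convenient when you control the element type, since the schema is derived from the case class; the explicit StructType route shown earlier is preferable when the schema is only known at runtime.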