编程实现RDD转DataFrame,实现统计每部电影的评分人数
时间: 2023-08-17 10:15:56 浏览: 94
好的,我会根据您的要求回答问题。下面是实现RDD转DataFrame并统计每部电影的评分人数的代码(假设您的数据集中包含三列:用户ID、电影ID和评分):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
# 创建SparkSession对象
spark = SparkSession.builder.appName("RDD to DataFrame").getOrCreate()
# 读取数据集
rdd = spark.sparkContext.textFile("ratings.dat")
# 转换RDD为DataFrame
df = rdd.map(lambda x: x.split("::")).\
toDF(["user_id", "movie_id", "rating"])
# 统计每部电影的评分人数
result = df.groupBy("movie_id").agg(count("user_id").alias("num_ratings"))
# 打印结果
result.show()
```
这段代码中,首先创建了一个SparkSession对象,然后读取了一个数据集ratings.dat,将其转换为DataFrame,并使用groupBy和agg函数统计每部电影的评分人数。最后打印结果。
相关问题
spark编程实现RDD转DataFrame,实现统计每部电影的评分人数
好的,下面是使用Spark编程实现RDD转DataFrame并统计每部电影的评分人数的代码(假设您的数据集中包含三列:用户ID、电影ID和评分):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
# 创建SparkSession对象
spark = SparkSession.builder.appName("RDD to DataFrame").getOrCreate()
# 读取数据集
rdd = spark.sparkContext.textFile("ratings.dat")
# 转换RDD为DataFrame
df = rdd.map(lambda x: x.split("::")).\
toDF(["user_id", "movie_id", "rating"])
# 注册为临时表
df.createOrReplaceTempView("ratings")
# 使用Spark SQL统计每部电影的评分人数
result = spark.sql("SELECT movie_id, COUNT(DISTINCT user_id) AS num_ratings FROM ratings GROUP BY movie_id")
# 打印结果
result.show()
```
这段代码中,首先创建了一个SparkSession对象,然后读取了一个数据集ratings.dat,将其转换为DataFrame,并将其注册为一个临时表ratings。接着使用Spark SQL语句统计每部电影的评分人数,并将结果存储在result变量中。最后打印结果。
在scala中编程实现RDD转DataFrame,实现统计每部电影的评分人数
可以按照以下步骤实现:
1. 创建一个 RDD,包含电影ID和评分信息:
```
val ratingsRDD = sc.textFile("ratings.csv").map(line => {
val fields = line.split(",")
(fields(1).toInt, fields(2).toDouble)
})
```
2. 导入 Spark SQL 相关的库,并将 RDD 转换为 DataFrame:
```
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
val spark = SparkSession.builder().appName("RDD to DataFrame").getOrCreate()
val schema = StructType(
StructField("movieId", IntegerType, true) ::
StructField("rating", DoubleType, true) :: Nil
)
val ratingsDF = spark.createDataFrame(ratingsRDD.map(r => Row(r._1, r._2)), schema)
```
3. 使用 DataFrame API 进行统计:
```
import org.apache.spark.sql.functions._
val countDF = ratingsDF.groupBy("movieId").agg(count("rating").as("numRatings"))
```
这将按电影ID对评分信息进行分组,并计算每部电影的评分人数。
阅读全文