scala spark groupby count效率高还是agg count效率高
时间: 2023-08-01 21:07:21 浏览: 51
在 Spark 中,使用 groupBy 和 agg 进行计数是两种常见的方式。实际上,它们的效率取决于具体的数据和操作。一般情况下,使用 agg 进行计数的效率更高,因为它可以在单个阶段完成计算,并且可以避免数据移动。而使用 groupBy 进行计数则需要多个阶段,因为它需要将数据进行分组,并在每个分组内进行计算。但是,如果数据量非常小,使用 groupBy 进行计数也可能会比使用 agg 更快。因此,选择哪种方式需要根据具体的情况进行评估和测试。
相关问题
用Spark SQL分析,找出每种类型的评分最高10部电影
好的,下面是使用Spark SQL分析,找出每种类型的评分最高10部电影的示例代码:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// 读取movies、ratings和movie_genres三张表的数据
val movies = spark.read.format("csv").option("header", "true").load("movies.csv")
val ratings = spark.read.format("csv").option("header", "true").load("ratings.csv")
val movieGenres = spark.read.format("csv").option("header", "true").load("movie_genres.csv")
// 将movie_genres表进行展开,将每个电影的每个类型都展开成一行
val explodedGenres = movieGenres.withColumn("genre", explode(split(col("genres"), "\\|")))
.drop("genres")
// 将ratings和explodedGenres表与movies表进行JOIN,获得每部电影的评分和类型信息
val joinedData = ratings.join(movies, Seq("movie_id")).join(explodedGenres, Seq("movie_id"))
// 计算每种类型的电影的评分,并按照类型和评分降序排列
val genreRatings = joinedData.groupBy("genre", "title")
.agg(avg("rating").as("average_rating"), count("rating").as("rating_count"))
.withColumn("rank", dense_rank().over(Window.partitionBy("genre").orderBy(desc("average_rating"))))
.filter($"rank" <= 10)
.orderBy("genre", "rank")
// 打印结果
genreRatings.show()
```
在这个代码中,我们首先读取了movies、ratings和movie_genres三张表的数据,然后将movie_genres表进行展开,将每个电影的每个类型都展开成一行。接下来,我们将ratings和explodedGenres表与movies表进行JOIN,获得每部电影的评分和类型信息。
然后,我们按照类型和评分降序排列,计算每种类型的电影的评分,并使用窗口函数dense_rank()计算每种类型的电影评分排名。最后,我们筛选出每种类型评分最高的10部电影,并按照类型和排名升序排列。
注意,在实际使用时需要根据实际数据表结构来修改代码。
select substring(page_type,1,6) as page_type, count(*) as count_num, round((count(*)/411665.0)*100,4) as weights from law_part where page_type_part=101 group by substring(page_type,1,6) order by count_num desc;改成sparksql
下面是将该SQL语句转换为Spark SQL的代码:
```
SELECT SUBSTRING(page_type,1,6) AS page_type, COUNT(*) AS count_num, ROUND((COUNT(*)/411665.0)*100,4) AS weights
FROM law_part
WHERE page_type_part=101
GROUP BY SUBSTRING(page_type,1,6)
ORDER BY count_num DESC;
```
转换为Spark SQL后的代码:
```scala
import org.apache.spark.sql.functions._
val resultDF = law_part
.filter($"page_type_part" === 101)
.groupBy(substring($"page_type", 1, 6).as("page_type"))
.agg(count("*").as("count_num"))
.withColumn("weights", round((col("count_num")/411665.0)*100, 4))
.orderBy(desc("count_num"))
```
其中,`law_part`是你的数据表,可以通过`spark.read.jdbc()`或其他方式加载数据。