sparksqls实现行列互转
时间: 2025-01-03 20:41:55 浏览: 6
### 使用 Spark SQL 实现实行列转换
#### 行转列(Pivoting)
行转列通常用于将多行数据聚合到单个结果集中,形成更宽泛的结果表。这可以通过 `PIVOT` 操作来完成。
对于简单的场景,可以使用内置函数如 `pivot()` 来实现实现:
```sql
SELECT department,
SUM(CASE WHEN gender = 'Male' THEN salary ELSE 0 END) AS male_salary,
SUM(CASE WHEN gender = 'Female' THEN salary ELSE 0 END) AS female_salary
FROM employees
GROUP BY department;
```
当涉及到更多动态值时,则应采用标准的 `PIVOT` 语句[^3]:
```sql
SELECT *
FROM (
SELECT department, gender, salary
FROM employees
)
PIVOT(
SUM(salary) FOR gender IN ('Male', 'Female')
);
```
此查询会创建一个新的表格视图,在其中每个性别对应的薪资总和被展示在同一行内不同列下。
#### 列转行(Unpivoting)
相反的过程——即把多个属性展开成若干条记录的形式称为“unpivot”。虽然 Spark SQL 并未直接提供 unpivot 的功能,但是可以通过自定义的方式轻松达成目的。
一种方法是利用 union all 结合 case when 或者 explode 函数配合 map 类型来进行转换:
```scala
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(
(1, "A", 100),
(2, "B", 200)
)).toDF("id", "category", "value")
// Method using Union All with Case When
df.select($"id",
expr("CASE WHEN category='A' THEN value END as A"),
expr("CASE WHEN category='B' THEN value END as B"))
.na.fill(0)
// Or use Explode function combined with Map type
df.withColumn("pairs", arrays_zip(array(lit("A"), lit("B")), array(col("value"))))
.withColumn("exploded_pairs", explode_outer($"pairs"))
.select($"id", $"exploded_pairs.*")
```
第二种方式更加简洁高效,并且能够很好地处理缺失值的情况。
为了更好地理解这两种变换的实际应用场景以及它们之间相互作用的影响,建议读者尝试构建一些实际案例并观察其效果变化。
阅读全文