Spark 2.11 DataFrame创建全解析:多种方式与源码洞察

3 下载量 180 浏览量 更新于2024-08-29 收藏 40KB PDF 举报
在Spark 2.0版本中,DataFrame是Apache Spark的重要数据处理工具,它提供了一种结构化的、可以进行SQL查询的数据集。本文将详细介绍如何使用Spark 2.x的各种方式来创建DataFrame,包括案列类(CaseClass)、元组(Tuple)、JavaBean对象、JSON、Row对象、集合(Set和Map)、数组以及Parquet数组等数据结构。 首先,我们来看创建DataFrame的几种方法: 1. CaseClass创建DataFrame: 使用Scala的CaseClass(类似于Java的POJO)可以直接转化为DataFrame。CaseClass提供了结构化数据的定义,可以直接通过`toDF()`方法将CaseClass实例转换为DataFrame。例如,定义一个`Stu`案列类: ```scala case class Stu(name: String, age: Int, city: String, score: Double) ``` 可以这样创建DataFrame: ```scala val stuList = List(Stu("张飞", 21, "北京", 80.0), ... ) // 填充列表 val stuDF = spark.createDataFrame(stuList.map(stu => Row(stu.name, stu.age, stu.city, stu.score)), StructType(Seq( StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("city", StringType, true), StructField("score", DoubleType, true) ))) ``` 2. Tuple创建DataFrame: Spark支持元组类型,可以将元组转换为DataFrame。例如,使用`(String, Int, String, Double)`的元组创建DataFrame: ```scala val tupleList = List(("张飞", 21, "北京", 80.0), ...) // 填充列表 val tupleDF = spark.createDataFrame(tupleList, StructType(Seq( StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("city", StringType, true), StructField("score", DoubleType, true) ))) ``` 3. JavaBean创建DataFrame: 如果数据源是JavaBean对象,需要先将其序列化为RDD,然后使用`createDataFrame()`函数创建DataFrame。例如,假设有一个`Student`类: ```java public class Student { private String name; private int age; // getters and setters } ``` 将JavaBean转换为RDD后创建DataFrame: ```java JavaRDD<Student> javaRDD = ... // 从某种数据源获取JavaRDD JavaPairRDD<String, Student> pairRDD = javaRDD.mapToPair(s -> new Tuple2<>(s.getName(), s)); DataFrame df = spark.createDataFrame(pairRDD, new StructType().add("name", StringType).add("student", Student.class)); ``` 4. Json创建DataFrame: 通过读取Json文件或字符串,可以使用`SparkSession.read.json()`方法创建DataFrame。例如: ```scala val jsonString = "[{'name': '张飞', 'age': 21, 'city': '北京', 'score': 80.0}, ...]" val jsonDF = spark.read.json(Seq(jsonString).toDS) ``` 5. Row创建DataFrame: 使用Row直接构造DataFrame,Row是Spark中的轻量级、不可变的行对象: ```scala val rowList = List(Row("张飞", 21, "北京", 80.0), ...) // 填充列表 val rowDF = spark.createDataFrame(rowList, schema) ``` 6. 集合(Set和Map)创建DataFrame: 集合类型通常需要先转换为其他形式,如列表或数组,再创建DataFrame。 7. Array数组创建DataFrame: 类似于集合,Array也需要转换为列表或序列。 8. Parquet数组创建DataFrame: 如果数据存储在Parquet格式中,可以利用Spark的Parquet读取功能直接创建DataFrame。 在实际操作中,选择哪种方式取决于数据的原始来源和使用场景。Maven依赖部分列出了创建DataFrame时可能需要的一些库,如`scala-library`、`spark-sql`、`mysql-connector-java`等,它们分别用于不同功能的支持。 在分析和源码解析部分,会深入探讨这些方法背后的原理,包括Spark SQL的隐式转换机制、RDD与DataFrame之间的关系,以及如何处理不同类型的数据转换和映射过程。理解这些核心概念有助于更好地使用Spark DataFrame进行数据处理和分析。