spark系列二:load和save
时间: 2023-04-12 18:03:08 浏览: 180
Spark系列二:load和save是Spark中用于读取和保存数据的API。load函数可以从不同的数据源中读取数据,如HDFS、本地文件系统、Hive、JDBC等,而save函数可以将数据保存到不同的数据源中,如HDFS、本地文件系统、Hive、JDBC等。
load函数的语法如下:
```
spark.read.format(source).option(key, value).load(path)
```
其中,source表示数据源的类型,如"csv"、"json"、"parquet"等;key和value表示读取数据时的选项,如"header"表示是否包含表头,"inferSchema"表示是否自动推断数据类型等;path表示数据源的路径。
save函数的语法如下:
```
df.write.format(source).option(key, value).save(path)
```
其中,df表示要保存的数据集,source表示数据源的类型,如"csv"、"json"、"parquet"等;key和value表示保存数据时的选项,如"header"表示是否包含表头,"mode"表示保存模式等;path表示数据保存的路径。
相关问题
java spark dataset_Spark 2.0介绍:Dataset介绍和使用
好的,Dataset是Spark 2.0引入的一个新的API,它是强类型的数据集合,旨在弥补DataFrame和RDD之间的差距,同时提供类型安全和面向对象的编程模型。
与DataFrame不同,Dataset不仅提供了类似于SQL的查询功能,还提供了更多的面向对象的API,因此更适合于开发人员进行复杂的数据分析和操作。
Dataset还具有更好的类型安全性,因为它们是强类型的,这意味着编译器可以在编译时捕获类型不匹配的错误,而不是在运行时抛出异常。
在使用Dataset时,我们可以使用Scala或Java编写类型安全的代码,并在运行时获得更好的性能。
下面是一个简单的Java示例,演示如何使用Dataset:
```
// 创建一个SparkSession对象
SparkSession spark = SparkSession.builder()
.appName("Java Dataset Example")
.master("local")
.getOrCreate();
// 读取一个CSV文件,并将其转换为Dataset
Dataset<Row> df = spark.read().format("csv")
.option("header", "true")
.load("path/to/file.csv");
// 将Dataset转换为类型安全的Java对象
Encoder<MyObject> myObjectEncoder = Encoders.bean(MyObject.class);
Dataset<MyObject> myObjects = df.as(myObjectEncoder);
// 进行复杂的操作
Dataset<MyObject> filtered = myObjects.filter(myObject -> myObject.getAge() > 18);
// 将结果保存到文件中
filtered.write().format("csv").save("path/to/output");
```
在这个示例中,我们首先读取一个CSV文件,并将其转换为DataFrame。然后,我们使用Encoder将DataFrame转换为类型安全的Java对象。接着,我们进行过滤操作,只选择年龄大于18岁的对象,并将结果保存到文件中。
希望这个简单的示例可以帮助你了解如何使用Dataset进行数据操作。
spark = SparkSession.builder. \ master("local[*]"). \ appName("SparkSQL_Example"). \ config("spark.sql.shuffle.partitions", 2). \ config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \ config("hive.metastore.uris", "thrift://node1:9083"). \ enableHiveSupport(). \ getOrCreate() df = spark.read.format("json"). \ load("../data/mini.json"). \ dropna(thresh=1, subset=["storeProvince"]). \ filter("storeProvince != 'null'"). \ filter("receivable<10000"). \ select("receivable", "storeProvince", "dateTS", "storeID", "payType") # TODO 需求一: 各省销售额统计 province_sale_df = df.groupBy("storeProvince"). \ sum("receivable"). \ withColumn("sum(receivable)", f.round("sum(receivable)", 2)). \ orderBy("sum(receivable)", ascending=False). \ show() print(type(province_sale_df)) province_sale_df.write.mode("overwrite").\ format("jdbc").\ option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\ option("dbtable", "province_sale").\ option("user", "root").\ option("password", "123456").\ option("encoding", "utf-8").\ save()
在这段代码中,您正在使用SparkSession创建一个Spark应用程序,并将其连接到本地[*]主机。然后,您正在加载一个JSON文件,并在其中进行一些数据清理和筛选,然后将其分组,并按销售额对各省进行排序。接下来,您正在使用JDBC将此数据写入MySQL数据库表中。
但是,您在DataFrame上调用了`show()`方法,该方法返回的是一个DataFrame并非写入的内容,因此在尝试将其写入MySQL数据库之前,您需要将其保存到一个变量中。例如,您可以将`province_sale_df`变量中的结果保存到一个新的DataFrame中,然后将其写入MySQL数据库:
```
province_sale_result_df = df.groupBy("storeProvince") \
.sum("receivable") \
.withColumn("sum(receivable)", f.round("sum(receivable)", 2)) \
.orderBy("sum(receivable)", ascending=False)
province_sale_result_df.show()
province_sale_result_df.write.mode("overwrite") \
.format("jdbc") \
.option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8") \
.option("dbtable", "province_sale") \
.option("user", "root") \
.option("password", "123456") \
.option("encoding", "utf-8") \
.save()
```
这样就可以将结果保存到MySQL数据库中了。
阅读全文