spark = SparkSession.builder. \ master("local[*]"). \ appName("SparkSQL_Example"). \ config("spark.sql.shuffle.partitions", 2). \ config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \ config("hive.metastore.uris", "thrift://node1:9083"). \ enableHiveSupport(). \ getOrCreate() df = spark.read.format("json"). \ load("../data/mini.json"). \ dropna(thresh=1, subset=["storeProvince"]). \ filter("storeProvince != 'null'"). \ filter("receivable<10000"). \ select("receivable", "storeProvince", "dateTS", "storeID", "payType") # TODO 需求一: 各省销售额统计 province_sale_df = df.groupBy("storeProvince"). \ sum("receivable"). \ withColumn("sum(receivable)", f.round("sum(receivable)", 2)). \ orderBy("sum(receivable)", ascending=False). \ show() print(type(province_sale_df)) province_sale_df.write.mode("overwrite").\ format("jdbc").\ option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\ option("dbtable", "province_sale").\ option("user", "root").\ option("password", "123456").\ option("encoding", "utf-8").\ save()
时间: 2023-07-03 17:19:54 浏览: 169
ANT_发布项目中_build.xml_文件的详细配置
4星 · 用户满意度95%
在这段代码中,您正在使用SparkSession创建一个Spark应用程序,并将其连接到本地[*]主机。然后,您正在加载一个JSON文件,并在其中进行一些数据清理和筛选,然后将其分组,并按销售额对各省进行排序。接下来,您正在使用JDBC将此数据写入MySQL数据库表中。
但是,您在DataFrame上调用了`show()`方法,该方法返回的是一个DataFrame并非写入的内容,因此在尝试将其写入MySQL数据库之前,您需要将其保存到一个变量中。例如,您可以将`province_sale_df`变量中的结果保存到一个新的DataFrame中,然后将其写入MySQL数据库:
```
province_sale_result_df = df.groupBy("storeProvince") \
.sum("receivable") \
.withColumn("sum(receivable)", f.round("sum(receivable)", 2)) \
.orderBy("sum(receivable)", ascending=False)
province_sale_result_df.show()
province_sale_result_df.write.mode("overwrite") \
.format("jdbc") \
.option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8") \
.option("dbtable", "province_sale") \
.option("user", "root") \
.option("password", "123456") \
.option("encoding", "utf-8") \
.save()
```
这样就可以将结果保存到MySQL数据库中了。
阅读全文