将统计结果存放在 MySQL 数据库 mydb 的 cancelreason 表中(表已经提前创建)。
时间: 2024-02-24 16:00:46 浏览: 115
将sql查询结果保存到数据表
5星 · 资源好评率100%
好的,下面是将统计结果存入 MySQL 数据库的代码:
```python
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import mysql.connector
conf = SparkConf().setMaster("local").setAppName("CancelReasonCount")
sc = SparkContext(conf=conf)
# 读取数据
data = sc.textFile("/data/workspace/myshixun/data/canceldata.txt")
# 过滤出撤销理由不为未知的数据
valid_data = data.filter(lambda x: "未知" not in x)
# 将每个撤销理由的计数累加
cancel_reason_count = valid_data.map(lambda x: x.split("|")[9]) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x, y: x + y)
# 取前 10 种理由
top_10_reasons = cancel_reason_count.takeOrdered(10, key=lambda x: -x[1])
# 创建 SparkSession
spark = SparkSession.builder.appName("CancelReasonCount").getOrCreate()
# 将结果转换为 DataFrame
df = spark.createDataFrame(top_10_reasons, ["reason", "count"])
# 将结果写入 MySQL 数据库
df.write.format("jdbc").options(
url="jdbc:mysql://localhost/mydb",
driver="com.mysql.cj.jdbc.Driver",
dbtable="cancelreason",
user="root",
password="password"
).mode("overwrite").save()
```
这段代码在统计完成后,将结果转换为 Spark DataFrame,并将 DataFrame 存入 MySQL 数据库中。你需要将代码中的数据库连接信息(url、user、password)替换为你自己的数据库连接信息。
阅读全文