```
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
# Specify the MySQL configuration
from pyspark.sql.types import StructField, StringType, StructType

options = {
    "url": "jdbc:mysql://localhost:3306/test?useSSL=true",
    "driver": "com.mysql.jdbc.Driver",
    "dbtable": "(SELECT publishTime from travels) t1",
    "user": "root",
    "password": "root"
}

spark = SparkSession.builder.getOrCreate()

# Load the MySQL data
data = spark.read.format("jdbc").options(**options).load()

# Map each row's date to a season
def convert_to_quarter(line):
    val = line[0].split("-")
    if val[1] in ["01", "02", "03"]:
        return "春季", 1
    elif val[1] in ["04", "05", "06"]:
        return "夏季", 1
    elif val[1] in ["07", "08", "09"]:
        return "秋季", 1
    elif val[1] in ["10", "11", "12"]:
        return "冬季", 1

zeroValue = 0
rdd = data.rdd.map(lambda line: convert_to_quarter(line)).foldByKey(zeroValue, lambda v, x: v + x)

schemaString = "quarter count"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
schema_data = spark.createDataFrame(rdd, schema).orderBy("count", ascending=False)

# Convert the data to a pandas DataFrame
result_pdf = schema_data.limit(5).toPandas()

# Configure matplotlib to support Chinese characters
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei']
# colors=color, explode=explode,
plt.pie(result_pdf["count"], labels=result_pdf["quarter"], shadow=True, autopct='%1.1f%%')
plt.legend()
plt.show()
```
Explain each line of this code.
This code reads data from MySQL through PySpark, buckets each row's date into a season and counts the rows per season, converts the result into a pandas DataFrame, and finally draws a pie chart with Matplotlib.
Here is a step-by-step explanation:
1. Import the required libraries
```
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from pyspark.sql.types import StructField, StringType, StructType
```
2. Configure the MySQL connection
```
options = {
"url": "jdbc:mysql://localhost:3306/test?useSSL=true",
"driver": "com.mysql.jdbc.Driver",
"dbtable": "(SELECT publishTime from travels) t1",
"user": "root",
"password": "root"
}
```
This specifies the MySQL connection: the JDBC URL, the driver class, the username and password, and — via the dbtable option — a subquery that selects only the publishTime column from the travels table, aliased as t1.
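As an aside, on Spark 2.4 and later the same subquery can be passed through the query option instead of being wrapped in dbtable — a minimal sketch, reusing the connection values above:
```
# A sketch, assuming Spark 2.4+: "query" replaces the subquery-with-alias
# idiom in "dbtable"; all other connection values are unchanged.
options = {
    "url": "jdbc:mysql://localhost:3306/test?useSSL=true",
    "driver": "com.mysql.jdbc.Driver",
    "query": "SELECT publishTime FROM travels",
    "user": "root",
    "password": "root"
}
```
Note also that com.mysql.jdbc.Driver is the legacy driver class name; recent MySQL Connector/J releases ship it as com.mysql.cj.jdbc.Driver.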
3. Create the SparkSession
```
spark = SparkSession.builder.getOrCreate()
```
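The JDBC read below only works if the MySQL Connector/J jar is on the classpath. A minimal sketch of making that explicit when building the session (the app name and jar path are assumptions):
```
# A sketch: the jar path is an assumption; point it at your local
# MySQL Connector/J jar so the driver class in "options" can be loaded.
spark = (SparkSession.builder
         .appName("travels-season-pie")
         .config("spark.jars", "/path/to/mysql-connector-java.jar")
         .getOrCreate())
```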
4. Load the MySQL data
```
data = spark.read.format("jdbc").options(**options).load()
```
This reads the rows from MySQL through Spark's JDBC data source and returns them as a Spark DataFrame.
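At this point a quick sanity check is cheap — a sketch using standard DataFrame methods:
```
# Inspect the loaded data: expect a single publishTime column.
data.printSchema()
data.show(5, truncate=False)
```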
5. Bucket each row's date into a season and count
```
# Map each row's zero-padded "YYYY-MM-..." date string to a (season, 1) pair
def convert_to_quarter(line):
    val = line[0].split("-")
    if val[1] in ["01", "02", "03"]:
        return "春季", 1
    elif val[1] in ["04", "05", "06"]:
        return "夏季", 1
    elif val[1] in ["07", "08", "09"]:
        return "秋季", 1
    elif val[1] in ["10", "11", "12"]:
        return "冬季", 1

zeroValue = 0
rdd = data.rdd.map(lambda line: convert_to_quarter(line)).foldByKey(zeroValue, lambda v, x: v + x)
```
Each row's date is mapped to a (season, 1) pair, and foldByKey (starting from zeroValue = 0) sums the 1s per key, yielding the number of rows in each season. The function assumes zero-padded "YYYY-MM-..." date strings; any other month value falls through and returns None.
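The same aggregation can stay in the DataFrame API instead of dropping to the RDD level — a sketch, assuming publishTime is a zero-padded "YYYY-MM-DD" string:
```
from pyspark.sql import functions as F

# Extract the "MM" part (substring positions are 1-based) and map it
# onto the same season labels with when/otherwise.
month = F.substring("publishTime", 6, 2)
season = (F.when(month.isin("01", "02", "03"), "春季")
           .when(month.isin("04", "05", "06"), "夏季")
           .when(month.isin("07", "08", "09"), "秋季")
           .otherwise("冬季"))
counts = data.select(season.alias("quarter")).groupBy("quarter").count()
```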
6. Convert the result into a pandas DataFrame
```
schemaString = "quarter count"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
schema_data = spark.createDataFrame(rdd, schema).orderBy("count", ascending=False)
result_pdf = schema_data.limit(5).toPandas()
```
This converts the result back for plotting: a two-column schema is built from the string "quarter count", the RDD of (season, total) pairs becomes a Spark DataFrame sorted by count in descending order, and the top five rows are collected to the driver as a pandas DataFrame with toPandas(). Note that both fields are declared as StringType, even though the counts produced by foldByKey are integers, so the orderBy on count compares strings rather than numbers.
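A minimal sketch of a more robust schema (column names as in the original) that keeps the count numeric so the sort behaves as intended:
```
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Declare count as an integer so it matches the summed values from
# foldByKey and orderBy sorts numerically rather than lexicographically.
schema = StructType([
    StructField("quarter", StringType(), True),
    StructField("count", IntegerType(), True),
])
schema_data = spark.createDataFrame(rdd, schema).orderBy("count", ascending=False)
```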
7. Draw the pie chart
```
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.pie(result_pdf["count"], labels=result_pdf["quarter"], shadow=True, autopct='%1.1f%%')
plt.legend()
plt.show()
```
Finally, Matplotlib draws a pie chart showing each season's share of the total. The two rcParams lines switch the font to SimHei so the Chinese season labels render correctly; shadow=True adds a drop shadow, and autopct='%1.1f%%' prints each wedge's percentage to one decimal place.
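The commented-out colors=color, explode=explode fragment in the original hints at two more pie parameters; a sketch of enabling them (the specific colors and offsets are arbitrary choices):
```
# A sketch: explode pulls the first wedge out slightly; the color list
# is arbitrary, trimmed to one entry per wedge.
explode = [0.1] + [0] * (len(result_pdf) - 1)
colors = ["#ff9999", "#66b3ff", "#99ff99", "#ffcc99"][:len(result_pdf)]
plt.pie(result_pdf["count"], labels=result_pdf["quarter"], explode=explode,
        colors=colors, shadow=True, autopct='%1.1f%%')
```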