def passed_temperature_analyse(filename): print("开始分析气温") # spark = SparkSession.builder.master("spark://master:7077").appName("passed_temperature_analyse").getOrCreate() spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate() # spark = SparkSession.builder.master("local[4]").appName("passed_rain_analyse").getOrCreate() df = spark.read.csv(filename, header=True) df_temperature = df.select( # 选择需要的列 df['province'], df['city_name'], df['city_code'], df['temperature'].cast(DecimalType(scale=1)), F.date_format(df['time'], "yyyy-MM-dd").alias("date"), # 得到日期数据 F.hour(df['time']).alias("hour") # 得到小时数据 ) # 筛选四点时次 # df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2,4,6,8])) df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 14, 20])) # df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24])) df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date").agg( F.count("temperature"), F.avg("temperature").alias("avg_temperature")).filter("count(temperature) = 4").sort( F.asc("avg_temperature")).select("province", "city_name", "city_code", "date", F.format_number('avg_temperature', 1).alias("avg_temperature")) df_avg_temperature.cache() avg_temperature_list = df_avg_temperature.collect() df_avg_temperature.coalesce(1).write.json("file:///home/lee/lab5/passed_temperature.json") print("气温分析完毕") return avg_temperature_list[0:10] # 最低的10个
时间: 2024-04-24 15:22:29 浏览: 156
这是一个使用Spark进行气温分析的Python函数。函数接收一个文件名作为参数,然后读取该文件中的数据,对数据进行处理,最后返回气温最低的10个城市的信息。
函数首先创建一个SparkSession对象,用于与Spark集群进行交互。然后读取指定文件中的数据,选择需要的列,并将温度数据转换为Decimal类型。接着通过日期和小时对数据进行处理,筛选出4个时次的气温记录。然后将数据按省份、城市、日期进行分组,计算每个城市在4个时次的平均气温,并筛选出记录条数为4的数据。最后按平均气温从小到大排序,并将结果写入到JSON文件中。
函数返回气温最低的10个城市的信息,以列表形式返回。
相关问题
def passed_rain_analyse(filename): # 计算各个城市过去24小时累积雨量 print("开始分析累积降雨量") # spark = SparkSession.builder.master("spark://master:7077").appName("passed_rain_analyse").getOrCreate() # spark = SparkSession.builder.master("local[4]").appName("passed_rain_analyse").getOrCreate() spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate() df = spark.read.csv(filename, header=True) df_rain = df.select(df['province'], df['city_name'], df['city_code'], df['rain1h'].cast(DecimalType(scale=1))).filter(df['rain1h'] < 1000) # 筛选数据,去除无效数据 df_rain_sum = df_rain.groupBy("province", "city_name", "city_code").agg(F.sum("rain1h").alias("rain24h")).sort( F.desc("rain24h")) # 分组、求和、排序 df_rain_sum.cache() df_rain_sum.coalesce(1).write.csv("file:///home/lee/lab5/passed_rain_analyse.csv") # spark.catalog.refreshTable(filename) print("累积降雨量分析完毕!") return df_rain_sum.head(20)
这段代码使用了PySpark进行对城市过去24小时累积雨量的分析,具体解释如下:
1. 首先,定义了一个名为passed_rain_analyse的函数,并接受一个文件名作为参数,用于读取数据文件。
2. 创建了一个SparkSession对象spark,用于连接到Spark集群或者在本地启动一个Spark应用程序。
3. 使用spark.read.csv函数读取数据文件,同时设置header=True以读取CSV文件的第一行作为列名。
4. 对读取的DataFrame对象进行筛选操作,使用select函数选择"province"、"city_name"、"city_code"、"rain1h"四个字段,并使用cast函数将"rain1h"字段转换为DecimalType类型。同时,使用filter函数去除"rain1h"字段大于等于1000的无效数据。
5. 使用groupBy函数将DataFrame对象按照"province"、"city_name"、"city_code"三个字段进行分组,然后使用agg函数对每个分组进行聚合操作,计算"rain1h"字段的和,并将结果存储在"rain24h"字段中,并使用sort函数按照"rain24h"字段进行降序排序。
6. 使用cache函数将DataFrame对象缓存,加速后续的查询操作。
7. 使用coalesce函数将结果写入CSV文件中。
8. 最后,返回分析结果中的前20行数据。
总体来说,该函数使用了PySpark的DataFrame API进行数据处理和计算,实现了对城市过去24小时累积雨量的分析,并将结果存储在CSV文件中。
spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
这段代码是使用 Spark 构建一个本地的 SparkSession,用于启动一个 Spark 应用程序并分析降雨数据。其中:
- `SparkSession` 是 Spark 2.0 引入的新的入口点,用于创建 DataFrame、执行 SQL 查询等操作。
- `builder` 方法用于创建一个 Builder 对象,用于设置 SparkSession 的各种参数。
- `master("local")` 设置 Spark 应用程序的运行模式为本地模式,也就是在本地运行 Spark 集群。
- `appName("passed_rain_analyse")` 设置应用程序的名称为 "passed_rain_analyse"。
- `getOrCreate()` 用于获取一个已存在的 SparkSession 或创建一个新的 SparkSession。
这段代码的作用是为后续的降雨数据分析提供一个 Spark 运行环境,其中,"passed_rain_analyse" 是应用程序的名称,"local" 表示运行模式为本地模式。
阅读全文