from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f


def data_process(raw_data_path):
    # Create a SparkSession configured through SparkConf.
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

    # Read the raw Yelp business JSON, split the comma-separated categories string
    # into an array, drop rows with an empty city, and drop rows containing nulls.
    business = spark.read.json(raw_data_path)
    split_col = f.split(business['categories'], ',')
    business = business.withColumn("categories", split_col) \
        .filter(business["city"] != "") \
        .dropna()
    business.createOrReplaceTempView("business")

    # Keep only the columns needed downstream and cache the result.
    b_etl = spark.sql("SELECT business_id, name, city, state, latitude, longitude, stars, "
                      "review_count, is_open, categories, attributes FROM business").cache()
    b_etl.createOrReplaceTempView("b_etl")

    # For each business, compute the Euclidean distance (in coordinate degrees)
    # from the average latitude/longitude of its state.
    outlier = spark.sql(
        "SELECT b1.business_id, "
        "SQRT(POWER(b1.latitude - b2.avg_lat, 2) + POWER(b1.longitude - b2.avg_long, 2)) AS dist "
        "FROM b_etl b1 INNER JOIN "
        "(SELECT state, AVG(latitude) AS avg_lat, AVG(longitude) AS avg_long FROM b_etl GROUP BY state) b2 "
        "ON b1.state = b2.state ORDER BY dist DESC")
    outlier.createOrReplaceTempView("outlier")

    # Keep only businesses within distance 10 of their state centroid and write Parquet.
    joined = spark.sql("SELECT b.* FROM b_etl b INNER JOIN outlier o "
                       "ON b.business_id = o.business_id WHERE o.dist < 10")
    joined.write.parquet("file:///home/hadoop/wangyingmin/yelp-etl/business_etl", mode="overwrite")


if __name__ == "__main__":
    raw_hdfs_path = 'file:///home/hadoop/wangyingmin/yelp_academic_dataset_business.json'
    print("Start cleaning raw data!")
    data_process(raw_hdfs_path)
    print("Successfully done")
This is a piece of PySpark code for cleaning and processing data. Concretely, it performs the following steps:
1. Create a SparkSession, configured through a SparkConf object.
2. Read the raw JSON data, split the comma-separated categories column into an array with the split function, drop rows whose city column is empty, and drop rows containing nulls.
3. Register the processed data as a temporary view named business.
4. Select the required columns from the business view, compute the average latitude and longitude of the businesses in each state, and order businesses by their distance from that per-state average to expose outliers.
5. Inner-join the distance results back to the business view, keeping only businesses whose distance is below 10 (a DataFrame-API sketch of this filter appears below).
6. Write the final dataset out in Parquet format (a quick read-back check is sketched at the end of this section).
The code above also carries brief comments that describe each of these steps.
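Steps 4 and 5 are the least obvious part of the script, so here is a minimal sketch of the same per-state distance filter expressed with the DataFrame API instead of SQL. It assumes the b_etl DataFrame from the code above and the same functions alias f; it is an alternative formulation for illustration, not the author's original code.

import pyspark.sql.functions as f

# Per-state centroid of business coordinates.
state_centers = (
    b_etl.groupBy("state")
         .agg(f.avg("latitude").alias("avg_lat"),
              f.avg("longitude").alias("avg_long"))
)

# Euclidean distance (in coordinate degrees) of each business from its state's centroid.
with_dist = (
    b_etl.join(state_centers, on="state", how="inner")
         .withColumn(
             "dist",
             f.sqrt(f.pow(f.col("latitude") - f.col("avg_lat"), 2)
                    + f.pow(f.col("longitude") - f.col("avg_long"), 2)))
)

# Keep only businesses within 10 degrees of the centroid, mirroring WHERE o.dist < 10.
cleaned = with_dist.filter(f.col("dist") < 10).drop("avg_lat", "avg_long", "dist")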
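For step 6, a quick way to confirm the ETL output is to read the Parquet directory back and inspect it. This is a hypothetical verification step, not part of the original script; it assumes the same SparkSession (spark) is still active and reuses the output path from the code above.

# Sanity-check the Parquet output written by data_process.
etl = spark.read.parquet("file:///home/hadoop/wangyingmin/yelp-etl/business_etl")
etl.printSchema()                                  # categories should now be array<string>
print(etl.count(), "rows after cleaning")          # row count after filtering and dropna
etl.select("business_id", "state", "stars").show(5, truncate=False)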