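Add comments to the following code:

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f


def data_process(raw_data_path):
    # Create (or reuse) a SparkSession configured with a default SparkConf
    spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
    # Read the raw business records (spark.read.json expects one JSON object per line by default)
    business = spark.read.json(raw_data_path)
    # Split the comma-separated "categories" string into an array, drop rows with an
    # empty city, then drop every row that has a null in ANY column (dropna() default)
    split_col = f.split(business["categories"], ",")
    business = business.withColumn("categories", split_col).filter(business["city"] != "").dropna()
    business.createOrReplaceTempView("business")
    # Keep only the needed columns; cache because both queries below reuse b_etl
    b_etl = spark.sql("SELECT business_id, name, city, state, latitude, longitude, stars, "
                      "review_count, is_open, categories, attributes FROM business").cache()
    b_etl.createOrReplaceTempView("b_etl")
    # Distance (in degrees) from each business to the mean latitude/longitude of its state
    outlier = spark.sql("""
        SELECT b1.business_id,
               SQRT(POWER(b1.latitude - b2.avg_lat, 2) + POWER(b1.longitude - b2.avg_long, 2)) AS dist
        FROM b_etl b1
        INNER JOIN (SELECT state, AVG(latitude) AS avg_lat, AVG(longitude) AS avg_long
                    FROM b_etl GROUP BY state) b2 ON b1.state = b2.state
        ORDER BY dist DESC""")
    outlier.createOrReplaceTempView("outlier")
    # Keep businesses less than 10 degrees from their state's centre, i.e. drop outliers
    joined = spark.sql("SELECT b.* FROM b_etl b INNER JOIN outlier o "
                       "ON b.business_id = o.business_id WHERE o.dist < 10")
    # Write the result as Parquet, overwriting any previous output
    joined.write.parquet("file:///home/hadoop/wangyingmin/yelp-etl/business_etl", mode="overwrite")


if __name__ == "__main__":
    # Despite its name, this is a local file:// path, not an HDFS path
    raw_hdfs_path = 'file:///home/hadoop/wangyingmin/yelp_academic_dataset_business.json'
    print("Start cleaning raw data!")
    data_process(raw_hdfs_path)
    print("Successfully done")
```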
Posted: 2024-04-03 16:35:30
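This is PySpark code that cleans and processes the Yelp business data. It performs the following steps:
1. Create a SparkSession, configured with a default SparkConf.
2. Read the raw JSON file, split the `categories` column on commas into an array, drop rows whose `city` is an empty string, and then drop every row that contains a null in any column (`dropna()`), not only in `city`.
3. Register the result as a temporary view named `business`.
4. Select the needed columns from `business` into a cached view `b_etl`, compute the average latitude and longitude of all businesses in each state, and compute each business's distance to its state's average, sorted in descending order (the `outlier` view).
5. Inner-join `b_etl` with `outlier` and keep only businesses whose distance is less than 10, i.e. remove the geographic outliers, to obtain the final dataset.
6. Save the final dataset as Parquet files, overwriting any existing output.
Each numbered step corresponds, in order, to the statements in `data_process`, so the list can serve as a step-by-step commentary on the code.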
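To make steps 2, 4 and 5 concrete, here is a plain-Python sketch (no Spark needed) of the same cleaning and outlier rule applied to a few hypothetical rows. It mirrors `split(categories, ",")`, `filter(city != "")`, `dropna()`, the per-state average latitude/longitude, and the `dist < 10` cut; the helper names (`row`, `clean`, `centroid_distances`, `drop_outliers`) and the data are made up for illustration and are not part of the original job.

```python
import math
from collections import defaultdict


def row(bid, city, state, lat, lon, cats, attrs):
    """Build one hypothetical business record with the columns the ETL uses."""
    return {"business_id": bid, "city": city, "state": state, "latitude": lat,
            "longitude": lon, "categories": cats, "attributes": attrs}


RAW = [
    row("a1", "Phoenix", "AZ", 33.0, -112.0, "Pizza, Restaurants", {"WiFi": "free"}),
    row("a2", "Tempe", "AZ", 34.0, -112.0, "Bars", {}),
    row("a3", "Mesa", "AZ", 33.0, -111.0, "Coffee & Tea", {}),
    row("a4", "Scottsdale", "AZ", 34.0, -111.0, "Hotels", {}),
    row("a5", "Chandler", "AZ", 33.5, -91.5, "Pizza", {}),     # bad coordinates
    row("n1", "Las Vegas", "NV", 36.0, -115.0, "Casinos", {}),
    row("x1", "", "AZ", 33.4, -112.1, "Bakeries", {}),         # empty city
    row("x2", "Tucson", "AZ", 32.2, -110.9, "Gyms", None),     # null attributes
]


def clean(rows):
    """Like filter(city != "") + dropna() + split(categories, ",")."""
    out = []
    for r in rows:
        if r["city"] == "":
            continue
        if any(v is None for v in r.values()):  # dropna(): a null in ANY column drops the row
            continue
        out.append(dict(r, categories=r["categories"].split(",")))
    return out


def centroid_distances(rows):
    """Distance (in degrees) from each business to its state's mean lat/long."""
    totals = defaultdict(lambda: [0.0, 0.0, 0])  # state -> [sum_lat, sum_long, count]
    for r in rows:
        t = totals[r["state"]]
        t[0] += r["latitude"]
        t[1] += r["longitude"]
        t[2] += 1
    dist = {}
    for r in rows:
        sum_lat, sum_long, n = totals[r["state"]]
        dist[r["business_id"]] = math.hypot(r["latitude"] - sum_lat / n,
                                            r["longitude"] - sum_long / n)
    return dist


def drop_outliers(rows, max_dist=10.0):
    """Keep rows whose distance is below max_dist, like WHERE o.dist < 10."""
    dist = centroid_distances(rows)
    return [r for r in rows if dist[r["business_id"]] < max_dist]


kept = drop_outliers(clean(RAW))
print([r["business_id"] for r in kept])  # ['a1', 'a2', 'a3', 'a4', 'n1']
```

The sketch makes two details visible: splitting on `","` keeps the space after each comma (`"Pizza, Restaurants"` becomes `["Pizza", " Restaurants"]`), and a single null field such as `attributes` is enough for `dropna()` to remove a business. The distance is measured in degrees of latitude/longitude rather than kilometres, so the threshold of 10 is only a coarse cut.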
Related questions
```python
from pyspark.sql import SparkSession
import logging

logging.basicConfig(level="WARN")
```
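This code imports the SparkSession class and the logging module, and sets the logging level to WARN. SparkSession, introduced in Spark 2.0, is the entry point of a Spark application and provides many methods for data processing. The logging module records the program's runtime log, and the log level controls how detailed the recorded messages are. Setting the level to WARN means only messages of level WARNING and above are recorded.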
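To see the effect of `level="WARN"` in isolation, the sketch below attaches a small handler that records every message it receives. The logger name `etl` and the `ListHandler` class are illustrative, not part of the original code. Only the `warning` and `error` calls reach the handler, because `"WARN"` is accepted as an alias for `logging.WARNING`.

```python
import logging

# Same call as in the question: sets the root logger's level to WARNING
logging.basicConfig(level="WARN")
log = logging.getLogger("etl")  # hypothetical logger name


class ListHandler(logging.Handler):
    """Collects the message text of every record it handles."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


captured = ListHandler()
log.addHandler(captured)

log.info("reading input")     # below WARNING: filtered out
log.warning("city is empty")  # WARNING: recorded
log.error("write failed")     # ERROR: recorded

print(captured.messages)
```

This only configures Python's own `logging` module. Spark's JVM-side log output (for example the INFO lines printed while a SparkSession starts) is not affected by it; the usual way to quiet that output is to call `spark.sparkContext.setLogLevel("WARN")` once the session exists.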
```
from .data import sql_data
ImportError: attempted relative import with no known parent package
```
This error occurs when a module that uses a relative import is loaded without a known parent package, typically because the file was executed directly as a script (`python mymodule.py`). To fix it, make sure the module is part of a package, that the package's parent directory is on the Python path, and that the module is loaded through that package.
One way to fix this error is to use an absolute import instead of a relative import. For example, if you have a package called "mypackage" and a module called "mymodule" that needs to import a module called "sql_data", you can use an absolute import like this:
```python
from mypackage.data import sql_data
```
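Alternatively, keep the relative import and make sure the importing module is part of a package: create an empty `__init__.py` file in the directory that contains it, which tells Python that the directory is a regular package. The `__init__.py` alone does not help if you still launch the file directly; run it as a module from the directory above the package, for example `python -m mypackage.mymodule`, so that Python knows its parent package.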
Once you've done this, you can use a relative import like this:
```python
from .data import sql_data
```
Note that the dot before "data" indicates a relative import. The dot stands for the package that contains the current module, so inside `mypackage/mymodule.py` this statement is equivalent to `from mypackage.data import sql_data`.
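The difference between running a file as a script and running it as part of its package can be reproduced with a throwaway package. This sketch builds a hypothetical `mypackage` (with `__init__.py`, `data.py` and `mymodule.py`) in a temporary directory, then runs `mymodule.py` both ways with the current interpreter; the file contents and the `run` helper are illustrative assumptions.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path


def build_demo_package(root: Path) -> None:
    """Create a hypothetical layout: mypackage/__init__.py, data.py, mymodule.py."""
    pkg = root / "mypackage"
    pkg.mkdir()
    (pkg / "__init__.py").write_text("")  # marks a regular package
    (pkg / "data.py").write_text("sql_data = 'SELECT 1'\n")
    (pkg / "mymodule.py").write_text("from .data import sql_data\nprint(sql_data)\n")


def run(args, cwd: Path) -> subprocess.CompletedProcess:
    """Run the current interpreter in cwd, with cwd explicitly on the Python path."""
    env = {**os.environ, "PYTHONPATH": str(cwd)}
    return subprocess.run([sys.executable, *args], cwd=cwd, env=env,
                          capture_output=True, text=True)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    build_demo_package(root)
    # Executed as a script: the module has no parent package, so the relative import fails
    as_script = run([str(Path("mypackage") / "mymodule.py")], cwd=root)
    # Executed as a module of mypackage: the parent package is known
    as_module = run(["-m", "mypackage.mymodule"], cwd=root)

print("script:", as_script.returncode, as_script.stderr.strip().splitlines()[-1])
print("module:", as_module.returncode, as_module.stdout.strip())
```

The script run exits with the same `ImportError` as in the question even though `__init__.py` exists and the package directory is on the path, while the `-m` run imports `sql_data` and prints it. That is why the package marker and the way the module is launched both matter.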