spark数据预处理(数据可能不完整、不一致、重复、含噪音等)代码
时间: 2024-09-21 17:09:41 浏览: 33
Spark在数据预处理中提供了一系列功能,可以处理缺失值、异常值、重复项以及数据清洗等问题。以下是一个简单的例子,展示如何使用Apache Spark DataFrame API进行数据预处理:
```python
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设我们有一个包含原始数据的DataFrame df
df_original = spark.read.format("csv") \
.option("header", "true") \
.load("your_data.csv")
# 数据预处理步骤:
# 1. 检查并处理缺失值(NaN)
df_cleaned = df_original.fillna(value={'column_name': 'default_value'}) # 使用默认值填充
# 或者更复杂的方式,如平均值、众数等
df_cleaned = df_original.na.replace({'column_name': df_cleaned.groupby('column_name').mean()['column_name']})
# 2. 删除重复项
df_no_duplicates = df_cleaned.dropDuplicates()
# 3. 数据类型转换
df_casted = df_no_duplicates.withColumn("column_name", df_no_duplicates["column_name"].cast("data_type"))
# 4. 清理异常值
def clean_outliers(column_name, threshold):
lower_bound = df_casted[column_name].quantile(threshold)
upper_bound = df_casted[column_name].quantile(1 - threshold)
return df_casted[(df_casted[column_name] > lower_bound) & (df_casted[column_name] < upper_bound)]
df_cleaned = clean_outliers('column_name', 0.05) # 示例,去除5%四分位之外的数据
# 结果保存
df_cleaned.write.format("csv").save("cleaned_data.csv")
阅读全文