编写pyspark代码导入csv文件中的数据,并进行数据清洗
时间: 2024-12-13 18:25:02 浏览: 7
在PySpark中,我们可以使用`SparkSession`和`DataFrameReader`模块来读取CSV文件并进行数据清洗。以下是基本步骤:
```python
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder.appName("Data Cleaning").getOrCreate()
# 读取CSV文件
df = spark.read.format('csv') \
.option('header', 'true') \ # 如果CSV文件有标题行,设置此选项为True
.option('inferSchema', 'true') \ # 自动推断列的数据类型
.option('delimiter', ',') \ # 如果不是默认的逗号分隔,可以指定分隔符
.load('path_to_your_csv_file.csv')
# 数据清洗示例:
# 删除含有空值的行
df = df.dropna()
# 更改数据类型,例如将日期字符串转换为日期类型
df = df.withColumn('date_column', df['date_column'].cast('date'))
# 处理重复项,可以选择保留第一条或最后一条
df = df.distinct() if you_want_unique_rows else df.dropDuplicates()
# 过滤特定条件的数据,例如只保留age大于18的行
df = df.filter(df['age'] > 18)
# 输出清洗后的数据量以检查清洗效果
print("Cleaned data size:", df.count())
# 保存清洗后的数据到新的CSV文件
df.write.format('csv').save('output_cleaned_data.csv', header=True)
```
记得替换`path_to_your_csv_file.csv`和实际的数据清洗需求。完成以上操作后,你就有了一个经过初步清洗的DataFrame。
阅读全文