spark基于dataframe和sparksql对hdfs文件夹下多个文件进行读、写、join等操作
时间: 2023-04-20 20:00:52 浏览: 278
Spark可以基于DataFrame和SparkSQL对HDFS文件夹下的多个文件进行读、写、join等操作。
首先,我们可以使用SparkSession来创建一个Spark应用程序,并读取HDFS文件夹下的多个文件:
```
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("hdfs://path/to/folder/*.csv")
```
上述代码中,我们使用了SparkSession来创建一个名为“example”的Spark应用程序,并使用`read`方法从HDFS文件夹中读取所有以.csv结尾的文件,并将它们合并成一个DataFrame。
接下来,我们可以对DataFrame进行各种操作,例如过滤、聚合、排序等:
```
df_filtered = df.filter(df["age"] > 30)
df_grouped = df.groupBy("gender").agg({"salary": "mean"})
df_sorted = df.sort(df["age"].desc())
```
上述代码中,我们使用了DataFrame的`filter`方法来过滤出年龄大于30岁的记录,使用`groupBy`和`agg`方法来按性别分组并计算平均工资,以及使用`sort`方法按年龄降序排序。
最后,我们可以使用DataFrame的`write`方法将结果写入HDFS:
```
df_filtered.write.format("csv").option("header", "true").save("hdfs://path/to/output/folder")
df_grouped.write.format("csv").option("header", "true").save("hdfs://path/to/output/folder")
df_sorted.write.format("csv").option("header", "true").save("hdfs://path/to/output/folder")
```
上述代码中,我们使用了DataFrame的`write`方法将过滤、分组和排序后的结果分别写入HDFS中的一个输出文件夹中。
阅读全文