Common PySpark Commands
PySpark is an open-source distributed computing framework for large-scale data processing. Below are some common PySpark commands:
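All of the snippets below assume an active SparkSession bound to the name `spark`. A minimal sketch of creating one (the app name is illustrative):
```
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; 'examples' is a placeholder app name
spark = SparkSession.builder.appName('examples').getOrCreate()
```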
1. Read a file with an explicit schema:
```
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, StringType

# Declare the schema explicitly instead of inferring it from the data
schema = StructType([
    StructField('x1', StringType()),
    StructField('x2', DoubleType())
])

# Read a tab-separated file with no header row, keeping only selected columns
xs = spark.read.schema(schema)\
    .option('header', 'false')\
    .csv(path.format(s3_bucket), sep='\t')\
    .select(*sel_col)
```
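`path`, `s3_bucket`, and `sel_col` are not defined in the snippet; they are assumed to come from the surrounding job. A hypothetical setup might look like:
```
# Hypothetical values for the names the snippet assumes
s3_bucket = 'my-bucket'            # illustrative bucket name
path = 's3://{}/data/input.tsv'    # template filled in via .format(s3_bucket)
sel_col = ['x1', 'x2']             # columns to keep after the read
```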
2. Query columns with SQL:
```
from pyspark.conf import SparkConf
from pyspark.sql import Row
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

# Parse a comma-separated text file into Row objects
rd = sc.textFile(r"D:\data\people.txt")
rd2 = rd.map(lambda x: x.split(","))
people = rd2.map(lambda p: Row(name=p[0], age=int(p[1])))

# Register the DataFrame as a temp view and query it with SQL
peopleDF = spark.createDataFrame(people)
peopleDF.createOrReplaceTempView("people")
result = spark.sql("SELECT name, age FROM people WHERE name='Andy'")
result.show(5)

# Collect the matching ages back to the driver
ages = result.rdd.map(lambda p: p.age).collect()
for age in ages:
    print(age)
```
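The same query can be written with the DataFrame API instead of SQL, skipping the temp view; a sketch using the `peopleDF` built above:
```
from pyspark.sql import functions as F

# Equivalent filter without registering a temp view
result = peopleDF.filter(F.col('name') == 'Andy').select('name', 'age')
result.show(5)
```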
3. Add columns:
```
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

# windowSpec was not defined in the original snippet; ordering by the
# generated id is one plausible choice (row_number needs an ordered window)
windowSpec = W.orderBy('new_col')

df = df.withColumn('new_col', F.monotonically_increasing_id())\
    .withColumn('row_number', F.row_number().over(windowSpec))
```
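If the row numbers should restart within groups, `partitionBy` can be added to the window. A sketch assuming `df` has a grouping column `x1`:
```
# Row numbers restart for each distinct value of x1
grouped_spec = W.partitionBy('x1').orderBy('new_col')
df = df.withColumn('group_row_number', F.row_number().over(grouped_spec))
```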
4. Rename a column:
```
df = df.withColumnRenamed('X1', 'newname')
```
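To rename several columns at once, `withColumnRenamed` can be chained in a loop; a sketch with a hypothetical mapping:
```
# Hypothetical old-name -> new-name mapping
renames = {'X1': 'newname', 'X2': 'other'}
for old, new in renames.items():
    df = df.withColumnRenamed(old, new)
```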
5. Sort/drop columns:
```
df.sort('x1').drop('x1')
```
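For descending order or dropping several columns at once, a sketch (column names are illustrative):
```
from pyspark.sql import functions as F

# Sort descending on x1, then drop two columns
df2 = df.sort(F.desc('x1')).drop('x1', 'x2')
```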