pyspark 操作Delta Lake工具类
时间: 2023-11-24 20:06:56 浏览: 95
以下是使用 PySpark 操作 Delta Lake 工具类的示例代码:
1. 导入必要的库和创建 SparkSession 对象
```python
from pyspark.sql import SparkSession
from delta.tables import *
spark = SparkSession.builder.appName("DeltaLakeExample").getOrCreate()
```
2. 创建 Delta Lake 表
```python
# 创建一个 DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["name", "age"])
# 将 DataFrame 写入 Delta Lake 表
df.write.format("delta").save("/path/to/delta-table")
```
3. 读取 Delta Lake 表
```python
# 从 Delta Lake 表中读取数据
df = spark.read.format("delta").load("/path/to/delta-table")
df.show()
```
4. 更新 Delta Lake 表
```python
# 更新 Delta Lake 表中的数据
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.update("age > 1", {"age": "age + 1"})
```
5. 删除 Delta Lake 表中的数据
```python
# 删除 Delta Lake 表中的数据
deltaTable.delete("age > 2")
```
阅读全文