sparksql 基本操作
时间: 2023-12-02 09:42:59 浏览: 82
以下是 Spark SQL 的基本操作方法:
1. 创建 SparkSession 对象
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
```
2. 读取数据源并创建 DataFrame
```python
df = spark.read.format("csv").option("header", "true").load("path/to/file.csv")
```
3. 注册 DataFrame 为临时表
```python
df.createOrReplaceTempView("temp_table_name")
```
4. 执行 SQL 查询
```python
result = spark.sql("SELECT * FROM temp_table_name WHERE column_name = 'value'")
```
5. 将 DataFrame 转换为 RDD
```python
rdd = df.rdd
```
6. 将 RDD 转换为 DataFrame
```python
from pyspark.sql import Row
rdd = sc.parallelize([(1, "John"), (2, "Bob"), (3, "Alice")])
df = rdd.map(lambda x: Row(id=x[0], name=x[1])).toDF()
```
7. 将 DataFrame 保存到数据源
```python
df.write.format("csv").option("header", "true").save("path/to/save")
```
演示:
假设我们有一个 csv 文件,包含以下内容:
```
id,name,age
1,John,25
2,Bob,30
3,Alice,28
```
我们可以使用以下代码读取该文件并创建 DataFrame:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("path/to/file.csv")
df.show()
```
输出结果为:
```
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1| John| 25|
| 2| Bob| 30|
| 3|Alice| 28|
+---+-----+---+
```
接下来,我们可以将 DataFrame 注册为临时表并执行 SQL 查询:
```python
df.createOrReplaceTempView("temp_table_name")
result = spark.sql("SELECT * FROM temp_table_name WHERE age > 25")
result.show()
```
输出结果为:
```
+---+-----+---+
| id| name|age|
+---+-----+---+
| 2| Bob| 30|
| 3|Alice| 28|
+---+-----+---+
```
阅读全文