配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表8-2所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
时间: 2023-11-28 07:48:02 浏览: 143
首先需要下载并安装MySQL JDBC驱动程序。然后在Spark中使用如下代码连接MySQL数据库:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MySQLIntegration").getOrCreate()
jdbc_url = "jdbc:mysql://localhost:3306/test"
connection_properties = {
"user": "your_username",
"password": "your_password",
"driver": "com.mysql.cj.jdbc.Driver"
}
df = spark.read.jdbc(url=jdbc_url, table="person", properties=connection_properties)
```
其中,`jdbc_url`是连接MySQL数据库的URL,`connection_properties`包含连接数据库所需的用户名、密码和驱动程序。
接下来,可以使用DataFrame的`write`方法将数据插入到MySQL中:
```python
data = [('Alice', 25), ('Bob', 30)]
columns = ['name', 'age']
df = spark.createDataFrame(data, columns)
df.write.jdbc(url=jdbc_url, table="person", mode="append", properties=connection_properties)
```
这里使用了一个包含两行数据的DataFrame,将其写入到名为`person`的MySQL表中,使用`mode="append"`表示追加数据到表中。
最后,可以通过DataFrame的聚合函数计算`age`的最大值和总和,并打印出来:
```python
from pyspark.sql.functions import max, sum
max_age = df.select(max("age")).collect()[0][0]
total_age = df.select(sum("age")).collect()[0][0]
print("Max age:", max_age)
print("Total age:", total_age)
```
阅读全文