spark sql 更新数据库
时间: 2023-10-10 10:10:06 浏览: 83
Spark SQL提供了多种方式来更新数据库。这里列举其中两种常用的方式:
1. 使用JDBC连接器更新数据库
使用JDBC连接器可以方便地连接到数据库,并使用SQL语句来更新数据。以下是一个Spark SQL更新MySQL数据库的示例:
```scala
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder()
.appName("Spark SQL update database")
.getOrCreate()
// 加载MySQL驱动
Class.forName("com.mysql.jdbc.Driver")
// 定义连接属性
val properties = new Properties()
properties.put("user", "root")
properties.put("password", "password")
// 加载数据
val df = spark.read.jdbc("jdbc:mysql://localhost:3306/test", "person", properties)
// 更新数据
df.where(df("age") >= 20).foreachPartition(iter => {
Class.forName("com.mysql.jdbc.Driver")
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password")
iter.foreach(row => {
val stmt = conn.prepareStatement("UPDATE person SET salary = ? WHERE id = ?")
stmt.setDouble(1, row.getAs[Double]("salary") * 1.2)
stmt.setInt(2, row.getAs[Int]("id"))
stmt.executeUpdate()
})
conn.close()
})
```
2. 使用Spark SQL的JDBC数据源更新数据库
Spark SQL支持使用JDBC数据源将数据写入关系型数据库。以下是一个Spark SQL更新MySQL数据库的示例:
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder()
.appName("Spark SQL update database")
.getOrCreate()
// 加载数据
val df = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "person")
.option("user", "root")
.option("password", "password")
.load()
// 更新数据
df.where(df("age") >= 20)
.withColumn("salary", df("salary") * 1.2)
.write.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test")
.option("dbtable", "person")
.option("user", "root")
.option("password", "password")
.mode(SaveMode.Overwrite)
.save()
```
以上两种方式都可以用来更新数据库,具体使用哪种方式取决于具体的场景和需求。
阅读全文