SPark SQL 从 DB 读取数据方法和方式 scala
时间: 2024-03-15 10:47:27 浏览: 176
在 Spark SQL 中,可以通过 JDBC 数据源和 DataFrame API 从数据库中读取数据。下面是两种方式的详细说明:
1. JDBC 数据源
使用 JDBC 数据源可以直接从关系型数据库中读取数据。首先需要在 Spark 中添加 JDBC 驱动程序,例如:
```scala
spark-shell --driver-class-path mysql-connector-java.jar --jars mysql-connector-java.jar
```
然后可以使用 `spark.read.jdbc()` 方法读取数据,例如:
```scala
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mydatabase")
.option("dbtable", "mytable")
.option("user", "myuser")
.option("password", "mypassword")
.load()
```
这个例子中,我们使用 MySQL 数据库,并读取了 `mytable` 表的数据。
2. DataFrame API
DataFrame API 是 Spark SQL 中的核心 API,可以通过它从多种数据源中读取数据,包括文件、Hive 表、JDBC 数据源等。使用 DataFrame API 读取数据库中的数据,可以采用以下步骤:
1. 创建 JDBC 连接
```scala
val jdbcUrl = "jdbc:mysql://localhost:3306/mydatabase"
val jdbcUsername = "myuser"
val jdbcPassword = "mypassword"
val jdbcDriver = "com.mysql.jdbc.Driver"
Class.forName(jdbcDriver)
val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
```
2. 执行 SQL 查询
```scala
val statement = connection.createStatement()
val query = "SELECT * FROM mytable"
val resultSet = statement.executeQuery(query)
```
3. 将结果转换为 DataFrame
```scala
val schema = StructType(
Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
)
)
val rows = new ArrayBuffer[Row]()
while (resultSet.next()) {
val id = resultSet.getInt("id")
val name = resultSet.getString("name")
val age = resultSet.getInt("age")
rows += Row(id, name, age)
}
val rdd = spark.sparkContext.parallelize(rows)
val df = spark.createDataFrame(rdd, schema)
```
这个例子中,我们手动创建了 DataFrame 的结构,并将查询结果转换为 DataFrame。注意,在实际生产环境中,应该避免手动创建 DataFrame 的结构,而是使用 Spark SQL 自动推断结构的功能。
阅读全文