完成Spark RDD读写MySQL数据库练习,写出程序源代码,及运行结果截图。
时间: 2024-09-25 09:02:22 浏览: 47
spark rdd转dataframe 写入mysql的实例讲解
在完成Spark RDD(Resilient Distributed Dataset,弹性分布式数据集)与MySQL数据库的交互练习时,通常会涉及到使用Spark SQL和JDBC(Java Database Connectivity)库。下面是一个简单的示例代码,假设你已经有了一个SparkSession实例`spark`:
```python
from pyspark.sql import SparkSession, SQLContext
import mysql.connector
# 创建SparkSession
spark = SparkSession.builder.getOrCreate()
# 创建SQLContext用于SQL操作
sql_context = SQLContext(spark)
# 假设表名为"test_table",且有字段id、name和value
rdd_data = spark.sparkContext.parallelize([(1, 'Alice', 100), (2, 'Bob', 200)])
# 将RDD转换为DataFrame
df = sql_context.createDataFrame(rdd_data, ["id", "name", "value"])
# 连接MySQL数据库
db_config = {
"host": "your_host",
"user": "your_user",
"password": "your_password",
"database": "your_database"
}
try:
cnx = mysql.connector.connect(**db_config)
except mysql.connector.Error as err:
print("Error connecting to MySQL: {}".format(err))
# 写入DataFrame到MySQL
query = """INSERT INTO test_table (id, name, value) VALUES (?, ?, ?)"""
cursor = cnx.cursor()
for row in df.rdd.zipWithIndex().collect():
cursor.execute(query, tuple(row))
cnx.commit()
# 从MySQL读取数据到RDD
read_query = "SELECT * FROM test_table"
mysql_df = sql_context.read.format("jdbc").options(
url="jdbc:mysql://{}:{}/{}".format(db_config["host"], db_config["port"], db_config["database"]),
driver="com.mysql.cj.jdbc.Driver",
user=db_config["user"],
password=db_config["password"],
query=read_query
).load()
# 打印读取后的RDD内容
print(mysql_df.show())
# 关闭连接
cursor.close()
cnx.close()
# 结果截图无法直接在这里展示,你需要在实际环境运行此代码并保存截图
```
请注意,你需要替换上述代码中的数据库连接信息。运行后,你会看到新插入的数据以及从MySQL读取回来的数据。
阅读全文