from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import Row from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType if __name__ == "__main__": sc = SparkContext( 'local', 'test') spark=SQLContext(sc) jdbcDF=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user", "root").option("password", "123").load() jdbcDF.filter(jdbcDF.age>20).collect()//检测是否连接成功 studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" ")) schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)]) rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3]))) employeeDF = spark.createDataFrame(rowRDD, schema) prop = {} prop['user'] = 'root' prop['password'] = '123' prop['driver'] = "com.mysql.jdbc.Driver" employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest",'employee','append', prop) jdbcDF.collect() jdbcDF.agg({"age": "max"}).show() jdbcDF.agg({"age": "sum"}).show()
时间: 2023-07-19 08:50:59 浏览: 169
Spark与Mysql的交互
5星 · 资源好评率100%
这段代码看起来是一个 Spark SQL 的示例代码,它实现了从 MySQL 数据库中读取数据,对数据进行过滤、聚合等操作,然后将结果写回到 MySQL 数据库中。具体来说,代码中使用了以下 Spark SQL API:
1. 通过 `SQLContext` 创建 Spark SQL 的上下文环境
2. 使用 `read.format("jdbc")` 方法从 MySQL 数据库中读取数据
3. 使用 `filter` 方法对数据进行过滤
4. 使用 `collect` 方法将过滤后的数据以 RDD 的形式返回
5. 使用 `createDataFrame` 方法将 RDD 转换为 DataFrame
6. 使用 `write.jdbc` 方法将 DataFrame 中的数据写回到 MySQL 数据库中
7. 使用 `agg` 方法进行聚合操作,例如 `agg({"age": "max"})` 表示计算 age 列的最大值
如果您有具体的问题需要解决,请告诉我。
阅读全文