from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import Row from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType if __name__ == "__main__": sc = SparkContext( 'local', 'test') spark=SQLContext(sc) jdbcDF=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user", "root").option("password", "123").load() jdbcDF.filter(jdbcDF.age>20).collect()//检测是否连接成功 studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" ")) schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)]) rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3]))) employeeDF = spark.createDataFrame(rowRDD, schema) prop = {} prop['user'] = 'root' prop['password'] = '123' prop['driver'] = "com.mysql.jdbc.Driver" employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest",'employee','append', prop) jdbcDF.collect() jdbcDF.agg({"age": "max"}).show() jdbcDF.agg({"age": "sum"}).show()
时间: 2023-07-19 18:49:23 浏览: 142
C#创建存储过程代码.doc
这段代码使用了PySpark连接MySQL数据库,并进行了一些基本的数据读取、过滤、写入和聚合操作。
首先,通过SparkContext和SQLContext创建了一个SparkSession,然后使用Spark的JDBC连接器读取MySQL数据库中的employee表格数据,并使用filter()方法过滤出年龄大于20岁的员工数据。接着,使用parallelize()和map()方法创建了一个包含两个学生信息的RDD,并使用createDataFrame()方法将其转换为DataFrame。最后,使用write.jdbc()方法将DataFrame中的数据写入到MySQL数据库的employee表格中,并使用agg()方法进行了最大值和求和的聚合操作。
需要注意的是,这段代码中的数据库连接信息和账户信息需要根据实际情况进行修改,以保证连接成功。另外,如果要使用JDBC连接器进行数据读写操作,还需要将相应的JDBC驱动程序添加到classpath中。
阅读全文