Consuming Kafka Data with Spark in Python and Writing It to MySQL
A tutorial on consuming Kafka data from Python with Spark Streaming.
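The example below uses Spark Streaming's legacy DStream API. Note that the pyspark.streaming.kafka module is the Kafka 0.8 integration: it was deprecated in Spark 2.3.0 and removed in Spark 3.0, so this code only runs on Spark 2.x (a Structured Streaming alternative for newer Spark follows at the end). The broker address, topic, database, table, and credentials (localhost:9092, topic1, test.test, root/123456) are placeholders to replace with your own, and the stream assumes each Kafka message value is a CSV line of the form name,age,address.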
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext

sc = SparkContext("local[2]", "KafkaToMySQL")
ssc = StreamingContext(sc, 10)  # 10-second micro-batches
sqlContext = SQLContext(sc)

# Kafka consumer parameters. The 0-8 direct stream reads brokers from
# "bootstrap.servers" (or "metadata.broker.list") and manages offsets
# itself, so deserializer classes and commit settings are not used here.
kafkaParams = {"bootstrap.servers": "localhost:9092",
               "auto.offset.reset": "largest"}  # start from the latest offsets

# Subscribe to the topic; each record arrives as a (key, value) pair
kafkaStream = KafkaUtils.createDirectStream(ssc, ["topic1"], kafkaParams)
values = kafkaStream.map(lambda record: record[1])

# For each micro-batch, parse the CSV payload into a DataFrame and
# append it to MySQL over JDBC (DStreams have no toDF(), so the
# conversion happens inside foreachRDD)
def save_to_mysql(rdd):
    if rdd.isEmpty():
        return
    rows = rdd.map(lambda line: line.split(",")) \
              .map(lambda fields: (fields[0], fields[1], fields[2]))
    df = sqlContext.createDataFrame(rows, ["name", "age", "address"])
    df.write.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/test") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", "test.test") \
        .option("user", "root") \
        .option("password", "123456") \
        .mode("append") \
        .save()

values.foreachRDD(save_to_mysql)

ssc.start()
ssc.awaitTermination()
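To run the Spark 2.x job, both the Kafka integration jar and a MySQL JDBC driver must be on the classpath, typically via spark-submit --packages with the org.apache.spark:spark-streaming-kafka-0-8 artifact matching your Spark and Scala versions plus a mysql-connector-java artifact; treat the exact coordinates, and the com.mysql.cj.jdbc.Driver class name (Connector/J 8.x), as assumptions to verify against your environment.

On Spark 3.x, where the DStream Kafka API no longer exists, the same pipeline is usually written with Structured Streaming: read the topic with spark.readStream, parse the CSV value, and write each micro-batch to MySQL with foreachBatch. Below is a minimal sketch under the same assumptions as above (topic topic1, table test.test, CSV payload name,age,address; the checkpoint path is a placeholder); it needs the spark-sql-kafka-0-10 package and a MySQL JDBC driver.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

spark = SparkSession.builder.appName("KafkaToMySQLStructured").getOrCreate()

# Read the topic as a streaming DataFrame; Kafka values arrive as bytes
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "topic1")
       .option("startingOffsets", "latest")
       .load())

# Cast the value to a string and split the CSV payload into columns
parsed = (raw.selectExpr("CAST(value AS STRING) AS csv")
          .select(split(col("csv"), ",").alias("f"))
          .select(col("f")[0].alias("name"),
                  col("f")[1].alias("age"),
                  col("f")[2].alias("address")))

# Each micro-batch arrives as a plain DataFrame, so the JDBC append is
# written exactly as in a batch job
def write_batch(batch_df, batch_id):
    (batch_df.write.format("jdbc")
     .option("url", "jdbc:mysql://localhost:3306/test")
     .option("driver", "com.mysql.cj.jdbc.Driver")  # Connector/J 8.x class name
     .option("dbtable", "test.test")
     .option("user", "root")
     .option("password", "123456")
     .mode("append")
     .save())

query = (parsed.writeStream
         .foreachBatch(write_batch)
         .option("checkpointLocation", "/tmp/kafka_to_mysql_ckpt")  # placeholder path
         .start())
query.awaitTermination()

Compared with the DStream version, foreachBatch hands you an ordinary DataFrame per micro-batch, and Kafka offsets are tracked through the checkpoint location rather than by the consumer group.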