Spark Streaming 如何将处理结果保存到 MySQL
时间: 2024-05-14 07:13:53 浏览: 110
Spark Streaming 可以把每个批次的处理结果写入 MySQL 来实现持久化。具体做法是在 `foreachRDD` 中通过 JDBC 将结果写入 MySQL 数据库。下面的示例基于 spark-streaming-kafka-0-8 的 Direct 方式读取 Kafka，代码如下：
```scala
import java.util.Properties
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.util.Date
import java.text.SimpleDateFormat
// Database connection settings.
// URL and credentials can be overridden through the MYSQL_URL / MYSQL_USER / MYSQL_PASSWORD
// environment variables so secrets need not be hard-coded; the literals are local-dev defaults.
val url = sys.env.getOrElse(
  "MYSQL_URL",
  "jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8")
val username = sys.env.getOrElse("MYSQL_USER", "root")
val password = sys.env.getOrElse("MYSQL_PASSWORD", "password")
val driver = "com.mysql.cj.jdbc.Driver"
// JDBC connection properties bundling the same user, password and driver class.
val prop = new Properties()
prop.setProperty("user", username)
prop.setProperty("password", password)
prop.setProperty("driver", driver)
// Opens a new JDBC connection to the MySQL database configured by `driver`, `url`,
// `username` and `password`. Every call returns a fresh connection, which the caller must close.
def createConnection(): java.sql.Connection = {
  // Explicitly load the driver class (needed for pre-JDBC-4 setups, harmless otherwise).
  Class.forName(driver)
  // Fully qualified: DriverManager is not imported at the top of this script.
  java.sql.DriverManager.getConnection(url, username, password)
}
// Writes one partition of (word, count) rows into the `wordcount` table.
// Meant to be called from `rdd.foreachPartition`, so a single connection and prepared
// statement serve every row of the partition instead of one connection per record.
//
// @param iterator  the partition's (word, count) rows
// @param batchSize number of rows sent to MySQL per JDBC batch; must be positive
// @throws IllegalArgumentException if batchSize <= 0
def saveToMySQL(iterator: Iterator[(String, Long)], batchSize: Int = 500): Unit = {
  require(batchSize > 0, s"batchSize must be positive, got $batchSize")
  // Streaming micro-batches frequently have empty partitions; don't open a connection for them.
  if (iterator.hasNext) {
    val conn = createConnection()
    try {
      val pstmt = conn.prepareStatement("insert into wordcount(word, count) values (?, ?)")
      try {
        // Send rows in JDBC batches instead of one round trip per row.
        // Adding rewriteBatchedStatements=true to the JDBC URL lets Connector/J send each
        // batch as a single multi-row INSERT.
        iterator.grouped(batchSize).foreach { rows =>
          rows.foreach { case (word, count) =>
            pstmt.setString(1, word)
            pstmt.setLong(2, count)
            pstmt.addBatch()
          }
          pstmt.executeBatch()
        }
      } finally {
        // Always release JDBC resources, even if an insert fails.
        pstmt.close()
      }
    } finally {
      conn.close()
    }
  }
}
// Create the StreamingContext: local mode with 2 threads, 5-second micro-batches.
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaSparkStreaming")
val ssc = new StreamingContext(conf, Seconds(5))
// Kafka connection settings for the 0.8 direct-stream API.
val topics = Set("test")
val brokers = "localhost:9092"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
// StringDecoder comes from the Kafka 0.8 client. `_root_` is required because the wildcard
// import of org.apache.spark.streaming._ brings Spark's own `kafka` subpackage into scope,
// which would otherwise shadow the top-level `kafka` package.
import _root_.kafka.serializer.StringDecoder
// Read from Kafka; each record arrives as a (key, message) pair of Strings.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
// Word count per micro-batch: keep the message payload, split on runs of whitespace, and
// drop the empty tokens that leading or repeated separators would otherwise produce.
val lines = messages.map(_._2)
val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
val wordCounts = words.map(word => (word, 1L)).reduceByKey(_ + _)
// Persist each batch's counts; saveToMySQL is invoked once per partition on the executors.
// NOTE(review): the closure refers to top-level defs; outside spark-shell, place them in a
// serializable object so the executors can run them.
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition(iter => saveToMySQL(iter))
}
ssc.start()
ssc.awaitTermination()
```
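这是一个 Spark Streaming 读取 Kafka 数据、统计词频并将结果写入 MySQL 的示例。其中，saveToMySQL 函数在每个分区内复用同一个数据库连接写入结果，避免为每条记录单独建立连接。写入 MySQL 的 SQL 语句需要根据表结构和业务需求调整。注意，每个批次写入的是该批次内的词频，而不是累计值；如果需要累计计数，可以改用 `INSERT ... ON DUPLICATE KEY UPDATE` 等写法。另外，示例省略了数据库连接的异常处理（如重试、日志记录），实际应用中应加入相应的处理逻辑。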
阅读全文