《深入理解spark》之 结构化流(spark streaming+spark sql 处理结构化数据)的一个demo
时间: 2023-04-29 09:02:25 浏览: 134
流式计算组件Spark Streaming使用
这里是一个使用Spark Streaming和Spark SQL处理结构化数据的示例:
假设我们有一个实时的用户行为日志,其中包含用户ID、行为类型和时间戳。我们想要实时计算每个用户的行为数量,并将结果存储在一个数据库中。
首先,我们使用Spark Streaming从Kafka中读取实时日志数据,并将其转换为DataFrame:
```scala
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
val spark = SparkSession.builder().appName("StructuredStreamingDemo").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val kafkaParams = Map[String, String]("bootstrap.servers" -> "localhost:9092")
val topics = Set("user_behavior_logs")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
val logs = kafkaStream.map(_._2.split(","))
.map(log => (log(0), log(1), log(2).toLong))
.toDF("user_id", "behavior_type", "timestamp")
```
接下来,我们使用Spark SQL对DataFrame进行查询和聚合操作,计算每个用户的行为数量:
```scala
import org.apache.spark.sql.functions._
val result = logs.groupBy("user_id", "behavior_type")
.agg(count("*").as("count"))
.select("user_id", "behavior_type", "count")
result.writeStream
.outputMode("update")
.format("console")
.start()
result.writeStream
.outputMode("update")
.foreach(new JdbcForeachWriter())
.start()
```
最后,我们可以将结果输出到控制台或数据库中。这里我们使用自定义的JdbcForeachWriter将结果写入MySQL数据库:
```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
class JdbcForeachWriter extends ForeachWriter[Row] {
var conn: Connection = _
var statement: PreparedStatement = _
def open(partitionId: Long, version: Long): Boolean = {
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password")
statement = conn.prepareStatement("INSERT INTO user_behavior(user_id, behavior_type, count) VALUES (?, ?, ?)")
true
}
def process(row: Row): Unit = {
statement.setString(1, row.getString(0))
statement.setString(2, row.getString(1))
statement.setLong(3, row.getLong(2))
statement.executeUpdate()
}
def close(errorOrNull: Throwable): Unit = {
statement.close()
conn.close()
}
}
```
这样,我们就完成了一个使用Spark Streaming和Spark SQL处理结构化数据的示例。
阅读全文