spark streaming消费kafka数据进行词频统计,用scala语言
时间: 2023-07-12 07:06:03 浏览: 142
好的,以下是一个简单的示例代码,假设我们要消费名为“test”的Kafka主题,并对其中的单词进行词频统计:
```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
object WordCount {
def main(args: Array[String]): Unit = {
// 创建 SparkConf 对象
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
// 创建 StreamingContext 对象
val ssc = new StreamingContext(conf, Seconds(5))
// 设置 Kafka 相关参数
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 创建一个 Kafka DStream
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("test"), kafkaParams)
)
// 从 Kafka DStream 中提取单词
val words = kafkaStream.flatMap(record => record.value().split(" "))
// 对单词进行计数
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// 输出计数结果
wordCounts.print()
// 启动 Spark Streaming
ssc.start()
ssc.awaitTermination()
}
}
```
上述代码中,我们首先创建了一个 `SparkConf` 对象,并设置了应用程序名为“WordCount”,使用本地模式运行。然后创建了一个 `StreamingContext` 对象,每隔 5 秒钟将接收到的数据进行批处理。
接下来,我们设置了 Kafka 相关参数,包括 Broker 地址、键值对反序列化器、消费者组 ID 等。然后使用 `KafkaUtils.createDirectStream` 方法创建了一个 Kafka DStream。在这个 DStream 上,我们使用 `flatMap` 方法将每个消息的值按空格拆分成单词,并使用 `map` 和 `reduceByKey` 方法对单词进行计数。最后,我们调用 `print` 方法输出计数结果。
最后,我们启动 Spark Streaming 并等待计算完成。
阅读全文