spark streaming消费kafka数据进行词频统计,用scala语言,我的kafka版本是0.10
时间: 2023-06-30 09:06:24 浏览: 59
好的,下面是一个简单的示例代码,用于使用Spark Streaming消费Kafka数据并对接收到的消息中的单词进行计数:
```scala
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> "test-consumer-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
val lines = stream.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
```
在这个示例中,我们首先创建了一个Spark Streaming上下文,并设置了我们要连接的Kafka集群的参数。接下来,我们使用`KafkaUtils`对象的`createDirectStream()`方法来创建一个直接连接到Kafka集群的DStream。我们从DStream中提取消息内容,并将其拆分为单词。最后,我们使用`reduceByKey()`函数对每个单词进行计数,并将结果打印出来。
希望这个示例能帮助你开始使用Spark Streaming消费Kafka数据并进行词频统计。