帮我写一个SparkStreaming 充当 Kafka 的消费者消费 Kafka 中的数据,并进行分析
时间: 2024-05-06 09:16:54 浏览: 96
好的,下面是一个简单的示例代码:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
object KafkaConsumer {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> "test-consumer-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.flatMap(record => record.value.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.print()
ssc.start()
ssc.awaitTermination()
}
}
```
这个程序使用 Spark Streaming 作为 Kafka 的消费者,消费了一个名为 "test" 的 Kafka 主题中的数据,并对数据进行了简单的单词计数。在这个示例中,我们使用了直接流(Direct Stream)来消费 Kafka 中的数据。我们也可以使用 Receiver-based Stream 来消费 Kafka 中的数据,但是 Receiver-based Stream 消耗的资源更多,因此在生产环境下推荐使用直接流。
阅读全文