spark streaming怎么实时消费kafka上的数据
时间: 2023-11-24 12:04:00 浏览: 96
Spark Streaming 可以通过 Kafka Direct API 或 Receiver API 来实时消费 Kafka 上的数据。
使用 Kafka Direct API 的方式,需要引入 spark-streaming-kafka 相关的依赖,然后创建 Kafka Direct Stream,并指定 Kafka 的参数和消费的 topic。
示例代码如下:
```scala
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("bootstrap.servers" -> "localhost:9092")
val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
stream.map(record => (record._1, record._2)).print()
ssc.start()
ssc.awaitTermination()
```
使用 Kafka Receiver API 的方式,需要引入 spark-streaming-kafka 相关的依赖,然后创建 Kafka Receiver Stream,并指定 Kafka 的参数和消费的 topic。
示例代码如下:
```scala
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topics = Set("test")
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_SER)
stream.map(record => (record._1, record._2)).print()
ssc.start()
ssc.awaitTermination()
```
需要注意的是,使用 Receiver API 的方式可能会有数据丢失的问题,因此建议使用 Direct API。
阅读全文