SparkStreaming连接Kafka
时间: 2024-04-30 18:24:32 浏览: 8
要在Spark Streaming中连接Kafka,需要使用KafkaUtils类提供的createDirectStream方法。以下是连接Kafka并在Spark Streaming中消费消息的示例代码:
```
import org.apache.spark.streaming.kafka._
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topics = Set("my-topic")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
stream.map(_._2).print()
ssc.start()
ssc.awaitTermination()
```
在上面的示例代码中,我们首先创建一个名为kafkaParams的Map对象,其中包含Kafka集群的地址和端口。然后,我们定义了要消费的Kafka主题的集合。接下来,我们使用KafkaUtils.createDirectStream方法创建了一个输入DStream,该方法使用指定的参数连接到Kafka,并从指定的主题中消费消息。最后,我们使用map操作将DStream中的键值对转换为只包含消息值的DStream,并将其打印出来。
需要注意的是,该示例使用了Kafka 0.10版本及以上的新消费者API。如果您使用的是旧版本的Kafka,则需要使用不同的API。