sparkstreaming集成kafka
时间: 2023-06-17 14:06:15 浏览: 130
Spark Streaming 和 Kafka 集成可以让我们快速、高效地处理实时数据流。下面是一个简单的 Spark Streaming 和 Kafka 集成的示例:
首先,我们需要在 Spark Streaming 应用中添加 Kafka 相关依赖:
```
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.4.5"
```
然后,我们可以使用以下代码创建一个 Spark Streaming 应用程序:
```
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaStreamingApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("KafkaStreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val topicsSet = Set("test")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
lines.print()
ssc.start()
ssc.awaitTermination()
}
}
```
以上代码创建了一个名为 KafkaStreamingApp 的 Spark Streaming 应用程序,使用 2 个线程并每 5 秒处理一次数据。在此示例中,我们从名为「test」的 Kafka 主题中读取数据,并将其打印到控制台上。
最后,我们需要运行 Kafka 和 Spark Streaming 应用程序:
1. 运行 Kafka:
```
bin/kafka-server-start.sh config/server.properties
```
2. 向 Kafka 主题中发送数据:
```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```
3. 运行 Spark Streaming 应用程序:
```
spark-submit --class KafkaStreamingApp --master local[2] --deploy-mode client target/xxx.jar
```
以上就是 Spark Streaming 和 Kafka 集成的一个简单示例。实际情况下,我们需要根据具体情况进行配置和调整。
阅读全文