pyspark 消费kafka
时间: 2023-07-06 16:37:40 浏览: 185
scala编写,以spark stream方式监听消费kafka的topic,同步写入elasticsearch
要使用 PySpark 消费 Kafka,可以使用 PySpark 的 KafkaUtils 包。下面是一个简单的示例代码:
```python
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# 创建 SparkSession 和 StreamingContext
spark = SparkSession.builder.appName("KafkaStreaming").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 5)
# 设置 Kafka 相关参数
brokers = "localhost:9092"
topic = "test"
# 创建 Kafka 数据流
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
# 处理数据流
lines = kafkaStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()
# 启动 StreamingContext
ssc.start()
ssc.awaitTermination()
```
这个代码片段创建了一个 PySpark Streaming 应用程序,它从 Kafka 主题 `test` 中接收数据,并对数据进行简单的处理,最后打印输出。你可以将 `brokers` 和 `topic` 参数替换为你自己的 Kafka 集群地址和主题名称。
阅读全文