怎么用spark streaming从Kafka消费数据
时间: 2023-08-16 09:03:40 浏览: 154
spark streamming消费kafka数据存入hbase示例代码
使用Spark Streaming从Kafka消费数据的步骤如下:
1. 首先需要引入相关依赖包,包括Spark Streaming Core、Kafka和Spark Streaming Kafka等。
2. 创建Spark Streaming Context并设置批处理时间间隔。
3. 创建Kafka参数映射,包括Kafka集群地址、消费者组、消费的主题等。
4. 创建一个DStream,使用KafkaUtils.createDirectStream方法从Kafka中消费数据。这个方法返回一个包含每个批次数据的RDD的DStream。
5. 对DStream进行数据处理,包括转换、过滤、聚合等。
6. 启动Spark Streaming应用程序并等待它完成。
下面是一个示例代码,演示了如何使用Spark Streaming从Kafka消费数据:
```
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.kafka.common.serialization.StringDeserializer
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "my-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("my-topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
.filter(_._2.contains("error"))
.count()
.print()
ssc.start()
ssc.awaitTermination()
```
在这个示例中,我们从名为“my-topic”的Kafka主题中消费数据,并在每个批次中对包含“error”字符串的记录进行计数。
阅读全文