基于scala语言的sparkstreaming读取kafka实现字符串分割并去除字符串为"zkz"的数据案例
时间: 2024-05-05 15:19:36 浏览: 119
java基于spark streaming和kafka,hbase的日志统计分析系统.rar
5星 · 资源好评率100%
好的,我明白了你的要求。以下是一个基于Scala语言的Spark Streaming读取Kafka的示例代码,它实现了对字符串的分割,并过滤掉包含“zkz”的数据:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaStreamingDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("KafkaStreamingDemo").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test_group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val stream = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
val lines = stream.map(record => record.value)
val filteredLines = lines.filter(line => !line.contains("zkz"))
val words = filteredLines.flatMap(line => line.split(" "))
words.print()
ssc.start()
ssc.awaitTermination()
}
}
```
这个程序首先创建了一个SparkConf对象和一个StreamingContext对象。然后,它创建了一个Kafka相关的参数对象kafkaParams和一个包含要订阅的主题的字符串数组topics。 接着,它使用KafkaUtils.createDirectStream函数从Kafka中读取数据,并且提取出字符串记录的值。然后,程序过滤了包含“zkz”的行,并使用flatMap函数将每行分割成单词。最后,程序输出这些单词。
希望能够帮助你解决问题。
阅读全文