从kafka 读取数据,求访问量最高的两个网页scala代码
时间: 2023-05-12 07:04:40 浏览: 90
以前自己写的一套,从kafka获取数据源,通过flink插入my-flink_kafka_mysql-hbase.zip
以下是一个从 Kafka 读取数据并计算访问量最高的两个网页的 Scala 代码示例:
```scala
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
val conf = new SparkConf().setAppName("KafkaStream")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = Set("weblogs")
val logs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
val pageVisits = logs.map(_._2.split(" ")(6))
.map(page => (page, 1))
.reduceByKey(_ + _)
val topPages = pageVisits.transform(_.sortBy(_._2, false))
.map(_._1)
.take(2)
topPages.print()
ssc.start()
ssc.awaitTermination()
```
这段代码使用 Spark Streaming 从 Kafka 中读取名为 "weblogs" 的主题中的数据,并计算每个网页的访问量。最后,它将按访问量排序并打印出前两个网页。
阅读全文