//格式化输出 val result:StringBuilder = new StringBuilder() result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n") for (i <- sortedUrlViews.indices){ val currentUrlView = sortedUrlViews(i) result.append("NO").append(i+1).append(":") .append("URL=").append(currentUrlView.url) .append("访问量=").append(currentUrlView.count).append("\n") } result.append("=====================") Thread.sleep(1000) out.collect(result.toString()) } }
时间: 2024-04-28 13:22:35 浏览: 213
这是一段Scala代码,是对前面提到的自定义的处理函数TopNHotUrls进行输出的部分。具体来说,代码通过StringBuilder来格式化输出TopNHotUrls函数的结果,包括时间、URL和访问量,并将其输出到下游节点。
在具体实现中,使用了append方法将时间、URL和访问量添加到StringBuilder对象中,并使用Thread.sleep方法来模拟1秒钟的输出延迟。最后,调用out.collect方法将StringBuilder对象中的结果输出到下游节点。
需要注意的是,这段代码是在onTimer方法中进行输出的,也就是在时间窗口结束后进行输出。因此,它只会输出窗口内的数据,而不是所有的数据。如果需要输出所有的数据,可以在processElement方法中进行输出。
相关问题
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] { private var itemState : ListState[ItemViewCount] = _ override def open(parameters: Configuration): Unit = { super.open(parameters) // 命名状态变量的名字和状态变量的类型 val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount]) // 从运行时上下文中获取状态并赋值 itemState = getRuntimeContext.getListState(itemsStateDesc) } override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = { // 每条数据都保存到状态中 itemState.add(input) // 注册 windowEnd+1 的 EventTime Timer,当触发时,说明收齐了属于windowEnd 窗口的所有商品数据 // 也就是当程序看到 windowend + 1 的水位线 watermark 时,触发 onTimer 回调函数 context.timerService.registerEventTimeTimer(input.windowEnd + 1) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = { // 获取收到的所有商品点击量 val allItems: ListBuffer[ItemViewCount] = ListBuffer() import scala.collection.JavaConversions._ for (item <- itemState.get) { allItems += item } // 提前清除状态中的数据,释放空间 itemState.clear() // 按照点击量从大到小排序 val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize) // 将排名信息格式化成 String, 便于打印 val result: StringBuilder = new StringBuilder() result.append("******************************\n") result.append("窗口结束时间: ").append(new Timestamp(timestamp - 1)).append("\n") for(i <- sortedItems.indices){ val currentItem: ItemViewCount = sortedItems(i) // e.g. No1: 商品 ID=12224 浏览量 =2413 result.append("No").append(i+1).append(":") .append(" 商品id=").append(currentItem.itemId) .append(" 热门度 =").append(currentItem.count).append("\n") } result.append("******************************") // 控制输出频率,模拟实时滚动结果 Thread.sleep(500) out.collect(result.toString) }
这段代码是什么意思?
这段代码定义了一个KeyedProcessFunction函数,输入参数是ItemViewCount类型的数据,输出参数是String类型的数据。在函数中,定义了一个ListState状态变量itemState,并在open()方法中初始化。在processElement()方法中,每次接收到一个ItemViewCount类型的数据,都将其保存到itemState状态变量中,并注册一个定时器,定时器的触发时间是当前ItemViewCount数据所属的窗口结束时间+1。
在onTimer()方法中,当定时器触发时,首先从itemState状态变量中获取所有的ItemViewCount数据,并清空itemState状态变量。然后按照点击量从大到小排序,取出前topSize个热门商品,并将其格式化成字符串输出。最后通过out.collect()方法将结果输出。
这个函数的作用是计算出每个窗口期间内的热门商品排行榜,并输出结果。
阅读全文