class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] { private var itemState : ListState[ItemViewCount] = _ override def open(parameters: Configuration): Unit = { super.open(parameters) // 命名状态变量的名字和状态变量的类型 val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount]) // 从运行时上下文中获取状态并赋值 itemState = getRuntimeContext.getListState(itemsStateDesc) } override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = { // 每条数据都保存到状态中 itemState.add(input) // 注册 windowEnd+1 的 EventTime Timer,当触发时,说明收齐了属于windowEnd 窗口的所有商品数据 // 也就是当程序看到 windowend + 1 的水位线 watermark 时,触发 onTimer 回调函数 context.timerService.registerEventTimeTimer(input.windowEnd + 1) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = { // 获取收到的所有商品点击量 val allItems: ListBuffer[ItemViewCount] = ListBuffer() import scala.collection.JavaConversions._ for (item <- itemState.get) { allItems += item } // 提前清除状态中的数据,释放空间 itemState.clear() // 按照点击量从大到小排序 val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize) // 将排名信息格式化成 String, 便于打印 val result: StringBuilder = new StringBuilder() result.append("******************************\n") result.append("窗口结束时间: ").append(new Timestamp(timestamp - 1)).append("\n") for(i <- sortedItems.indices){ val currentItem: ItemViewCount = sortedItems(i) // e.g. No1: 商品 ID=12224 浏览量 =2413 result.append("No").append(i+1).append(":") .append(" 商品id=").append(currentItem.itemId) .append(" 热门度 =").append(currentItem.count).append("\n") } result.append("******************************") // 控制输出频率,模拟实时滚动结果 Thread.sleep(500) out.collect(result.toString) }
这段代码是什么意思?
这段代码定义了一个KeyedProcessFunction函数,输入参数是ItemViewCount类型的数据,输出参数是String类型的数据。在函数中,定义了一个ListState状态变量itemState,并在open()方法中初始化。在processElement()方法中,每次接收到一个ItemViewCount类型的数据,都将其保存到itemState状态变量中,并注册一个定时器,定时器的触发时间是当前ItemViewCount数据所属的窗口结束时间+1。
在onTimer()方法中,当定时器触发时,首先从itemState状态变量中获取所有的ItemViewCount数据,并清空itemState状态变量。然后按照点击量从大到小排序,取出前topSize个热门商品,并将其格式化成字符串输出。最后通过out.collect()方法将结果输出。
这个函数的作用是计算出每个窗口期间内的热门商品排行榜,并输出结果。
interface: Bird wings: int fly(): void interface: Fish fins: int swim(): void Class: FlyingFish color: String eat(): void写出代码
// 定义鸟类接口Bird
interface Bird {
int wings; // 鸟类翅膀的数量
void fly(); // 鸟类飞行的能力
}
// 定义鱼类接口Fish
interface Fish {
int fins; // 鱼类鳍的数量
void swim(); // 鱼类游泳的能力
}
// 定义特殊类FlyingFish,它同时具备鸟和鱼的部分特征
class FlyingFish extends Object implements Bird, Fish { // Java中通常使用Object作为基类
String color; // 飞行鱼的颜色
@Override
public void eat() { // 飞行鱼吃的行为
System.out.println("飞行鱼吃东西");
}
// 实现Bird接口的方法
@Override
public int wings {
return 2; // 这里假设大多数飞行鱼有两对翅膀
}
// 实现Bird接口的飞行动作
@Override
public void fly() {
System.out.println("飞行鱼开始飞翔");
}
// 实现Fish接口的方法
@Override
public int fins {
return 0; // 飞行鱼没有正常的鱼鳍,此处设为0
}
// 实现Fish接口的游泳动作,由于是飞行鱼,这里可以简单地忽略
@Override
public void swim() {
System.out.println("飞行鱼实际上不会游泳,但这个方法未实现");
}
}
public class Main {
public static void main(String[] args) {
FlyingFish flyingFish = new FlyingFish();
flyingFish.fly(); // 调用飞翔方法
flyingFish.swim(); // 调用游泳方法(会打印出默认信息)
System.out.println("颜色: " + flyingFish.color); // 输出飞行鱼的颜色
}
}
根据代码功能,补充如下代码 trait Operate3 { //特质 def insert(id: Int): Unit = { println("插入数据 = " + id) } } class OracleDB { //空 } abstract class MySQL3_ { def say() } object MixInDemo01 { def main(args: Array[String]): Unit = { val oracleDB = new OracleDB oracleDB.insert(100) // val mySql_ = new MySQL3_ mySql_.insert(999) mySql_.say() } }
根据代码功能,需要进行如下补充:
在 OracleDB 类中,需要混入 Operate3 特质,以便该类能够调用 insert 方法。
MySQL3_ 类是一个抽象类,需要在其子类中完成 insert 方法的具体实现。
下面是补充后的代码:
trait Operate3 { //特质 def insert(id: Int): Unit = { println("插入数据 = " + id) } }
class OracleDB extends Operate3 { //混入 Operate3 特质 //空 }
abstract class MySQL3_ { def insert(id: Int): Unit //在子类中具体实现 def say(): Unit }
class MySQLImpl extends MySQL3_ { override def insert(id: Int): Unit = { println("MySQL插入数据 = " + id) }
override def say(): Unit = { println("MySQL实现的say方法") } }
object MixInDemo01 { def main(args: Array[String]): Unit = { val oracleDB = new OracleDB oracleDB.insert(100)
val mySql_ = new MySQLImpl
mySql_.insert(999)
mySql_.say()
} }
在 main 方法中,首先创建了 OracleDB 的实例对象 oracleDB,然后调用了其 insert 方法,输出了插入数据的信息。
接着创建了 MySQLImpl 的实例对象 mySql_,调用了其 insert 和 say 方法,输出了 MySQL 插入数据和 MySQL 实现的 say 方法的信息。
相关推荐

















