Flink之滑动窗口(SlidingEventTimeWindows)
/**
 * Runs a Flink streaming job that reads space-separated `<key> <timestamp>` lines from a
 * socket and prints, per key and per sliding event-time window, the set of distinct
 * timestamps that fell into that window.
 *
 * Windows are 2 seconds long and slide every 500 ms; the watermark lags the highest seen
 * timestamp by 1000 ms to tolerate out-of-order records.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  import scala.util.Try

  // Environment: event-time semantics, parallelism 1.
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)

  val dstream: DataStream[String] = env.socketTextStream("hadoop1", 7777)

  // Parse each line into (key, timestamp, 1). An empty line, a line without a second field,
  // or a non-numeric timestamp used to throw and fail the whole job; such lines are now
  // reported on stderr and skipped so one bad record cannot stop the stream.
  val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.flatMap { (text: String) =>
    val parsed: Option[(String, Long, Int)] = text.split(" ") match {
      case Array(key, ts, _*) => Try(ts.toLong).toOption.map(eventTime => (key, eventTime, 1))
      case _                  => None
    }
    if (parsed.isEmpty) System.err.println(s"Skipping malformed input line: '$text'")
    parsed.toList
  }

  // Use the second tuple field as the record's event timestamp
  // (Flink interprets the extracted value as epoch milliseconds).
  val textWithEventTimeDstream: DataStream[(String, Long, Int)] =
    textWithTsDstream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2
      })

  // Key by the first tuple field and echo every keyed record for inspection.
  val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
  textKeyStream.print("textkey:")

  val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] =
    textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.milliseconds(500)))

  // Collect the distinct timestamps seen in each window; only the timestamp field is needed.
  val groupDstream: DataStream[mutable.HashSet[Long]] =
    windowStream.fold(new mutable.HashSet[Long]()) { case (set, (_, ts, _)) =>
      set += ts
    }
  groupDstream.print("window::::").setParallelism(1)

  env.execute()
}
作者:qq_37969476
评论10