//输入数据样例类 case class ApacheLogEvent(ip:String,userId:String,evetTime:Long,method:String,url:String) //窗口聚合结果样例类 case class UrlViewCount(url:String,windowEnd:Long,count:Long) object abc { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataStream = env.readTextFile("D:\\idea\\ideal\\flink-tutorial\\src\\main\\resources\\apache.log") .map(data=>{ val dataArray = data.split(" ") //定义事件转换 20/05/2015:17:05:11 val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime ApacheLogEvent(dataArray(0).trim,dataArray(1).trim,timestamp,dataArray(5).trim,dataArray(6).trim) }) //*1000看是秒还是毫秒 秒*1000 毫秒不乘 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) { override def extractTimestamp(t: ApacheLogEvent): Long = t.evetTime }) .keyBy(_.url) .timeWindow(Time.minutes(10),Time.seconds(5)) //允许60秒的延迟数据去更新 .allowedLateness(Time.seconds(60)) .aggregate(new CountAgg(),new WindowResult()) .keyBy(_.windowEnd) .process(new TopNHotUrls(5)) dataStream.print() env.execute("network flow job") } }
时间: 2024-04-28 16:22:26 浏览: 112
stringcase:JavaScript的字符串大小写(字母大小写)检测和转换
这段代码是一个Scala语言的Flink应用程序的主函数。具体来说,它定义了一个输入数据样例类ApacheLogEvent,它包括了IP地址、用户ID、事件时间、请求方法和URL地址等信息。同时,它还定义了一个窗口聚合结果样例类UrlViewCount,它包括了URL地址、窗口结束时间和计数器等信息。
在主函数中,它首先获取Flink的流执行环境,并设置并行度和时间特征。接下来,它从文件中读取Apache日志数据,并对数据进行处理,将时间戳转换为事件时间,并使用BoundedOutOfOrdernessTimestampExtractor对数据进行时间戳提取和乱序限制。然后,它按照URL地址进行分组,并使用时间窗口对数据进行聚合。该应用程序还允许60秒的延迟数据去更新,并使用CountAgg和WindowResult对窗口中的数据进行聚合和处理。最后,它使用KeyedProcessFunction对数据流进行键控处理,并使用TopNHotUrls对URL地址进行热门度排名。最后,它打印结果并执行该应用程序。
阅读全文