setstreamtimecharacteristic
时间: 2023-04-25 10:03:57 浏览: 70
setstreamtimecharacteristic是一个函数,用于设置流的时间特性。它可以用于音频和视频流,用于指定流的时间基准和时间戳的单位。这个函数通常在流的初始化过程中调用,以确保流的时间特性正确设置。
相关问题
import org.apache.flink.api.java.io.TextInputFormat import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ //输入数据的样例类 case class UserBehavior( userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long ) object Test1 { def main(args: Array[String]): Unit = { //flink的流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //设定Time类型为EventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //并发为1 env.setParallelism(1) //获得数据源的文件 val stream = env.readFile[String]( new TextInputFormat(null), "/data/workspace/myshixun/pflinkhotitems1_2/src/UserBehavior1.csv") //********** Begin ********** //********** End ********** stream1.writeAsText("/root/files/result.txt").setParallelism(1) env.execute("获得数据") } }
根据提示,我们需要在输入数据之后添加代码来对数据进行处理。可以使用 Flink 中的 DataStream API 对数据进行操作,例如过滤、分组、聚合等。
下面是一个示例代码,将读取的数据流按照用户行为分组,统计每个行为的数量,并将结果输出到文件中:
```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
object Test1 {
def main(args: Array[String]): Unit = {
//flink的流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定Time类型为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//并发为1
env.setParallelism(1)
//获得数据源的文件
val stream = env.readFile[String](
new TextInputFormat(null),
"/data/workspace/myshixun/pflinkhotitems1_2/src/UserBehavior1.csv")
// 对数据进行处理
val dataStream = stream
.map(line => {
val fields = line.split(",")
UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
})
.filter(_.behavior == "pv") // 过滤出行为为 pv 的数据
.map(behavior => (behavior.behavior, 1)) // 将行为转换为键值对,并将数量设为 1
.keyBy(_._1) // 按照行为分组
.timeWindow(Time.minutes(10)) // 设定时间窗口为 10 分钟
.sum(1) // 统计数量
// 输出结果到文件
dataStream.writeAsText("/root/files/result.txt").setParallelism(1)
// 执行作业
env.execute("获得数据")
}
}
```
在上面的代码中,首先创建了一个 TextInputFormat 对象,并使用 readFile() 方法从文件中读取数据。然后使用 map() 方法将每行数据解析为 UserBehavior 对象,并使用 filter() 方法过滤出行为为 pv 的数据。接着使用 map() 方法将每个行为转换为键值对,并将数量设为 1,使用 keyBy() 方法按照行为分组,使用 timeWindow() 方法设定时间窗口为 10 分钟,使用 sum() 方法统计数量。最后使用 writeAsText() 方法将结果输出到文件中,使用 execute() 方法执行作业。
//输入数据样例类 case class ApacheLogEvent(ip:String,userId:String,evetTime:Long,method:String,url:String) //窗口聚合结果样例类 case class UrlViewCount(url:String,windowEnd:Long,count:Long) object abc { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val dataStream = env.readTextFile("D:\\idea\\ideal\\flink-tutorial\\src\\main\\resources\\apache.log") .map(data=>{ val dataArray = data.split(" ") //定义事件转换 20/05/2015:17:05:11 val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime ApacheLogEvent(dataArray(0).trim,dataArray(1).trim,timestamp,dataArray(5).trim,dataArray(6).trim) }) //*1000看是秒还是毫秒 秒*1000 毫秒不乘 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) { override def extractTimestamp(t: ApacheLogEvent): Long = t.evetTime }) .keyBy(_.url) .timeWindow(Time.minutes(10),Time.seconds(5)) //允许60秒的延迟数据去更新 .allowedLateness(Time.seconds(60)) .aggregate(new CountAgg(),new WindowResult()) .keyBy(_.windowEnd) .process(new TopNHotUrls(5)) dataStream.print() env.execute("network flow job") } }
这段代码是一个Scala语言的Flink应用程序的主函数。具体来说,它定义了一个输入数据样例类ApacheLogEvent,它包括了IP地址、用户ID、事件时间、请求方法和URL地址等信息。同时,它还定义了一个窗口聚合结果样例类UrlViewCount,它包括了URL地址、窗口结束时间和计数器等信息。
在主函数中,它首先获取Flink的流执行环境,并设置并行度和时间特征。接下来,它从文件中读取Apache日志数据,并对数据进行处理,将时间戳转换为事件时间,并使用BoundedOutOfOrdernessTimestampExtractor对数据进行时间戳提取和乱序限制。然后,它按照URL地址进行分组,并使用时间窗口对数据进行聚合。该应用程序还允许60秒的延迟数据去更新,并使用CountAgg和WindowResult对窗口中的数据进行聚合和处理。最后,它使用KeyedProcessFunction对数据流进行键控处理,并使用TopNHotUrls对URL地址进行热门度排名。最后,它打印结果并执行该应用程序。