import org.apache.flink.api.java.io.TextInputFormat import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ //输入数据的样例类 case class UserBehavior( userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long ) object Test1 { def main(args: Array[String]): Unit = { //flink的流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //设定Time类型为EventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //并发为1 env.setParallelism(1) //获得数据源的文件 val stream = env.readFile[String]( new TextInputFormat(null), "/data/workspace/myshixun/pflinkhotitems1_2/src/UserBehavior1.csv") //********** Begin ********** //********** End ********** stream1.writeAsText("/root/files/result.txt").setParallelism(1) env.execute("获得数据") } }
时间: 2023-07-05 09:28:31 浏览: 202
Flink JavaAPI开发
5星 · 资源好评率100%
根据提示,我们需要在输入数据之后添加代码来对数据进行处理。可以使用 Flink 中的 DataStream API 对数据进行操作,例如过滤、分组、聚合等。
下面是一个示例代码,将读取的数据流按照用户行为分组,统计每个行为的数量,并将结果输出到文件中:
```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
object Test1 {
def main(args: Array[String]): Unit = {
//flink的流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定Time类型为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//并发为1
env.setParallelism(1)
//获得数据源的文件
val stream = env.readFile[String](
new TextInputFormat(null),
"/data/workspace/myshixun/pflinkhotitems1_2/src/UserBehavior1.csv")
// 对数据进行处理
val dataStream = stream
.map(line => {
val fields = line.split(",")
UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
})
.filter(_.behavior == "pv") // 过滤出行为为 pv 的数据
.map(behavior => (behavior.behavior, 1)) // 将行为转换为键值对,并将数量设为 1
.keyBy(_._1) // 按照行为分组
.timeWindow(Time.minutes(10)) // 设定时间窗口为 10 分钟
.sum(1) // 统计数量
// 输出结果到文件
dataStream.writeAsText("/root/files/result.txt").setParallelism(1)
// 执行作业
env.execute("获得数据")
}
}
```
在上面的代码中,首先创建了一个 TextInputFormat 对象,并使用 readFile() 方法从文件中读取数据。然后使用 map() 方法将每行数据解析为 UserBehavior 对象,并使用 filter() 方法过滤出行为为 pv 的数据。接着使用 map() 方法将每个行为转换为键值对,并将数量设为 1,使用 keyBy() 方法按照行为分组,使用 timeWindow() 方法设定时间窗口为 10 分钟,使用 sum() 方法统计数量。最后使用 writeAsText() 方法将结果输出到文件中,使用 execute() 方法执行作业。
阅读全文