使用scala2.12语言编写Flink1.14.4消费kafka中的数据,统计商城每分钟的GMV,将结果存入redis中
时间: 2024-06-10 14:09:45 浏览: 168
以下是使用scala2.12语言编写Flink1.14.4消费kafka中的数据,统计商城每分钟的GMV,将结果存入redis中的代码示例:
```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
/**
 * One parsed order record from the Kafka "orders" topic
 * (CSV line "userId,timestamp,amount").
 *
 * @param userId    id of the user who placed the order
 * @param timestamp order time — presumably epoch milliseconds; confirm with the producer
 * @param amount    order amount that contributes to GMV
 */
case class OrderData(userId: String, timestamp: Long, amount: Double)
object GMVAnalysis {
  /**
   * Entry point: consumes comma-separated order records ("userId,timestamp,amount")
   * from the Kafka topic "orders", sums the order amounts over 1-minute tumbling
   * windows (whole-mall GMV, not per-user GMV), and writes each window's total
   * to Redis keyed by the window-end timestamp.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // FlinkKafkaConsumer expects java.util.Properties; a Scala Map does not compile.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    kafkaProps.setProperty("group.id", "flink-gmv-consumer-group")
    kafkaProps.setProperty("auto.offset.reset", "earliest")

    val kafkaConsumer = new FlinkKafkaConsumer[String](
      "orders",
      new SimpleStringSchema(),
      kafkaProps
    )

    // Parse "userId,timestamp,amount". A malformed record throws here and fails
    // the job — consider flatMap + Try if the topic may contain bad data.
    val dataStream = env.addSource(kafkaConsumer)
      .map(line => {
        val fields = line.split(",")
        OrderData(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    // The requirement is the mall's TOTAL GMV per minute, so every order must land
    // under one key; keying by userId would emit one per-user result per window,
    // each overwriting the previous one in Redis.
    // timeWindow(...) was removed in Flink 1.14 — an explicit assigner is required.
    // Processing-time windows need no watermarks; switch to TumblingEventTimeWindows
    // plus a WatermarkStrategy to window on OrderData.timestamp instead.
    val gmvStream = dataStream
      .keyBy(_ => "gmv")
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .apply(new GMVWindowFunction)

    val redisConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // NOTE(review): bahir's RedisSink requires its second argument to implement
    // RedisMapper[(String, Double)] — verify GMVSinkFunction does.
    gmvStream.addSink(new RedisSink[(String, Double)](redisConfig, new GMVSinkFunction))

    env.execute("GMV Analysis")
  }
}
/**
 * Sums the amounts of all OrderData elements that fall into a one-minute
 * window and emits a single (windowEndTimestamp, totalAmount) pair per window.
 */
class GMVWindowFunction extends WindowFunction[OrderData, (String, Double), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[OrderData], out: Collector[(String, Double)]): Unit = {
    // Fold the window contents into a running total (equivalent to map(_.amount).sum).
    val total = input.foldLeft(0.0)((acc, order) => acc + order.amount)
    out.collect((window.getEnd.toString, total))
  }
}
/**
 * RedisMapper used by RedisSink: writes each (windowEndTimestamp, gmv) pair
 * as a Redis SET command with key = window end time and value = GMV.
 *
 * The original version extended SinkFunction and opened a raw `new Jedis(...)`
 * per record — `Jedis` was never imported (compile error) and a per-record
 * connection is wasteful. RedisSink pools and manages connections itself
 * (via the FlinkJedisPoolConfig passed at the call site), so this class only
 * needs to describe the command and extract key/value from the tuple.
 */
class GMVSinkFunction extends RedisMapper[(String, Double)] {
  // Plain SET: one Redis key per window end timestamp.
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(data: (String, Double)): String = data._1

  override def getValueFromData(data: (String, Double)): String = data._2.toString
}
```
以上代码中,我们通过FlinkKafkaConsumer从kafka中读取订单数据,并将其转换为OrderData对象。然后对数据流按键(keyBy,代码中为userId字段)进行分组,并使用1分钟的时间窗口进行聚合,计算每个窗口内的GMV并输出到Redis中。注意:若要统计整个商城(而非单个用户)每分钟的GMV,所有订单应落入同一个key中。
需要注意的是,flink-connector-redis 连接器由 Apache Bahir 项目维护(Maven 坐标为 org.apache.bahir:flink-connector-redis),并非 Flink 官方发行版自带;其 RedisSink 需要配合 RedisMapper 接口使用,而直接使用 redis.clients.jedis 时需自行引入该依赖并管理连接。具体使用方法请参考 Apache Bahir 官方文档。
阅读全文