Write a Flink 1.14.4 job in Scala 2.12 that consumes data from Kafka and computes the mall's GMV per minute
Date: 2024-04-30 09:24:09
The following code, written in Scala 2.12 for Flink 1.14.4, consumes data from Kafka and computes the mall's GMV per minute:
```scala
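import java.time.Duration
import java.util.Properties

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Declared at top level (not inside main) so Flink can derive its type information
case class Transaction(id: String, timestamp: Long, amount: Double)

object GMVCalculator {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // set up Kafka consumer properties
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    kafkaProps.setProperty("group.id", "flink-gmv-calculator")

    // create a Kafka consumer and add it as a data source
    // (FlinkKafkaConsumer still works in 1.14 but is deprecated in favor of KafkaSource)
    val kafkaConsumer = new FlinkKafkaConsumer[String]("transactions", new SimpleStringSchema(), kafkaProps)
    val dataStream = env.addSource(kafkaConsumer)

    // parse each "id,timestamp,amount" record; timestamp is assumed to be epoch milliseconds
    val transactionStream = dataStream.map { record =>
      val fields = record.split(",")
      Transaction(fields(0), fields(1).trim.toLong, fields(2).trim.toDouble)
    }

    // use the record's own timestamp as event time;
    // the 5-second out-of-orderness bound is an assumption, tune it for your data
    val timedStream = transactionStream.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[Transaction] {
          override def extractTimestamp(t: Transaction, recordTimestamp: Long): Long = t.timestamp
        }))

    // group transactions by minute and sum the amounts; without the window,
    // sum would emit a running total per record instead of one GMV value per minute
    val gmvStream = timedStream
      .map(t => (t.timestamp / (60 * 1000) * (60 * 1000), t.amount)) // (minute start, amount)
      .keyBy(_._1) // group by minute
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .sum(1) // sum the amounts

    // print (minute start in epoch ms, GMV) for each minute
    gmvStream.print()

    // execute the Flink job
    env.execute("GMV Calculator")
  }
}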
```
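The code above first sets up the execution environment, then creates a Kafka consumer and adds it as the data source. Each transaction record is assumed to be a comma-separated `id,timestamp,amount` string with the timestamp in epoch milliseconds, and is parsed into a case class. The transactions are then grouped by minute and the amounts within each minute are summed, which gives the mall's GMV per minute. Finally, the job prints the per-minute GMV and is executed.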
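The parsing and per-minute grouping described above can also be tried without a Flink cluster or a Kafka broker. The following is a minimal plain-Scala sketch of that logic, not part of the original answer: the names `Txn`, `GmvSketch`, `parse`, `minuteStart`, and `gmvPerMinute` are hypothetical, and it assumes the same `id,timestamp,amount` record layout with epoch-millisecond timestamps. Unlike the job above, it skips malformed records instead of failing on them.

```scala
import scala.util.Try

// Hypothetical stand-in for the job's record type, used only in this sketch
final case class Txn(id: String, timestamp: Long, amount: Double)

object GmvSketch {
  val MinuteMs: Long = 60L * 1000L

  // Parse an "id,timestamp,amount" record; returns None for malformed input
  def parse(record: String): Option[Txn] =
    record.split(",").map(_.trim) match {
      case Array(id, ts, amount) =>
        Try(Txn(id, ts.toLong, amount.toDouble)).toOption
      case _ => None
    }

  // Start (epoch ms) of the minute that contains the given timestamp
  def minuteStart(timestampMs: Long): Long =
    timestampMs / MinuteMs * MinuteMs

  // Sum the amounts of all valid records per minute bucket
  def gmvPerMinute(records: Seq[String]): Map[Long, Double] =
    records
      .flatMap(parse)
      .groupBy(t => minuteStart(t.timestamp))
      .map { case (minute, txns) => minute -> txns.map(_.amount).sum }
}
```

For example, `GmvSketch.gmvPerMinute(Seq("a,60000,10.0", "b,119999,2.5", "c,120000,4.0"))` assigns the first two records to the minute starting at 60000 (GMV 12.5) and the third to the minute starting at 120000 (GMV 4.0). In the streaming job, the same `parse` idea could be applied with `flatMap` so that a single bad record does not fail the whole job.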