使用Scala版flink根据shop_data.csv数据,统计每个每个商铺的有效完成率(有效完成率=有效完成单数/接单总数)
时间: 2024-02-05 19:10:50 浏览: 92
销售金额及完成率统计图表.xlsx
假设shop_data.csv文件的内容如下:
```
shop_id,order_id,is_completed
1,1,true
1,2,false
1,3,true
2,4,true
2,5,true
3,6,false
3,7,true
```
可以使用Scala版Flink来实现对每个商铺的有效完成率的统计。以下是代码示例:
```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
object ShopDataAnalysis {
case class ShopData(shopId: Int, orderId: Int, isCompleted: Boolean)
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val inputPath = "path/to/shop_data.csv"
val outputPath = "path/to/shop_completion_rate.csv"
val shopData = env.readCsvFile[ShopData](
inputPath,
ignoreFirstLine = true,
includedFields = Array(0, 1, 2)
)
val totalOrders = shopData.map(sd => (sd.shopId, 1))
.groupBy(0)
.sum(1)
val completedOrders = shopData.filter(_.isCompleted)
.map(sd => (sd.shopId, 1))
.groupBy(0)
.sum(1)
val completionRate = totalOrders.join(completedOrders)
.where(0)
.equalTo(0)
.map(pair => (pair._1, pair._2._2.toDouble / pair._2._1.toDouble))
completionRate.writeAsCsv(outputPath, writeMode = WriteMode.OVERWRITE)
env.execute("Shop Data Analysis")
}
}
```
代码首先定义了一个 `ShopData` 类来表示每一个商铺的订单数据。然后使用 `env.readCsvFile` 方法读取 CSV 文件,并且忽略第一行,只包含 `shop_id`、`order_id` 和 `is_completed` 这三个字段。接下来,通过 `map`、`groupBy` 和 `sum` 方法计算每个商铺的总订单数和有效完成订单数。最后,使用 `join` 和 `map` 方法计算每个商铺的有效完成率,并将结果写入到 CSV 文件中。
需要注意的是,上述代码中的 `path/to/shop_data.csv` 和 `path/to/shop_completion_rate.csv` 分别表示输入文件和输出文件的路径,需要根据实际情况进行修改。
阅读全文