统计每个商铺的平均客单价的 Flink 代码
时间: 2024-02-01 08:13:24 浏览: 164
Flink简单入门代码Demo
假设数据源是一个流式的订单数据流,包含以下字段:商铺ID(shopId)、订单金额(orderAmount)、订单时间(orderTime)。
可以使用Flink的DataStream API进行实现,代码如下:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Flink streaming job that prints the average order amount per shop.
 *
 * <p>Pipeline: order source -> one {@link ShopOrder} per order (amount, count = 1) ->
 * key by shop id -> time window -> sum amounts and counts -> amount / count -> print.
 */
public class ShopAvgOrderAmount {

    /** Length, in minutes, of the window over which each shop's orders are aggregated. */
    private static final int WINDOW_MINUTES = 10;

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the order stream.
        // NOTE(review): OrderSource is not defined in this file; it has to be provided
        // separately as a source that emits Order records.
        DataStream<Order> orders = env.addSource(new OrderSource());

        // Sum the order amount and order count per shop within each window.
        DataStream<ShopOrder> shopOrders = orders
                // Convert every order into a partial aggregate BEFORE windowing: a windowed
                // stream offers aggregation operators (reduce, aggregate, ...) but no map.
                .map(new MapFunction<Order, ShopOrder>() {
                    @Override
                    public ShopOrder map(Order order) throws Exception {
                        return new ShopOrder(order.getShopId(), order.getOrderAmount(), 1);
                    }
                })
                // Group by shop id.
                .keyBy(new KeySelector<ShopOrder, String>() {
                    @Override
                    public String getKey(ShopOrder shopOrder) throws Exception {
                        return shopOrder.getShopId();
                    }
                })
                // NOTE(review): timeWindow is kept from the original; newer Flink releases
                // deprecate it in favour of window(<explicit window assigner>) - confirm
                // against the Flink version in use.
                .timeWindow(Time.minutes(WINDOW_MINUTES))
                .reduce(new ReduceFunction<ShopOrder>() {
                    @Override
                    public ShopOrder reduce(ShopOrder left, ShopOrder right) throws Exception {
                        return new ShopOrder(
                                left.getShopId(),
                                left.getOrderAmount() + right.getOrderAmount(),
                                left.getOrderCount() + right.getOrderCount());
                    }
                });

        // Compute the average order amount per shop.
        DataStream<ShopAverage> shopAvgOrderAmounts = shopOrders
                .map(new MapFunction<ShopOrder, ShopAverage>() {
                    @Override
                    public ShopAverage map(ShopOrder shopOrder) throws Exception {
                        // Every record that reaches this point was built from at least one
                        // order, so the count set by this pipeline is >= 1.
                        return new ShopAverage(
                                shopOrder.getShopId(),
                                shopOrder.getOrderAmount() / shopOrder.getOrderCount());
                    }
                });

        // Print the results (uses ShopAverage#toString).
        shopAvgOrderAmounts.print();

        // Run the job.
        env.execute("ShopAvgOrderAmount");
    }

    /**
     * A single order event.
     *
     * <p>The public no-arg constructor and the getter/setter pairs are kept so the class
     * stays a plain bean.
     */
    public static class Order {
        private String shopId;
        private double orderAmount;
        private long orderTime;

        public Order() {}

        /**
         * @param shopId      id of the shop the order belongs to
         * @param orderAmount amount of the order
         * @param orderTime   time of the order
         */
        public Order(String shopId, double orderAmount, long orderTime) {
            this.shopId = shopId;
            this.orderAmount = orderAmount;
            this.orderTime = orderTime;
        }

        public String getShopId() {
            return shopId;
        }

        public void setShopId(String shopId) {
            this.shopId = shopId;
        }

        public double getOrderAmount() {
            return orderAmount;
        }

        public void setOrderAmount(double orderAmount) {
            this.orderAmount = orderAmount;
        }

        public long getOrderTime() {
            return orderTime;
        }

        public void setOrderTime(long orderTime) {
            this.orderTime = orderTime;
        }

        @Override
        public String toString() {
            return "Order{shopId='" + shopId + "', orderAmount=" + orderAmount
                    + ", orderTime=" + orderTime + "}";
        }
    }

    /** Per-shop partial aggregate: summed order amount and number of orders. */
    public static class ShopOrder {
        private String shopId;
        private double orderAmount;
        private int orderCount;

        public ShopOrder() {}

        /**
         * @param shopId      id of the shop
         * @param orderAmount summed order amount
         * @param orderCount  number of orders contributing to {@code orderAmount}
         */
        public ShopOrder(String shopId, double orderAmount, int orderCount) {
            this.shopId = shopId;
            this.orderAmount = orderAmount;
            this.orderCount = orderCount;
        }

        public String getShopId() {
            return shopId;
        }

        public void setShopId(String shopId) {
            this.shopId = shopId;
        }

        public double getOrderAmount() {
            return orderAmount;
        }

        public void setOrderAmount(double orderAmount) {
            this.orderAmount = orderAmount;
        }

        public int getOrderCount() {
            return orderCount;
        }

        public void setOrderCount(int orderCount) {
            this.orderCount = orderCount;
        }

        @Override
        public String toString() {
            return "ShopOrder{shopId='" + shopId + "', orderAmount=" + orderAmount
                    + ", orderCount=" + orderCount + "}";
        }
    }

    /**
     * Average order amount of one shop.
     *
     * <p>Named {@code ShopAverage} rather than {@code ShopAvgOrderAmount} because a nested
     * class may not have the same simple name as its enclosing class.
     */
    public static class ShopAverage {
        private String shopId;
        private double avgOrderAmount;

        public ShopAverage() {}

        /**
         * @param shopId         id of the shop
         * @param avgOrderAmount average order amount of the shop
         */
        public ShopAverage(String shopId, double avgOrderAmount) {
            this.shopId = shopId;
            this.avgOrderAmount = avgOrderAmount;
        }

        public String getShopId() {
            return shopId;
        }

        public void setShopId(String shopId) {
            this.shopId = shopId;
        }

        public double getAvgOrderAmount() {
            return avgOrderAmount;
        }

        public void setAvgOrderAmount(double avgOrderAmount) {
            this.avgOrderAmount = avgOrderAmount;
        }

        @Override
        public String toString() {
            return "ShopAverage{shopId='" + shopId + "', avgOrderAmount=" + avgOrderAmount + "}";
        }
    }
}
```
该代码实现了按商铺ID对订单数据流进行分组,然后在10分钟的时间窗口内计算每个商铺的订单总金额和订单数量,并最终用订单总金额除以订单数量得到每个商铺的平均客单价。其中,需要自定义三个实体类:订单实体类(Order)、每个商铺的订单统计实体类(ShopOrder)和每个商铺的平均客单价实体类(ShopAvgOrderAmount)。另外,代码中用到的数据源 OrderSource 并未给出,需要自行实现。
阅读全文