var accumulator: (Long, Long) = (0L, 0L) val a2: DataStream[Double] = f2.map(x => { val jsonObject = JsonMethods.parse(x) val JDouble(sbljl) = (jsonObject \ "order_status").extract[JDouble] if (sbljl.equals(JDouble(1001))) { accumulator = (accumulator._1 + 1L, accumulator._2 + 1L) } else { accumulator = (accumulator._1, accumulator._2 + 1L) } accumulator._1 / accumulator._2.toDouble }).keyBy(0) .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1))) a2.map(x => { val cancelRate = x // 计算取消订单占比 val roundedCancelRate = BigDecimal(cancelRate).setScale(1, BigDecimal.RoundingMode.HALF_UP).toDouble // 四舍五入保留一位小数 ("someKey", roundedCancelRate) // 使用一个虚拟的key }).addSink(new RedisSink[(String, Double)](flinkJedisPoolConfig, new TotalPriceRedisMapper2)).windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1))类型不匹配
时间: 2024-04-22 15:26:28 浏览: 106
accumulator:累加器
5星 · 资源好评率100%
根据您提供的代码,您在`addSink`方法之添加了额外的`windowAll`方法,而这是不正确的,导致类型不匹配的错误。
在您的代码中,`a2`已经通过`keyBy(0)`进行了分区,并使用了`TumblingProcessingTimeWindows.of(Time.minutes(1))`定义了一个窗口。因此,您不需要再次使用`windowAll`方法。
下面是修正后的代码:
```scala
var accumulator: (Long, Long) = (0L, 0L)
val a2: DataStream[Double] = f2.map(x => {
val jsonObject = JsonMethods.parse(x)
val JDouble(sbljl) = (jsonObject \ "order_status").extract[JDouble]
if (sbljl.equals(JDouble(1001))) {
accumulator = (accumulator._1 + 1L, accumulator._2 + 1L)
} else {
accumulator = (accumulator._1, accumulator._2 + 1L)
}
accumulator._1 / accumulator._2.toDouble
}).keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
a2.map(x => {
val cancelRate = x // 计算取消订单占比
val roundedCancelRate = BigDecimal(cancelRate).setScale(1, BigDecimal.RoundingMode.HALF_UP).toDouble // 四舍五入保留一位小数
("someKey", roundedCancelRate) // 使用一个虚拟的key
}).addSink(new RedisSink[(String, Double)](flinkJedisPoolConfig, new TotalPriceRedisMapper2))
```
在修正后的代码中,我移除了额外的`windowAll`方法,并将其替换为正确的`window`方法来定义窗口。
请确保您在使用`window`方法时,根据您的需求选择合适的窗口类型,并根据需要进行进一步的处理。
阅读全文