val accumulator: (Long, Long) = (0L, 0L) val a2: DataStream[Double] = f2.map(x => { val jsonObject = JsonMethods.parse(x) val JDouble(sbljl) = (jsonObject \ "order_status").extract[JDouble] if (sbljl.equals(JDouble(1001))) { (accumulator._1 + 1L, accumulator._2 + 1L) } else { (accumulator._1, accumulator._2 + 1L) } }).keyBy(0) .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1))) .aggregate(new CustomAggregateFunction()) a2.map(x => { val cancelRate = x / accumulator._2.toDouble // 计算取消订单占比 val roundedCancelRate = BigDecimal(cancelRate).setScale(1, BigDecimal.RoundingMode.HALF_UP).toDouble // 四舍五入保留一位小数 (key, roundedCancelRate) }).addSink(new RedisSink[(String, Double)](flinkJedisPoolConfig, new TotalPriceRedisMapper2)) env.execute() class TotalPriceRedisMapper2 extends RedisMapper[(String, Double)] { override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.SET, "cancelrate") } override def getKeyFromData(data: (String, Double)): String = data._1 override def getValueFromData(data: (String, Double)): String = data._2.toString }无法解析符号 CustomAggregateFunction解决办法
时间: 2024-04-22 10:26:53 浏览: 149
解决keras,val_categorical_accuracy:,0.0000e+00问题
根据您提供的代码,无法解析符号CustomAggregateFunction。这意味着您在代码中引用了一个未定义的CustomAggregateFunction类。
解决此问题的步骤如下:
1. 确保您已经导入所需的CustomAggregateFunction类。如果CustomAggregateFunction是您自己编写的类,请确保该类已经正确定义在您的代码中。
2. 检查CustomAggregateFunction类的导入语句是否正确。请确保使用import语句将CustomAggregateFunction类导入到您的代码中。
3. 如果CustomAggregateFunction类来自于外部库或包,确保已经正确添加该库的依赖,并在代码中正确引入CustomAggregateFunction类。
4. 检查CustomAggregateFunction类的命名空间或包路径是否正确。确保在代码中使用正确的命名空间或包路径来引用CustomAggregateFunction类。
如果上述步骤都没有解决问题,请提供更多关于CustomAggregateFunction类的信息,以便我能够更好地帮助您解决问题。
阅读全文