MqttClientOptions options = new MqttClientOptions(); // options.setClientId("alarm-center_" + this.properties.getGroup() + "_" + this.properties.getNode()); options.setUsername(properties.getUsername()); options.setPassword(properties.getPassword()); options.setClientId(UUID.randomUUID().toString().replace("-","")); options.setCleanSession(true); options.setConnectTimeout(3000); client = VertxMqttClient.create(vertx, options); String share = "$share/" + this.properties.getGroup() + "/"; String topic = "/device/#"; // device下的所有topic client.publishHandler( mqttPublishMessage -> { // 判断是否是上线消息,记录每一个设备的上线时间,应对例外条件中开机启动中的无视时间 eg:/device/xjfd/generator_401012/online JSONObject payload = JSONObject.parseObject(mqttPublishMessage.payload().toString(StandardCharsets.UTF_8)); if (StringUtils.endsWith(mqttPublishMessage.topicName(), "online")) { String deviceId = payload.getString("deviceId"); redisService.setCacheObject(ONLINE_CACHE_KEY + deviceId, System.currentTimeMillis()); return; } if(StringUtils.endsWith(mqttPublishMessage.topicName(), "property/report")) { // 处理数据 try { alarmRegistry.handleMessage(payload); } catch (Throwable e) { log.error("async error -->" + e.getMessage(), e); } } }); client.connect(properties.getPort(), properties.getHost(), s -> { if (s.succeeded()){ client.subscribe(share + topic,properties.getQos()); log.info("center 建立MQTT链接"); } }); 这里有bug么
时间: 2024-02-14 10:26:51 浏览: 140
aliOss:https://github.comaliyunaliyun-oss-go-sdk的api
根据你提供的代码,没有明显的bug。不过需要注意以下几点:
1. 在使用`client.publishHandler()`方法时,需要确保已经成功连接到MQTT服务器,否则可能会导致未连接成功就开始订阅的问题。
2. 在处理`mqttPublishMessage`时,需要确保`mqttPublishMessage.payload().toString(StandardCharsets.UTF_8)`返回的是有效的JSON字符串,否则可能会导致JSON解析异常。
3. 在处理异常时,建议记录详细的错误信息,以方便排查问题。
如果你遇到了具体的问题或者有其他需求,请提供更多信息,我会尽力帮助你。
阅读全文