Quellcode durchsuchen

调整订阅内容

guoqiang vor 2 Wochen
Ursprung
Commit
0221a75c84

+ 247 - 187
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientCallback.java

@@ -18,26 +18,20 @@ import org.jeecg.modules.heatsActuals.entity.HeatsActuals;
 import org.jeecg.modules.heatsActuals.service.IHeatsActualsService;
 import org.jeecg.modules.stackingAndLoadingVehicles.service.IStackingAndLoadingVehiclesService;
 import org.jeecg.modules.storageBill.service.IStorageBillService;
-import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.*;
+import java.util.Date;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * 常规MQTT回调函数
- *
  */
 @Slf4j
-public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
+public class MqttClientCallback implements MqttCallback, MqttCallbackExtended {
 
-    /**
-     * 系统的mqtt客户端id
-     */
     private String mqttClientId;
     private MqttClient myMQTTClient;
     private String topics;
-    long reconnectTimes = 1;
-
+    private long reconnectTimes = 1;
 
     public MqttClientCallback(String mqttClientId, MqttClient myMQTTClient, String topics) {
         this.mqttClientId = mqttClientId;
@@ -45,261 +39,327 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
         this.topics = topics;
     }
 
-    public MqttClientCallback(){
-
+    // 不推荐直接使用无参构造函数,除非确保后续手动设置必要字段
+    public MqttClientCallback() {
+        throw new UnsupportedOperationException("请使用带参数的构造函数初始化");
     }
-    private MqttMsgMapper mqttMsgMapper= SpringContextHolder.getBean(MqttMsgMapper.class);
 
+    private MqttMsgMapper mqttMsgMapper = SpringContextHolder.getBean(MqttMsgMapper.class);
     private IBilletBasicInfoService billetBasicInfoService = SpringContextHolder.getBean(IBilletBasicInfoService.class);
-
     private IHeatsActualsService heatsActualsService = SpringContextHolder.getBean(IHeatsActualsService.class);
-
     private IStorageBillService storageBillService = SpringContextHolder.getBean(IStorageBillService.class);
-
     private IBilletHotsendBaseService billetHotsendBaseService = SpringContextHolder.getBean(IBilletHotsendBaseService.class);
-
     private IStackingAndLoadingVehiclesService stackingAndLoadingVehiclesService = SpringContextHolder.getBean(IStackingAndLoadingVehiclesService.class);
-
     private IBilletAssemblyNumberService billetAssemblyNumberService = SpringContextHolder.getBean(IBilletAssemblyNumberService.class);
-
     private IBilletHotsendChangeShiftService billetHotsendChangeShiftService = SpringContextHolder.getBean(IBilletHotsendChangeShiftService.class);
 
-    // 使用原子类AtomicInteger保证在多线程环境下计数准确
     private AtomicInteger messageCount = new AtomicInteger(0);
 
-    /**
-     * MQTT 断开连接会执行此方法
-     */
     @Override
     public void connectionLost(Throwable throwable) {
-        log.error(">>>>>>>>>>>>>>>>mqtt连接丢失,原因:{}", throwable.getMessage());
-//        log.error("连接丢失,原因: {}", throwable.getMessage());
+//        log.error("连接丢失,原因: {}", throwable.getMessage(), throwable);
 //        int retryCount = 0;
-//        int MAX_RETRY_COUNT = 5;
-//        long RETRY_DELAY = 5000;
-//        while (retryCount < MAX_RETRY_COUNT) {
+//        int maxRetryCount = 5;
+//        long retryDelay = 5000;
+//
+//        while (retryCount < maxRetryCount) {
 //            try {
 //                log.error("正在进行第 {} 次重连", retryCount + 1);
-//                Thread.sleep(RETRY_DELAY);
+//                Thread.sleep(retryDelay);
 //                myMQTTClient.reconnect();
 //                break;
 //            } catch (InterruptedException | MqttException e) {
 //                retryCount++;
 //            }
 //        }
-//        if (retryCount == MAX_RETRY_COUNT) {
+//
+//        if (retryCount == maxRetryCount) {
 //            log.error("已达到最大重试次数,放弃重连");
 //        }
-//        while (true) {
+//
+//        int reconnectAttempts = 0;
+//        int maxReconnectAttempts = 10; // 控制最大尝试次数
+//
+//        while (reconnectAttempts < maxReconnectAttempts) {
 //            try {
 //                reconnectTimes += 1;
-//                log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
+//                log.warn("mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
 //                myMQTTClient.reconnect();
-//                String[] topics = this.topics.split(",");
-//                if(topics != null && topics.length != 0){
-//                    for (String topic : topics) {
-//                        myMQTTClient.subscribe(topic);
-//                    }
+//
+//                String[] topicArray = topics != null ? topics.split(",") : new String[0];
+//                for (String topic : topicArray) {
+//                    myMQTTClient.subscribe(topic);
 //                }
 //            } catch (MqttException e) {
 //                log.error("mqtt断连异常", e);
 //            }
+//
 //            try {
-//                Thread.sleep(5000);
+//                Thread.sleep(retryDelay);
 //            } catch (InterruptedException e1) {
+//                Thread.currentThread().interrupt();
 //            }
+//
+//            reconnectAttempts++;
 //        }
     }
 
-    /**
-     * publish发布成功后会执行到这里
-     */
     @Override
     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-        // 从iMqttDeliveryToken中获取主题名称
         String topic = iMqttDeliveryToken.getTopics()[0];
-        // 使用正确的变量topic记录主题相关日志信息
         log.info(">>>>>>>>>>>>>>>>发布消息成功,主题: {}", topic);
     }
 
-    /**
-     * subscribe订阅后得到的消息会执行到这里
-     */
     @Override
-    public void messageArrived(String topic, MqttMessage message){
-        // 某个主题不打印
-        if(!"syn/strandnosize/receive".equals(topic)){
+    public void messageArrived(String topic, MqttMessage message) {
+        if (!"syn/strandnosize/receive".equals(topic)) {
             log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>mqtt收到主题{}的消息:{}", topic, new String(message.getPayload()));
         }
 
-        // 通过原子类的自增方法来统计消息次数
         messageCount.incrementAndGet();
         try {
-            MqttMsg mqttMsg = new MqttMsg();
-            mqttMsg.setClientId(mqttClientId);
-            mqttMsg.setCreateDate(new Date());
-            mqttMsg.setDataValue(message.toString());
-            mqttMsg.setTopic(topic);
-            mqttMsgMapper.delete(new LambdaQueryWrapper<MqttMsg>().eq(MqttMsg::getTopic, topic));
-            mqttMsgMapper.insert(mqttMsg);
-            String payload = new String(message.getPayload());
+            byte[] payloadBytes = message.getPayload();
+            if (payloadBytes == null || payloadBytes.length == 0) {
+                log.warn("收到空消息体");
+                return;
+            }
+            String payload = new String(payloadBytes);
 
-            TopicType topicType = null;
-            for (TopicType type : TopicType.values()) {
-                if (topic.contains(type.getTopicValue())) {
-                    topicType = type;
-                    break;
-                }
+            saveMqttMessage(topic, payload);
+
+            TopicType topicType = getTopicType(topic);
+            if (topicType == null) {
+                return;
             }
+
             switch (topicType) {
                 case TRACE_PERFORMANCE_CONVERTER_ADD:
-                    try {
-                        HeatsActuals heatsActuals = JSON.parseObject(payload, HeatsActuals.class);
-                        heatsActualsService.addC(heatsActuals);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理炉次实绩消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleHeatsActuals(payload);
+                    break;
                 case TRACE_PERFORMANCE_BILLET_ADD:
-                    try {
-                        BilletBasicInfo billetBasicInfo = JSON.parseObject(payload, BilletBasicInfo.class);
-                        billetBasicInfoService.addC(billetBasicInfo);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理钢坯基础消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleBilletBasicInfo(payload);
+                    break;
                 case SYN_STORAGE_BILL_ADD:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        storageBillService.autoAddStorageBill(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理钢坯装运单新增消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleStorageBillAdd(payload);
+                    break;
                 case SYN_BILLET_HOTSEND_BASE_SAVE:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        billetHotsendBaseService.autoAddRodLineCommon(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理自动化新增5号机直轧棒1和6号机高线保存出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoAddRodLineCommon(payload);
+                    break;
                 case SYN_BILLET_HOTSEND_BASE_SHIPP_SAVE:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        billetHotsendBaseService.autoAddRodLine(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理5、6号机-自动化热装保存消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoAddRodLine(payload);
+                    break;
                 case SYN_BILLET_HOTSEND_BASE_SHIPP_DEPART:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        billetHotsendBaseService.autoDepartTrucking(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理5、6号机-自动化离站发车消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoDepartTrucking(payload);
+                    break;
                 case SYN_BILLET_ADD_STACKING:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        stackingAndLoadingVehiclesService.autoAddStack(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理新增堆垛消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoAddStack(payload);
+                    break;
                 case SYN_BILLET_STACKING_AND_LOADING_VEHICLES_LOADING:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        stackingAndLoadingVehiclesService.autoSaveStackLoading(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理堆垛保存消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoSaveStackLoading(payload);
+                    break;
                 case TRACE_BILLETASSEMBLYNUMBER_BILLET_ADD:
-                    try {
-                        BilletAssemblyNumber billetAssemblyNumber = JSON.parseObject(payload, BilletAssemblyNumber.class);
-                        billetAssemblyNumberService.addBilletAssemblyNumber(billetAssemblyNumber);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理组坯实绩消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleBilletAssemblyNumber(payload);
+                    break;
                 case SYN_STORAGE_BILL_UPDATE:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        storageBillService.autoUpdateLicensePlate(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理钢坯装运单车牌更新消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoUpdateLicensePlate(payload);
+                    break;
                 case SYN_BILLET_CHANGESHIFT:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        log.info("{}{}", "5号机自动化交班接收消息:", jsonObject);
-                        billetHotsendChangeShiftService.autoChangeShift(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理5号机自动化交班消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoChangeShiftFive(payload);
+                    break;
                 case SYN_BILLET_SIX_CHANGESHIFT:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        log.info("{}{}", "6号机自动化交班接收消息:", jsonObject);
-                        billetHotsendChangeShiftService.autoChangeShiftSix(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理6号机自动化交班消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleAutoChangeShiftSix(payload);
+                    break;
                 case SYN_STRANDNO_SIZE_RECEIVE:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-//                        log.info("{}{}", "接收到流号定尺消息:", jsonObject);
-                        billetHotsendBaseService.receiveStrandNoSize(jsonObject);
-                    } catch (Exception e) {
-//                        log.info("{}{}", "mqtt接收流号定尺消息出错: ", e.getMessage());
-                    }
-                    return;
+                    handleReceiveStrandNoSize(payload);
+                    break;
                 case SYN_STACKING_CLEARLOCATION:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        stackingAndLoadingVehiclesService.stackingClearLocation(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理堆垛位置清空异常: ", e.getMessage());
-                    }
-                    return;
+                    handleStackingClearLocation(payload);
+                    break;
                 case SYN_STACK_PLACEHOLDERS:
-                    try {
-                        JSONObject jsonObject = JSON.parseObject(payload);
-                        stackingAndLoadingVehiclesService.stackQueryLocation(jsonObject);
-                    } catch (Exception e) {
-                        log.info("{}{}", "mqtt处理堆垛层数占位总数异常: ", e.getMessage());
-                    }
-                    return;
-                default:
+                    handleStackQueryLocation(payload);
                     break;
             }
+
             log.info("mqtt到目前为止,已接收消息的总次数为: {}", messageCount.get());
+
+        } catch (Exception e) {
+            log.error("mqtt消息处理出错", e);
+        }
+    }
+
+    private void saveMqttMessage(String topic, String payload) {
+        try {
+            MqttMsg mqttMsg = new MqttMsg();
+            mqttMsg.setClientId(mqttClientId);
+            mqttMsg.setCreateDate(new Date());
+            mqttMsg.setDataValue(payload);
+            mqttMsg.setTopic(topic);
+            mqttMsgMapper.delete(new LambdaQueryWrapper<MqttMsg>().eq(MqttMsg::getTopic, topic));
+            mqttMsgMapper.insert(mqttMsg);
+        } catch (Exception e) {
+            log.error("保存MQTT消息到数据库失败", e);
+        }
+    }
+
+    private TopicType getTopicType(String topic) {
+        for (TopicType type : TopicType.values()) {
+            if (topic.equals(type.getTopicValue())) {
+                return type;
+            }
+        }
+        return null;
+    }
+
+    private void handleHeatsActuals(String payload) {
+        try {
+            HeatsActuals heatsActuals = JSON.parseObject(payload, HeatsActuals.class);
+            heatsActualsService.addC(heatsActuals);
+        } catch (Exception e) {
+            log.error("mqtt处理炉次实绩消息出错", e);
+        }
+    }
+
+    private void handleBilletBasicInfo(String payload) {
+        try {
+            BilletBasicInfo billetBasicInfo = JSON.parseObject(payload, BilletBasicInfo.class);
+            billetBasicInfoService.addC(billetBasicInfo);
+        } catch (Exception e) {
+            log.error("mqtt处理钢坯实绩消息出错", e);
+        }
+    }
+
+    private void handleStorageBillAdd(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            storageBillService.autoAddStorageBill(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理钢坯装运单自动创建消息出错", e);
+        }
+    }
+
+    private void handleAutoAddRodLineCommon(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            billetHotsendBaseService.autoAddRodLineCommon(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理5号机直轧棒一/6号机热送高线出错", e);
+        }
+    }
+
+    private void handleAutoAddRodLine(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            billetHotsendBaseService.autoAddRodLine(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理5、6号机-自动化热装出错", e);
+        }
+    }
+
+    private void handleAutoDepartTrucking(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            billetHotsendBaseService.autoDepartTrucking(jsonObject);
         } catch (Exception e) {
-            // 记录数据库操作或者其他全局错误
-            log.info("{}{}", "mqtt消息处理出错: ", e.getMessage());
+            log.error("mqtt处理5、6号机-自动化离站发车消息出错", e);
+        }
+    }
+
+    private void handleAutoAddStack(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            stackingAndLoadingVehiclesService.autoAddStack(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理自动化钢坯装运单堆垛装运出错", e);
+        }
+    }
+
+    private void handleAutoSaveStackLoading(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            stackingAndLoadingVehiclesService.autoSaveStackLoading(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理自动化堆垛出错", e);
+        }
+    }
+
+    private void handleBilletAssemblyNumber(String payload) {
+        try {
+            BilletAssemblyNumber billetAssemblyNumber = JSON.parseObject(payload, BilletAssemblyNumber.class);
+            billetAssemblyNumberService.addBilletAssemblyNumber(billetAssemblyNumber);
+        } catch (Exception e) {
+            log.error("mqtt处理组坯实绩消息出错", e);
+        }
+    }
+
+    private void handleAutoUpdateLicensePlate(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            storageBillService.autoUpdateLicensePlate(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理钢坯装运单车牌更新消息出错", e);
+        }
+    }
+
+    private void handleAutoChangeShiftFive(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            log.info("{}{}", "5号机自动化交班接收消息:", jsonObject);
+            billetHotsendChangeShiftService.autoChangeShift(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理5号机自动化交班消息出错", e);
+        }
+    }
+
+    private void handleAutoChangeShiftSix(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            log.info("{}{}", "6号机自动化交班接收消息:", jsonObject);
+            billetHotsendChangeShiftService.autoChangeShiftSix(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理6号机自动化交班消息出错", e);
+        }
+    }
+
+    private void handleReceiveStrandNoSize(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            billetHotsendBaseService.receiveStrandNoSize(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt接收流号定尺消息出错", e);
+        }
+    }
+
+    private void handleStackingClearLocation(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            stackingAndLoadingVehiclesService.stackingClearLocation(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理堆垛位置清空异常", e);
+        }
+    }
+
+    private void handleStackQueryLocation(String payload) {
+        try {
+            JSONObject jsonObject = JSON.parseObject(payload);
+            stackingAndLoadingVehiclesService.stackQueryLocation(jsonObject);
+        } catch (Exception e) {
+            log.error("mqtt处理堆垛层数占位总数异常", e);
         }
     }
 
     @Override
     public void connectComplete(boolean b, String s) {
-        if (b) {
-            log.info("客户端 [{}] 在 {} 重连成功", this.mqttClientId, System.currentTimeMillis());
-        } else {
-            log.info("客户端 [{}] 在 {} 首次连接成功", this.mqttClientId, System.currentTimeMillis());
-        }
-        String[] topics = this.topics.split(",");
-        if(topics != null && topics.length != 0){
-            for (String topic : topics) {
-                try {
-                    myMQTTClient.subscribe(topic);
-                } catch (MqttException e) {
-                    e.printStackTrace();
-                }
+//        if (b) {
+//            log.info("客户端 [{}] 在 {} 重连成功", this.mqttClientId, System.currentTimeMillis());
+//        } else {
+//            log.info("客户端 [{}] 在 {} 首次连接成功", this.mqttClientId, System.currentTimeMillis());
+//        }
+        String[] topicArray = topics != null ? topics.split(",") : new String[0];
+        for (String topic : topicArray) {
+            try {
+                myMQTTClient.subscribe(topic);
+            } catch (MqttException e) {
+                log.error("订阅主题失败: {}", topic, e);
             }
         }
     }