Преглед изворни кода

Merge branch 'fpg-master' of 123.57.213.14:guoqiang.duan/zgzt-sys-java into fpg-master

guoqiang пре 6 месеци
родитељ
комит
853d63f554

+ 32 - 7
jeecg-module-conn/src/main/java/org/jeecg/modules/heatsActuals/service/impl/HeatsActualsServiceImpl.java

@@ -1,6 +1,7 @@
 package org.jeecg.modules.heatsActuals.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.extern.slf4j.Slf4j;
 import org.jeecg.common.util.oConvertUtils;
 import org.jeecg.modules.heatsActuals.entity.HeatsActuals;
 import org.jeecg.modules.heatsActuals.mapper.HeatsActualsMapper;
@@ -15,24 +16,48 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  * @Version: V1.0
  */
 @Service
+@Slf4j
 public class HeatsActualsServiceImpl extends ServiceImpl<HeatsActualsMapper, HeatsActuals> implements IHeatsActualsService {
     @Override
     public void addC(HeatsActuals heatsActuals) {
-        // 满足  optype;  // 操作类型, 1=新增, 2=编辑
-        // 编辑条件 = 炉号+铸机号+开浇时间
-        if(heatsActuals.getOptype() == 2){ // 编辑信号
-            LambdaQueryWrapper<HeatsActuals> queryWrapper = new LambdaQueryWrapper<HeatsActuals>().eq(HeatsActuals::getHeatsCode, heatsActuals.getHeatsCode())
+        // 编辑条件 = 炉号+铸机号+开浇时间  optype;  // 操作类型, 1=新增, 2=编辑
+        if(heatsActuals.getOptype() == 2){
+            LambdaQueryWrapper<HeatsActuals> queryWrapper = new LambdaQueryWrapper<HeatsActuals>()
+                    .eq(HeatsActuals::getHeatsCode, heatsActuals.getHeatsCode())
                     .eq(HeatsActuals::getCasterCode, heatsActuals.getCasterCode())
                     .eq(HeatsActuals::getStartPourTime, heatsActuals.getStartPourTime());
             HeatsActuals actuals = baseMapper.selectOne(queryWrapper);
             if(oConvertUtils.isNotEmpty(actuals) && oConvertUtils.isNotEmpty(actuals.getId())){ // 编辑
                 heatsActuals.setId(actuals.getId());
-                baseMapper.updateById(heatsActuals);
+                try {
+                    baseMapper.updateById(heatsActuals);
+                } catch (Exception e) {
+                    log.error("更新HeatsActuals数据出现异常", e);
+                    throw new RuntimeException("更新HeatsActuals数据失败", e);
+                }
             } else {
-                baseMapper.insert(heatsActuals);
+                try {
+                    insertHeatsActuals(heatsActuals);
+                } catch (Exception e) {
+                    log.error("编辑时,插入HeatsActuals数据出现异常", e);
+                    throw new RuntimeException("编辑时,插入HeatsActuals数据失败", e);
+                }
             }
         } else {
-            baseMapper.insert(heatsActuals);
+            try {
+                insertHeatsActuals(heatsActuals);
+            } catch (Exception e) {
+                log.error("插入HeatsActuals数据出现异常", e);
+                throw new RuntimeException("插入HeatsActuals数据失败", e);
+            }
         }
     }
+
+    /**
+     *
+     * @param heatsActuals
+     */
+    private void insertHeatsActuals(HeatsActuals heatsActuals) {
+        baseMapper.insert(heatsActuals);
+    }
 }

+ 49 - 21
jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientCallback.java

@@ -17,6 +17,7 @@ import org.jeecg.modules.heatsActuals.service.IHeatsActualsService;
 import org.springframework.data.mongodb.core.MongoTemplate;
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * 常规MQTT回调函数
@@ -53,11 +54,16 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
 
     private IHeatsActualsService heatsActualsService = SpringContextHolder.getBean(IHeatsActualsService.class);
 
+
+    // 使用原子类AtomicInteger保证在多线程环境下计数准确
+    private AtomicInteger messageCount = new AtomicInteger(0);
+
     /**
      * MQTT 断开连接会执行此方法
      */
     @Override
     public void connectionLost(Throwable throwable) {
+        log.error(">>>>>>>>>>>>>>>>mqtt连接丢失,原因:{}", throwable.getMessage());
 //        log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());
 //        while (true) {
 //            try {
@@ -85,7 +91,10 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
      */
     @Override
     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-        log.info("发布消息成功");
+        // 从iMqttDeliveryToken中获取主题名称
+        String topic = iMqttDeliveryToken.getTopics()[0];
+        // 使用正确的变量topic记录主题相关日志信息
+        log.info(">>>>>>>>>>>>>>>>发布消息成功,主题: {}", topic);
     }
 
     /**
@@ -93,26 +102,44 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
      */
     @Override
     public void messageArrived(String topic, MqttMessage message){
-        MqttMsg mqttMsg = new MqttMsg();
-        mqttMsg.setClientId(mqttClientId);
-        mqttMsg.setCreateDate(new Date());
-        mqttMsg.setDataValue(message.toString());
-        mqttMsg.setTopic(topic);
-        LambdaQueryWrapper<MqttMsg> eq = new LambdaQueryWrapper<MqttMsg>().eq(MqttMsg::getTopic, topic);
-        mqttMsgMapper.delete(eq);
-        mqttMsgMapper.insert(mqttMsg);
-        JSONObject jsonObject = JSONObject.fromObject(message.toString());
-        // 炉次实绩
-        if(topic.contains("trace/performance/converter/add")){
-            HeatsActuals heatsActuals =  JSON.parseObject(jsonObject.toString(), HeatsActuals.class);
-            heatsActualsService.addC(heatsActuals);
-            return;
-        }
-        // 钢坯实绩内容
-        if(topic.contains("trace/performance/billet/add")){
-            BilletBasicInfo billetBasicInfo =  JSON.parseObject(jsonObject.toString(), BilletBasicInfo.class);
-            billetBasicInfoService.addC(billetBasicInfo);
-            return;
+        log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>mqtt收到主题{}的消息:{}", topic, new String(message.getPayload()));
+        // 通过原子类的自增方法来统计消息次数
+        messageCount.incrementAndGet();
+        try {
+            MqttMsg mqttMsg = new MqttMsg();
+            mqttMsg.setClientId(mqttClientId);
+            mqttMsg.setCreateDate(new Date());
+            mqttMsg.setDataValue(message.toString());
+            mqttMsg.setTopic(topic);
+            mqttMsgMapper.delete(new LambdaQueryWrapper<MqttMsg>().eq(MqttMsg::getTopic, topic));
+            mqttMsgMapper.insert(mqttMsg);
+            JSONObject jsonObject = JSONObject.fromObject(message.toString());
+            // 炉次实绩
+            if (topic.contains("trace/performance/converter/add")) {
+                try {
+                    HeatsActuals heatsActuals = JSON.parseObject(jsonObject.toString(), HeatsActuals.class);
+                    heatsActualsService.addC(heatsActuals);
+                } catch (Exception e) {
+                    // 记录日志或者进行其他错误处理
+                    log.info("{}{}", "mqtt处理炉次实绩消息出错: ", e.getMessage());
+                }
+                return;
+            }
+            // 钢坯实绩内容
+            if (topic.contains("trace/performance/billet/add")) {
+                try {
+                    BilletBasicInfo billetBasicInfo = JSON.parseObject(jsonObject.toString(), BilletBasicInfo.class);
+                    billetBasicInfoService.addC(billetBasicInfo);
+                } catch (Exception e) {
+                    // 记录日志或者进行其他错误处理
+                    log.info("{}{}", "mqtt处理钢坯实绩消息出错: ", e.getMessage());
+                }
+                return;
+            }
+            log.info("mqtt到目前为止,已接收消息的总次数为: {}", messageCount.get());
+        } catch (Exception e) {
+            // 记录数据库操作或者其他全局错误
+            log.info("{}{}", "mqtt消息处理出错: ", e.getMessage());
         }
     }
 
@@ -129,4 +156,5 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
             }
         }
     }
+
 }

+ 3 - 0
jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientUtil.java

@@ -66,6 +66,9 @@ public class MqttClientUtil implements ApplicationRunner {
             options.setAutomaticReconnect(false);
             //创建连接对象
             client = new MqttClient(url.toString(), configMqtt.getId(), new MemoryPersistence());
+            // 订阅主题,设置QoS级别为1
+            int qos = 2;
+            client.subscribe("topic", qos);
             //设置回调对象
             client.setCallback(new MqttClientCallback(configMqtt.getId(),client,configMqtt.getTopic()));
             //建立连接