Browse Source

C端位置清空回传坯号

qiangxuan 4 weeks ago
parent
commit
e75006a576

+ 39 - 3
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientUtil.java

@@ -220,13 +220,17 @@ public class MqttClientUtil implements ApplicationRunner {
         }
     }
 
+    /**
+     * C端自动化堆垛清理位置,回传坯号,推送消息
+     * @param configMqttMapper
+     * @param map
+     * @param topicInfo
+     */
     public void pushCData(ConfigMqttMapper configMqttMapper, Map<String, Object> map, String topicInfo){
-        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        map.put("time",formatter.format(new Date()));
         ConfigMqtt configMqtt = configMqttMapper.selectOne(new LambdaQueryWrapper<ConfigMqtt>().like(ConfigMqtt::getTopic,topicInfo).eq(ConfigMqtt::getPushOrSub,"0"));
         MqttClient mqttClient = null;
         try {
-            mqttClient = getMqttClient(configMqtt);
+            mqttClient = getMqttClient11(configMqtt);
         } catch (MqttException e) {
             e.printStackTrace();
         }
@@ -251,4 +255,36 @@ public class MqttClientUtil implements ApplicationRunner {
             }
         }
     }
+
+
+    public MqttClient getMqttClient11(ConfigMqtt configMqtt) throws MqttException {
+        MqttConnectOptions options = new MqttConnectOptions();
+        // 配置相关选项,如用户名、密码等...
+        options.setUserName(configMqtt.getUsername());
+        options.setPassword(configMqtt.getPassword().toCharArray());
+        // ...其他配置...
+        // 设置超时时间 单位为秒
+        options.setConnectionTimeout(60);
+        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
+        options.setKeepAliveInterval(60);
+        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
+        options.setCleanSession(false);
+// 由于 MqttConnectOptions 类没有 setSessionExpiryInterval 方法,需要移除这行代码
+// options.setSessionExpiryInterval(604800L); // 会话保留7天(60*60*24*7)
+        // 开启自动重连功能; 方法会判断这个参数
+        options.setAutomaticReconnect(true);
+        // 设置自动重连最大延迟时间 单位是毫秒
+        options.setMaxReconnectDelay(128000);
+        // 允许的最大传输中消息
+        options.setMaxInflight(100);
+        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
+        options.setWill("willTopic", "offline".getBytes(), 0, false);
+
+        StringBuffer url = new StringBuffer();
+        url.append("tcp://").append(configMqtt.getIp()).append(":").append(configMqtt.getHost());
+        MqttClient client = new MqttClient(url.toString(), configMqtt.getId(), new MemoryPersistence());
+        IMqttToken token = client.connectWithResult(options);
+        token.waitForCompletion();
+        return client;
+    }
 }

+ 3 - 1
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/stackingAndLoadingVehicles/service/impl/StackingAndLoadingVehiclesServiceImpl.java

@@ -71,6 +71,8 @@ import org.springframework.transaction.annotation.Transactional;
 import org.springframework.transaction.interceptor.TransactionAspectSupport;
 
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -1344,12 +1346,12 @@ public class StackingAndLoadingVehiclesServiceImpl extends ServiceImpl<StackingA
             mapSendInfo.put("ccmNo", ccmNo);
             mapSendInfo.put("billetNos", billetNoLists);
             mapSendInfo.put("clearLocationTime", DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get()));
+            // 执行MQTT推送,设置合理的超时时间
             MqttClientUtil mqttClientUtilBe = new MqttClientUtil();
             mqttClientUtilBe.pushCData(configMqttMapper, mapSendInfo, TopicType.SYN_STACKING_PASSBACK.getTopicValue());
             log.info("C端自动化位置清空后坯号回传,发送MQTT成功: {}", mapSendInfo);
         } catch (Exception e) {
             log.error("C端自动化位置清空后坯号回传,发送MQTT异常!", e);
-            throw new RuntimeException("发送MQTT异常", e);
         }
     }