guoqiang 2 тижнів тому
батько
коміт
c5e58cce89

+ 0 - 44
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/billetActual/controller/BilletHotsendChangeShiftController.java

@@ -1,67 +1,23 @@
 package org.jeecg.modules.billetActual.controller;
 
-import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shiro.authz.annotation.RequiresPermissions;
 import org.jeecg.common.api.vo.Result;
-import org.jeecg.common.aspect.annotation.AutoLog;
 import org.jeecg.common.system.base.controller.JeecgController;
-import org.jeecg.common.system.query.QueryGenerator;
 import org.jeecg.common.util.DateUtils;
 import org.jeecg.common.util.oConvertUtils;
-import org.jeecg.modules.billetActual.entity.BilletBasicInfo;
 import org.jeecg.modules.billetActual.entity.BilletHotsendChangeShift;
-import org.jeecg.modules.billetActual.entity.OnDutyLiftingBillDetails;
-import org.jeecg.modules.billetActual.service.IBilletBasicInfoService;
 import org.jeecg.modules.billetActual.service.IBilletHotsendChangeShiftService;
 import org.jeecg.modules.billetActual.util.ScheduleUtils;
 import org.jeecg.modules.billetActual.util.ShiftInfo;
-import org.jeecg.modules.billetLiftingBill.entity.BilletLiftingBill;
-import org.jeecg.modules.billetLiftingBill.service.IBilletLiftingBillService;
-import org.jeecg.modules.heatsActuals.service.IHeatsActualsService;
-import org.jeecg.modules.operateLog.service.IOperateLogService;
-import org.jeecg.modules.rollClubOne.entity.RollClubOneDetails;
-import org.jeecg.modules.rollClubOne.service.IRollClubOneDetailsService;
-import org.jeecg.modules.rollClubThree.entity.RollClubThreeDetails;
-import org.jeecg.modules.rollClubThree.service.IRollClubThreeDetailsService;
-import org.jeecg.modules.rollClubTwo.entity.RollClubTwoDetails;
-import org.jeecg.modules.rollClubTwo.service.IRollClubTwoDetailsService;
-import org.jeecg.modules.rollHeight.entity.RollHeightDetails;
-import org.jeecg.modules.rollHeight.service.IRollHeightDetailsService;
-import org.jeecg.modules.rollOutShipp.entity.RollOutShippDetails;
-import org.jeecg.modules.rollOutShipp.service.IRollOutShippDetailsService;
-import org.jeecg.modules.stackingAndLoadingVehicles.entity.StackingUpLog;
-import org.jeecg.modules.stackingAndLoadingVehicles.service.IStackingUpLogService;
 import org.jeecg.modules.storageBill.entity.ShiftEnum;
 import org.jeecg.modules.storageBill.entity.ShiftGroupEnum;
-import org.jeecg.modules.storageBill.entity.StorageBill;
-import org.jeecg.modules.storageBill.service.IStorageBillService;
-import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.web.bind.annotation.*;
-import org.springframework.web.servlet.ModelAndView;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.text.DecimalFormat;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.YearMonth;
-import java.time.format.DateTimeFormatter;
 import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * @Description: 钢坯交班记录

+ 54 - 47
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientCallback.java

@@ -57,53 +57,59 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended {
 
     @Override
     public void connectionLost(Throwable throwable) {
-//        log.error("连接丢失,原因: {}", throwable.getMessage(), throwable);
-//        int retryCount = 0;
-//        int maxRetryCount = 5;
-//        long retryDelay = 5000;
-//
-//        while (retryCount < maxRetryCount) {
-//            try {
-//                log.error("正在进行第 {} 次重连", retryCount + 1);
-//                Thread.sleep(retryDelay);
-//                myMQTTClient.reconnect();
-//                break;
-//            } catch (InterruptedException | MqttException e) {
-//                retryCount++;
-//            }
-//        }
-//
-//        if (retryCount == maxRetryCount) {
-//            log.error("已达到最大重试次数,放弃重连");
-//        }
-//
-//        int reconnectAttempts = 0;
-//        int maxReconnectAttempts = 10; // 控制最大尝试次数
-//
-//        while (reconnectAttempts < maxReconnectAttempts) {
-//            try {
-//                reconnectTimes += 1;
-//                log.warn("mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
-//                myMQTTClient.reconnect();
-//
-//                String[] topicArray = topics != null ? topics.split(",") : new String[0];
-//                for (String topic : topicArray) {
-//                    myMQTTClient.subscribe(topic);
-//                }
-//            } catch (MqttException e) {
-//                log.error("mqtt断连异常", e);
-//            }
-//
-//            try {
-//                Thread.sleep(retryDelay);
-//            } catch (InterruptedException e1) {
-//                Thread.currentThread().interrupt();
-//            }
-//
-//            reconnectAttempts++;
-//        }
+        log.error("连接丢失,原因: {}", throwable.getMessage(), throwable);
+        int retryCount = 0;
+        final int maxRetryCount = 5;
+        final long retryDelay = 5000;
+        while (retryCount < maxRetryCount) {
+            try {
+                log.info("正在进行第 {} 次重连", retryCount + 1);
+                Thread.sleep(retryDelay);
+                if (myMQTTClient != null && !myMQTTClient.isConnected()) {
+                    myMQTTClient.reconnect();
+                }
+                break;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // 恢复中断状态
+                retryCount++;
+            } catch (MqttException e) {
+                retryCount++;
+            }
+        }
+        if (retryCount == maxRetryCount) {
+            log.error("已达到最大重试次数,放弃重连");
+        }
+        int reconnectAttempts = 0;
+        final int maxReconnectAttempts = 10;
+        while (reconnectAttempts < maxReconnectAttempts) {
+            try {
+                reconnectTimes += 1;
+                log.warn("mqtt reconnect times = {},尝试重新连接...", reconnectTimes);
+                if (myMQTTClient != null) {
+                    myMQTTClient.reconnect();
+                    if (topics != null && !topics.trim().isEmpty()) {
+                        String[] topicArray = topics.split(",");
+                        for (String topic : topicArray) {
+                            String trimmedTopic = topic.trim();
+                            if (!trimmedTopic.isEmpty()) {
+                                myMQTTClient.subscribe(trimmedTopic);
+                            }
+                        }
+                    }
+                }
+            } catch (MqttException e) {
+                log.error("mqtt断连异常", e);
+            }
+            try {
+                Thread.sleep(retryDelay);
+            } catch (InterruptedException e1) {
+                Thread.currentThread().interrupt(); // 正确恢复中断状态
+            }
+            reconnectAttempts++;
+        }
     }
 
+
     @Override
     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
         String topic = iMqttDeliveryToken.getTopics()[0];
@@ -179,8 +185,9 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended {
                     handleStackQueryLocation(payload);
                     break;
             }
-
-            log.info("mqtt到目前为止,已接收消息的总次数为: {}", messageCount.get());
+            if (!"syn/strandnosize/receive".equals(topic)) {
+                log.info("mqtt到目前为止,已接收消息的总次数为: {}", messageCount.get());
+            }
 
         } catch (Exception e) {
             log.error("mqtt消息处理出错", e);

+ 181 - 192
zgzt-sys-java/jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientUtil.java

@@ -3,7 +3,6 @@ package org.jeecg.modules.push.utils;
 import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.ObjectUtils;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.jeecg.modules.connConfig.configMqtt.entity.ConfigMqtt;
@@ -14,274 +13,264 @@ import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.stereotype.Component;
 
-import java.text.SimpleDateFormat;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 @Slf4j
 @Component
 public class MqttClientUtil implements ApplicationRunner {
 
-    public ConcurrentHashMap<String, MqttClient> mqttClients = new ConcurrentHashMap();
+    public final ConcurrentHashMap<String, MqttClient> mqttClients = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<String, Object> lockMap = new ConcurrentHashMap<>();
 
     @Autowired
     private ConfigMqttMapper configMqttMapper;
 
-    public MqttClient getMqttClient(ConfigMqtt configMqtt) throws MqttException {
+    // 提取公共方法:构建 MQTT URL
+    private static String buildMqttUrl(ConfigMqtt configMqtt) {
+        return "tcp://" + configMqtt.getIp() + ":" + configMqtt.getHost();
+    }
+
+    // 提取公共方法:创建默认连接选项
+    private MqttConnectOptions createDefaultOptions(ConfigMqtt configMqtt) {
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(configMqtt.getUsername());
-        options.setPassword(configMqtt.getPassword().toCharArray());
-        // 设置超时时间 单位为秒
+        char[] password = configMqtt.getPassword().toCharArray();
+        options.setPassword(password);
         options.setConnectionTimeout(60);
-        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
         options.setKeepAliveInterval(60);
-        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
-        options.setCleanSession(false);
-// 由于 MqttConnectOptions 类没有 setSessionExpiryInterval 方法,需要移除这行代码
-// options.setSessionExpiryInterval(604800L); // 会话保留7天(60*60*24*7)
-        // 开启自动重连功能; 方法会判断这个参数
+        options.setCleanSession(true);
         options.setAutomaticReconnect(true);
-        // 设置自动重连最大延迟时间 单位是毫秒
         options.setMaxReconnectDelay(128000);
-        // 允许的最大传输中消息
-        options.setMaxInflight(100);
+        options.setMaxInflight(1000);
 
-        StringBuffer url = new StringBuffer();
-        url.append("tcp://").append(configMqtt.getIp()).append(":").append(configMqtt.getHost());
-        MqttClient client = new MqttClient(url.toString(), configMqtt.getId(), new MemoryPersistence());
-        if (!client.isConnected()) {
-            log.info("{}{}", "(((((((((((getMqttClients时,未连接mqtt", configMqtt.getId());
-            client.connect(options);
-        }else {
-            log.info("{}{}", ">>>>>>>>>getMqttClients时,已连接mqtt", configMqtt.getId());
-            client.reconnect();
-            client.connect(options);
-        }
-        IMqttToken token = client.connectWithResult(options); // 等待消息发送完成
+        // 清理密码
+        Arrays.fill(password, '\0');
+        return options;
+    }
+
+    // 统一创建 MQTT 客户端的方法(新增回调参数)
+    private MqttClient createMqttClient(ConfigMqtt configMqtt, boolean cleanSession) throws MqttException {
+        MqttConnectOptions options = createDefaultOptions(configMqtt);
+        options.setCleanSession(cleanSession);
+        String url = buildMqttUrl(configMqtt);
+        MqttClient client = new MqttClient(url, configMqtt.getId(), new MemoryPersistence());
+        client.setCallback(new MqttClientCallback(configMqtt.getId(), client, configMqtt.getTopic()));
+        IMqttToken token = client.connectWithResult(options);
         token.waitForCompletion();
         return client;
     }
 
+    public synchronized MqttClient getMqttClient(ConfigMqtt configMqtt) throws MqttException {
+        String clientId = configMqtt.getId();
+        String url = buildMqttUrl(configMqtt);
+        MqttClient client = mqttClients.get(url);
+
+        if (client == null || !client.isConnected()) {
+            client = createMqttClient(configMqtt, true);
+            mqttClients.put(url, client);
+            log.info("新建并连接 MQTT 客户端: {}", clientId);
+        } else {
+            log.info("已存在并连接 MQTT 客户端: {}", clientId);
+        }
+        return client;
+    }
 
     public void sub(ConfigMqtt configMqtt) throws MqttException {
-        //创建mqtt连接url
-        StringBuffer url = new StringBuffer();
-        url.append("tcp://").append(configMqtt.getIp()).append(":").append(configMqtt.getHost());
-        //根据url从连接缓存池中拿去连接对象
-        MqttClient client = mqttClients.get(url.toString());
-        //连接对象是否为空或异常
-        if(ObjectUtils.isEmpty(client)||!client.isConnected()){
-            //mqtt连接参数
-            MqttConnectOptions options = new MqttConnectOptions();
-            //用户名
-            options.setUserName(configMqtt.getUsername());
-            //密码
-            options.setPassword(configMqtt.getPassword().toCharArray());
-            // 设置超时时间 单位为秒
-            options.setConnectionTimeout(60);
-            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-            options.setKeepAliveInterval(60);
-            //清除会话
-            options.setCleanSession(false);
-// 由于 MqttConnectOptions 类没有 setSessionExpiryInterval 方法,需要移除这行代码
-// options.setSessionExpiryInterval(604800L); // 会话保留7天(60*60*24*7)
-            // 设置自动重连最大延迟时间 单位是毫秒
-            options.setMaxReconnectDelay(128000);
-            // 允许的最大传输中消息
-            options.setMaxInflight(100);
-            //创建连接对象
-            client = new MqttClient(url.toString(), configMqtt.getId(), new MemoryPersistence());
-            //设置回调对象
-            client.setCallback(new MqttClientCallback(configMqtt.getId(),client,configMqtt.getTopic()));
-            //建立连接
-            if (!client.isConnected()) {
-                log.info("{}{}", "@@@@@@@sub时,未连接mqtt", configMqtt.getId());
-                IMqttToken token = client.connectWithResult(options);
-                token.waitForCompletion(); // 等待消息发送完成
-            }else {
-                log.info("{}{}", "%%%%%%%%%sub时,已连接mqtt", configMqtt.getId());
-                client.reconnect();
+        String url = buildMqttUrl(configMqtt);
+        MqttClient client;
+        synchronized (this) {
+            client = mqttClients.get(url);
+            if (client == null || !client.isConnected()) {
+                client = createMqttClient(configMqtt, true);
+                mqttClients.put(url, client);
             }
-            //将连接放入链接缓存池中
-            mqttClients.put(url.toString(),client);
         }
-        log.info("{}{}", ">>>>>>>>>>>订阅并连接mqtt:", client.getClientId());
-        //获取所有主题
         String[] topics = configMqtt.getTopic().split(",");
-        if(topics!=null&&topics.length!=0){
-            for (String topic : topics) {
-                //订阅主题
-                client.subscribe(topic, 2);
+        for (String topicStr : topics) {
+            String trimmed = topicStr.trim();
+            if (trimmed.isEmpty()) continue;
+            try {
+                log.info("MQTT主题订阅成功: {}", trimmed);
+                client.subscribe(trimmed, 2);
+            } catch (MqttException e) {
+                log.error("MQTT主题订阅失败: {}", trimmed, e);
             }
         }
     }
 
-    /**
-     * 发布消息
-     *
-     * @param qos
-     * @param retained:留存
-     */
-    public void publish(ConfigMqtt configMqtt, List<PointData> msgs, int qos, boolean retained){
+    public void publish(ConfigMqtt configMqtt, List<PointData> msgs, int qos, boolean retained) {
         MqttClient mqttClient = null;
         try {
             mqttClient = getMqttClient(configMqtt);
-            for (PointData msg : msgs) {
-                MqttMessage message = new MqttMessage();
-                message.setPayload(JSON.toJSON(msg).toString().getBytes());
-                message.setQos(qos);
-                message.setRetained(retained);
-                String[] topics = configMqtt.getTopic().split(",");
-                if(topics!=null&&topics.length!=0){
-                    for (String topic : topics) {
-                        MqttTopic mqttTopic = mqttClient.getTopic(topic);
-                        if (null == mqttTopic) {
-                            log.error("topic is not exist");
-                        }
-                        MqttDeliveryToken token;//Delivery:配送
-                        synchronized (mqttTopic) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
-                            try {
-                                token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
-                                token.waitForCompletion(1000L);
-                            } catch (MqttPersistenceException e) {
-                                e.printStackTrace();
-                            } catch (MqttException e) {
-                                e.printStackTrace();
-                            }
+            if (mqttClient == null) {
+                log.error("MQTT客户端获取失败");
+                return;
+            }
+
+            String clientId = mqttClient.getClientId();
+            String[] topics = configMqtt.getTopic().split(",");
+            for (String topicStr : topics) {
+                String topicName = topicStr.trim();
+                if (topicName.isEmpty()) continue;
+
+                MqttTopic topic = mqttClient.getTopic(topicName);
+                if (topic == null) {
+                    log.warn("topic is not exist: {}", topicName);
+                    continue;
+                }
+                for (PointData msg : msgs) {
+                    if (msg == null) {
+                        log.warn("跳过空消息");
+                        continue;
+                    }
+                    Object json = JSON.toJSON(msg);
+                    if (json == null) {
+                        log.warn("序列化失败: {}", msg);
+                        continue;
+                    }
+                    MqttMessage message = new MqttMessage(json.toString().getBytes(StandardCharsets.UTF_8));
+                    message.setQos(qos);
+                    message.setRetained(retained);
+
+                    String lockKey = getLockKey(topicName, clientId);
+                    synchronized (getOrCreateLock(lockKey)) {
+                        try {
+                            topic.publish(message);
+                        } catch (MqttException e) {
+                            log.error("MQTT消息发布失败: {}", topicName, e);
                         }
                     }
                 }
             }
         } catch (MqttException e) {
-            e.printStackTrace();
-        }finally {
-            try {
-                mqttClient.disconnect();
-            } catch (MqttException e) {
-                e.printStackTrace();
-            }
+            log.error("MQTT发布消息异常", e);
+        } finally {
+            disconnectQuietly(mqttClient);
         }
     }
 
-
-    public Boolean testConn(ConfigMqtt configMqtt){
-        MqttConnectOptions options = new MqttConnectOptions();
-        options.setUserName(configMqtt.getUsername());
-        options.setPassword(configMqtt.getPassword().toCharArray());
-        options.setConnectionTimeout(60);
-        options.setKeepAliveInterval(60);
-        options.setCleanSession(false);
-// 由于 MqttConnectOptions 类没有 setSessionExpiryInterval 方法,需要移除这行代码
-// options.setSessionExpiryInterval(604800L); // 会话保留7天(60*60*24*7)
-        options.setAutomaticReconnect(true);
-        StringBuffer url = new StringBuffer();
-        url.append("tcp://").append(configMqtt.getIp()).append(":").append(configMqtt.getHost());
+    public Boolean testConn(ConfigMqtt configMqtt) {
         MqttClient client = null;
         try {
-            client = new MqttClient(url.toString(), configMqtt.getId(), new MemoryPersistence());
-            client.setCallback(new MqttClientCallback(configMqtt.getId(),client,configMqtt.getTopic()));
-            IMqttToken token = client.connectWithResult(options);
-            token.waitForCompletion(); // 等待消息发送完成
-            MqttTopic mqttTopic = client.getTopic(configMqtt.getTopic());
-            if(ObjectUtils.isEmpty(mqttTopic)){
-                return false;
-            }
+            client = createMqttClient(configMqtt, true);
+            return true;
         } catch (Exception e) {
+            log.error("测试连接失败", e);
             return false;
-        }finally {
-            if(client!=null) {
-                try {
-                    client.disconnect();
-                } catch (MqttException e) {
-                    e.printStackTrace();
-                }
-            }
+        } finally {
+            disconnectQuietly(client);
+            closeQuietly(client);
         }
-        return true;
     }
 
-    //项目启动时对所有需要订阅的mqtt连接进行订阅
     @Override
-    public void run(ApplicationArguments args){
-        LambdaQueryWrapper<ConfigMqtt> eq = new LambdaQueryWrapper<ConfigMqtt>().ne(ConfigMqtt::getPushOrSub,"0");
+    public void run(ApplicationArguments args) {
+        LambdaQueryWrapper<ConfigMqtt> eq = new LambdaQueryWrapper<ConfigMqtt>().ne(ConfigMqtt::getPushOrSub, "1");
         List<ConfigMqtt> configMqtts = configMqttMapper.selectList(eq);
         for (ConfigMqtt configMqtt : configMqtts) {
             try {
                 sub(configMqtt);
+                log.info("启动订阅成功: {}", configMqtt.getId());
             } catch (MqttException e) {
-                e.printStackTrace();
+                log.error("启动订阅失败: {}", configMqtt.getId(), e);
             }
         }
     }
 
     /**
      * C端自动化堆垛清理位置,回传坯号,推送消息
-     * @param configMqttMapper
-     * @param map
-     * @param topicInfo
      */
     public void pushCData(ConfigMqttMapper configMqttMapper, Map<String, Object> map, String topicInfo){
-        ConfigMqtt configMqtt = configMqttMapper.selectOne(new LambdaQueryWrapper<ConfigMqtt>().like(ConfigMqtt::getTopic,topicInfo).eq(ConfigMqtt::getPushOrSub,"0"));
+        ConfigMqtt configMqtt = configMqttMapper.selectOne(
+                new LambdaQueryWrapper<ConfigMqtt>()
+                        .like(ConfigMqtt::getTopic, topicInfo)
+                        .eq(ConfigMqtt::getPushOrSub, "0")
+        );
+        if (configMqtt == null) {
+            log.error("找不到对应的 MQTT 配置");
+            return;
+        }
+
         MqttClient mqttClient = null;
         try {
-            mqttClient = getMqttClient11(configMqtt);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
-        MqttTopic topic = mqttClient.getTopic(topicInfo);
-        MqttMessage message = new MqttMessage();
-        message.setPayload(JSON.toJSON(map).toString().getBytes());
-        message.setQos(2);
-        message.setRetained(true);
-        if (null == topic) {
-            log.error("topic is not exist");
-        }else{
-            MqttDeliveryToken token;//Delivery:配送
-            synchronized (topic) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
+            mqttClient = getPushMqttClient(configMqtt);
+            if (mqttClient == null) {
+                log.error("MQTT客户端获取失败");
+                return;
+            }
+
+            MqttTopic topic = mqttClient.getTopic(topicInfo);
+            if (topic == null) {
+                log.error("topic is not exist");
+                return;
+            }
+
+            MqttMessage message = new MqttMessage(JSON.toJSONString(map).getBytes(StandardCharsets.UTF_8));
+            message.setQos(2);
+            message.setRetained(true);
+
+            Object lock = getOrCreateLock(getLockKey(topicInfo, mqttClient.getClientId()));
+            synchronized (lock) {
+                MqttDeliveryToken token;
                 try {
-                    token = topic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
+                    token = topic.publish(message);
                     token.waitForCompletion(1000L);
-                } catch (MqttPersistenceException e) {
-                    e.printStackTrace();
                 } catch (MqttException e) {
-                    e.printStackTrace();
+                    log.error("MQTT消息发送失败", e);
                 }
             }
+        } catch (MqttException e) {
+            log.error("获取 MQTT 客户端失败", e);
+        } finally {
+            disconnectQuietly(mqttClient);
         }
     }
 
-
-    public MqttClient getMqttClient11(ConfigMqtt configMqtt) throws MqttException {
-        MqttConnectOptions options = new MqttConnectOptions();
-        // 配置相关选项,如用户名、密码等...
-        options.setUserName(configMqtt.getUsername());
-        options.setPassword(configMqtt.getPassword().toCharArray());
-        // ...其他配置...
-        // 设置超时时间 单位为秒
-        options.setConnectionTimeout(60);
-        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
-        options.setKeepAliveInterval(60);
-        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
-        options.setCleanSession(false);
-// 由于 MqttConnectOptions 类没有 setSessionExpiryInterval 方法,需要移除这行代码
-// options.setSessionExpiryInterval(604800L); // 会话保留7天(60*60*24*7)
-        // 开启自动重连功能; 方法会判断这个参数
-        options.setAutomaticReconnect(true);
-        // 设置自动重连最大延迟时间 单位是毫秒
-        options.setMaxReconnectDelay(128000);
-        // 允许的最大传输中消息
-        options.setMaxInflight(100);
-        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
-        options.setWill("willTopic", "offline".getBytes(), 2, false);
-
-        StringBuffer url = new StringBuffer();
-        url.append("tcp://").append(configMqtt.getIp()).append(":").append(configMqtt.getHost());
-        MqttClient client = new MqttClient(url.toString(), configMqtt.getId(), new MemoryPersistence());
+    public MqttClient getPushMqttClient(ConfigMqtt configMqtt) throws MqttException {
+        MqttConnectOptions options = createDefaultOptions(configMqtt);
+        options.setCleanSession(true); // 修改 CleanSession 设置
+        String url = buildMqttUrl(configMqtt);
+        MqttClient client = new MqttClient(url, configMqtt.getId(), new MemoryPersistence());
+        client.setCallback(new MqttClientCallback(configMqtt.getId(), client, configMqtt.getTopic()));
         IMqttToken token = client.connectWithResult(options);
         token.waitForCompletion();
         return client;
     }
+
+    // 工具方法:安静地断开连接
+    private void disconnectQuietly(MqttClient client) {
+        if (client != null && client.isConnected()) {
+            try {
+                client.disconnect();
+            } catch (MqttException e) {
+                log.warn("MQTT断开连接失败", e);
+            }
+        }
+    }
+
+    // 工具方法:安静地关闭客户端
+    private void closeQuietly(MqttClient client) {
+        if (client != null) {
+            try {
+                client.close();
+            } catch (MqttException e) {
+                log.warn("MQTT客户端关闭失败", e);
+            }
+        }
+    }
+
+    // 获取或创建锁对象
+    private Object getOrCreateLock(String key) {
+        return lockMap.computeIfAbsent(key, k -> new Object());
+    }
+
+    // 构建锁键
+    private String getLockKey(String topic, String clientId) {
+        return String.format("%s%s", topic, clientId);
+    }
 }