Преглед на файлове

新增抓取生产消耗数据

lingpeng.li преди 6 месеца
родител
ревизия
bbf10a539a

+ 39 - 24
jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientCallback.java

@@ -3,9 +3,6 @@ package org.jeecg.modules.push.utils;
 import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import lombok.extern.slf4j.Slf4j;
-import net.sf.ezmorph.object.DateMorpher;
-import net.sf.json.JSONObject;
-import net.sf.json.util.JSONUtils;
 import org.eclipse.paho.client.mqttv3.*;
 import org.jeecg.common.util.SpringContextHolder;
 import org.jeecg.modules.billetActual.entity.BilletBasicInfo;
@@ -14,9 +11,11 @@ import org.jeecg.modules.connConfig.configMqtt.entity.MqttMsg;
 import org.jeecg.modules.connConfig.configMqtt.mapper.MqttMsgMapper;
 import org.jeecg.modules.heatsActuals.entity.HeatsActuals;
 import org.jeecg.modules.heatsActuals.service.IHeatsActualsService;
+import org.jeecg.modules.productionConsume.entity.ProductionConsumeDTO;
+import org.jeecg.modules.productionConsume.service.IProductionConsumeService;
 import org.springframework.data.mongodb.core.MongoTemplate;
 
-import java.util.*;
+import java.util.Date;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -54,6 +53,8 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
 
     private IHeatsActualsService heatsActualsService = SpringContextHolder.getBean(IHeatsActualsService.class);
 
+    private IProductionConsumeService productionConsumeService = SpringContextHolder.getBean(IProductionConsumeService.class);
+
 
     // 使用原子类AtomicInteger保证在多线程环境下计数准确
     private AtomicInteger messageCount = new AtomicInteger(0);
@@ -113,28 +114,42 @@ public class MqttClientCallback implements MqttCallback, MqttCallbackExtended{
             mqttMsg.setTopic(topic);
             mqttMsgMapper.delete(new LambdaQueryWrapper<MqttMsg>().eq(MqttMsg::getTopic, topic));
             mqttMsgMapper.insert(mqttMsg);
-            JSONObject jsonObject = JSONObject.fromObject(message.toString());
-            // 炉次实绩
-            if (topic.contains("trace/performance/converter/add")) {
-                try {
-                    HeatsActuals heatsActuals = JSON.parseObject(jsonObject.toString(), HeatsActuals.class);
-                    heatsActualsService.addC(heatsActuals);
-                } catch (Exception e) {
-                    // 记录日志或者进行其他错误处理
-                    log.info("{}{}", "mqtt处理炉次实绩消息出错: ", e.getMessage());
+            String payload = new String(message.getPayload());
+
+            TopicType topicType = null;
+            for (TopicType type : TopicType.values()) {
+                if (topic.contains(type.getTopicValue())) {
+                    topicType = type;
+                    break;
                 }
-                return;
             }
-            // 钢坯实绩内容
-            if (topic.contains("trace/performance/billet/add")) {
-                try {
-                    BilletBasicInfo billetBasicInfo = JSON.parseObject(jsonObject.toString(), BilletBasicInfo.class);
-                    billetBasicInfoService.addC(billetBasicInfo);
-                } catch (Exception e) {
-                    // 记录日志或者进行其他错误处理
-                    log.info("{}{}", "mqtt处理钢坯实绩消息出错: ", e.getMessage());
-                }
-                return;
+            switch (topicType) {
+                case TRACE_PERFORMANCE_CONVERTER_ADD:
+                    try {
+                        HeatsActuals heatsActuals = JSON.parseObject(payload, HeatsActuals.class);
+                        heatsActualsService.addC(heatsActuals);
+                    } catch (Exception e) {
+                        log.info("{}{}", "mqtt处理炉次实绩消息出错: ", e.getMessage());
+                    }
+                    return;
+                case TRACE_PERFORMANCE_BILLET_ADD:
+                    try {
+                        BilletBasicInfo billetBasicInfo = JSON.parseObject(payload, BilletBasicInfo.class);
+                        billetBasicInfoService.addC(billetBasicInfo);
+                    } catch (Exception e) {
+                        log.info("{}{}", "mqtt处理钢坯基础消息出错: ", e.getMessage());
+                    }
+                    return;
+                case TRACE_DEVICEINFORMATION_CURRENT_AOWER:
+                    try {
+                        ProductionConsumeDTO productionConsume = JSON.parseObject(payload, ProductionConsumeDTO.class);
+                        productionConsumeService.addC(productionConsume);
+                    } catch (Exception e) {
+                        log.info("{}{}", "mqtt处理生产消耗消息出错: ", e.getMessage());
+                    }
+                    return;
+                default:
+                    break;
             }
             log.info("mqtt到目前为止,已接收消息的总次数为: {}", messageCount.get());
         } catch (Exception e) {

+ 25 - 10
jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/MqttClientUtil.java

@@ -43,11 +43,14 @@ public class MqttClientUtil implements ApplicationRunner {
 
 
     public void sub(ConfigMqtt configMqtt) throws MqttException {
+        log.info("进入到mqtt的订阅!");
         //创建mqtt连接url
         StringBuffer url = new StringBuffer();
         url.append("tcp://").append(configMqtt.getIp()).append(":").append(configMqtt.getHost());
+        log.info("MQTT服务器的配置地址为: {}", url);
         //根据url从连接缓存池中拿去连接对象
         MqttClient client = mqttClients.get(url.toString());
+        log.info("MQTT的连接对象为: {}", client);
         //连接对象是否为空或异常
         if(ObjectUtils.isEmpty(client)||!client.isConnected()){
             //mqtt连接参数
@@ -56,32 +59,42 @@ public class MqttClientUtil implements ApplicationRunner {
             options.setUserName(configMqtt.getUsername());
             //密码
             options.setPassword(configMqtt.getPassword().toCharArray());
-            //连接超时时间
-            options.setConnectionTimeout(100);
-            //心跳检测频率
+            // 设置超时时间 单位为秒
+            options.setConnectionTimeout(60);
+            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
             options.setKeepAliveInterval(60);
             //清除会话
             options.setCleanSession(true);
-            //自动重连
-            options.setAutomaticReconnect(false);
+            // 设置自动重连最大延迟时间 单位是毫秒
+            options.setMaxReconnectDelay(128000);
+            // 允许的最大传输中消息
+            options.setMaxInflight(100);
+            // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
+            // options.setWill("willTopic", "offline".getBytes(), 0, false);
             //创建连接对象
             client = new MqttClient(url.toString(), configMqtt.getId(), new MemoryPersistence());
-            // 订阅主题,设置QoS级别为1
-            int qos = 2;
-            client.subscribe("topic", qos);
+            log.info("创建的新的mqtt的连接对象为: {}", client);
             //设置回调对象
             client.setCallback(new MqttClientCallback(configMqtt.getId(),client,configMqtt.getTopic()));
             //建立连接
-            client.connect(options);
+            if (!client.isConnected()) {
+                log.info("{}{}", "@@@@@@@sub时,未连接mqtt", configMqtt.getId());
+                IMqttToken token = client.connectWithResult(options);
+                token.waitForCompletion(); // 等待消息发送完成
+            }else {
+                log.info("{}{}", "%%%%%%%%%sub时,已连接mqtt", configMqtt.getId());
+                client.reconnect();
+            }
             //将连接放入链接缓存池中
             mqttClients.put(url.toString(),client);
         }
+        log.info("{}{}", ">>>>>>>>>>>订阅并连接mqtt:", client.getClientId());
         //获取所有主题
         String[] topics = configMqtt.getTopic().split(",");
         if(topics!=null&&topics.length!=0){
             for (String topic : topics) {
                 //订阅主题
-                client.subscribe(topic);
+                client.subscribe(topic, 1);
             }
         }
     }
@@ -172,10 +185,12 @@ public class MqttClientUtil implements ApplicationRunner {
     //项目启动时对所有需要订阅的mqtt连接进行订阅
     @Override
     public void run(ApplicationArguments args){
+        log.info("mqtt开始进行连接!");
         LambdaQueryWrapper<ConfigMqtt> eq = new LambdaQueryWrapper<ConfigMqtt>().ne(ConfigMqtt::getPushOrSub,"0");
         List<ConfigMqtt> configMqtts = configMqttMapper.selectList(eq);
         for (ConfigMqtt configMqtt : configMqtts) {
             try {
+                log.info("当前mqtt IP配置为: {}端口配置为: {}, 主题配置为: {},用户配置为: {}",configMqtt.getIp(),configMqtt.getHost(),configMqtt.getTopic(), configMqtt.getUsername());
                 sub(configMqtt);
             } catch (MqttException e) {
                 e.printStackTrace();

+ 31 - 0
jeecg-module-conn/src/main/java/org/jeecg/modules/push/utils/TopicType.java

@@ -0,0 +1,31 @@
+package org.jeecg.modules.push.utils;
+
+/**
+ * 主题类型
+ */
+public enum TopicType {
+    /**
+     *  炉次实绩
+     */
+    TRACE_PERFORMANCE_CONVERTER_ADD("trace/performance/converter/add"),
+    /**
+     * 钢坯基础信息
+     */
+    TRACE_PERFORMANCE_BILLET_ADD("trace/performance/billet/add"),
+
+    /**
+     *  生产消耗
+     */
+    TRACE_DEVICEINFORMATION_CURRENT_AOWER("trace/deviceinformation/current/aower");
+
+
+    private String topicValue;
+
+    TopicType(String topicValue) {
+        this.topicValue = topicValue;
+    }
+
+    public String getTopicValue() {
+        return topicValue;
+    }
+}

+ 62 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/productionConsume/entity/ProductionConsume.java

@@ -0,0 +1,62 @@
+package org.jeecg.modules.productionConsume.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+import org.jeecgframework.poi.excel.annotation.Excel;
+import org.springframework.format.annotation.DateTimeFormat;
+
+import java.math.BigDecimal;
+import java.util.Date;
+
+@Data
+@TableName("production_consume")
+@Accessors(chain = true)
+@EqualsAndHashCode(callSuper = false)
+@ApiModel(value="production_consume对象", description="生产消耗")
+public class ProductionConsume {
+
+    private static final long serialVersionUID = 1L;
+
+    /**主键*/
+    @TableId(type = IdType.ASSIGN_ID)
+    @ApiModelProperty(value = "主键")
+    private String id;
+
+    /**创建日期*/
+    @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
+    @DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
+    @ApiModelProperty(value = "读数时间")
+    private Date createTime;
+
+    /**设备名称*/
+    @Excel(name = "设备名称", width = 15)
+    @ApiModelProperty(value = "设备名称")
+    private String deviceTitle;
+
+
+    /**区域名称*/
+    @Excel(name = "区域名称", width = 15)
+    @ApiModelProperty(value = "区域名称")
+    private String regionTitle;
+
+    /**有功功率*/
+    @Excel(name = "有功功率", width = 15)
+    @ApiModelProperty(value = "有功功率")
+    private BigDecimal activePower;
+
+
+    /**消耗功率*/
+    @Excel(name = "消耗功率", width = 15)
+    @ApiModelProperty(value = "消耗功率")
+    private BigDecimal consumePower;
+
+
+
+}

+ 38 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/productionConsume/entity/ProductionConsumeDTO.java

@@ -0,0 +1,38 @@
+package org.jeecg.modules.productionConsume.entity;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import org.springframework.format.annotation.DateTimeFormat;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.Map;
+
+@Data
+public class ProductionConsumeDTO {
+
+
+    /**
+     * 创建日期
+     */
+    @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
+    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    @ApiModelProperty(value = "读数时间")
+    private Date createTime;
+
+    /**
+     * 设备名称
+     */
+    @ApiModelProperty(value = "设备名称")
+    private Map<String, String> deviceTitle;
+
+
+    /**
+     * 有功功率
+     */
+    @ApiModelProperty(value = "有功功率")
+    private BigDecimal activePower;
+
+
+}

+ 57 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/productionConsume/entity/ProductionDevice.java

@@ -0,0 +1,57 @@
+package org.jeecg.modules.productionConsume.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.Accessors;
+import org.jeecgframework.poi.excel.annotation.Excel;
+
+import java.io.Serializable;
+
+@Data
+@TableName("production_device")
+@Accessors(chain = true)
+@EqualsAndHashCode(callSuper = false)
+@ApiModel(value = "production_device对象", description = "生产消耗设备关系表")
+public class ProductionDevice implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键
+     */
+    @TableId(type = IdType.ASSIGN_ID)
+    @ApiModelProperty(value = "主键")
+    private String id;
+    /**
+     * 区域名称
+     */
+    @Excel(name = "区域名称", width = 15)
+    @ApiModelProperty(value = "区域名称")
+    private String deviceRegion;
+    /**
+     * 设备名称
+     */
+    @Excel(name = "设备名称", width = 15)
+    @ApiModelProperty(value = "设备名称")
+    private String deviceTitle;
+    /**
+     * 变量名称
+     */
+    @Excel(name = "变量名称", width = 15)
+    @ApiModelProperty(value = "变量名称")
+    private String deviceVariable;
+
+    /**
+     * 备注
+     */
+    @Excel(name = "备注", width = 15)
+    @ApiModelProperty(value = "备注")
+    private String remark;
+
+
+}

+ 7 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/productionConsume/mapper/ProductionConsumeMapper.java

@@ -0,0 +1,7 @@
+package org.jeecg.modules.productionConsume.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.jeecg.modules.productionConsume.entity.ProductionConsume;
+
+public interface ProductionConsumeMapper extends BaseMapper<ProductionConsume> {
+}

+ 7 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/productionConsume/mapper/ProductionDeviceMapper.java

@@ -0,0 +1,7 @@
+package org.jeecg.modules.productionConsume.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.jeecg.modules.productionConsume.entity.ProductionDevice;
+
+public interface ProductionDeviceMapper extends BaseMapper<ProductionDevice> {
+}

+ 10 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/productionConsume/service/IProductionConsumeService.java

@@ -0,0 +1,10 @@
+package org.jeecg.modules.productionConsume.service;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import org.jeecg.modules.productionConsume.entity.ProductionConsume;
+import org.jeecg.modules.productionConsume.entity.ProductionConsumeDTO;
+
+public interface IProductionConsumeService extends IService<ProductionConsume> {
+
+    void addC(ProductionConsumeDTO productionConsumeDTO);
+}

+ 81 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/productionConsume/service/impl/ProductionConsumeServiceImpl.java

@@ -0,0 +1,81 @@
+package org.jeecg.modules.productionConsume.service.impl;
+
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.modules.device.mapper.DeviceInformationMapper;
+import org.jeecg.modules.productionConsume.entity.ProductionConsume;
+import org.jeecg.modules.productionConsume.entity.ProductionConsumeDTO;
+import org.jeecg.modules.productionConsume.mapper.ProductionConsumeMapper;
+import org.jeecg.modules.productionConsume.service.IProductionConsumeService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+@Service
+@Slf4j
+public class ProductionConsumeServiceImpl extends ServiceImpl<ProductionConsumeMapper, ProductionConsume> implements IProductionConsumeService {
+
+    @Autowired
+    private DeviceInformationMapper deviceInformationMapper;
+
+    @Autowired
+    private ProductionConsumeMapper productionConsumeMapper;
+
+    @Override
+    public void addC(ProductionConsumeDTO productionConsumeDTO) {
+        try {
+            insertProductionConsume(productionConsumeDTO);
+        } catch (Exception e) {
+            log.error("插入ProductionConsume数据出现异常", e);
+            throw new RuntimeException("插入ProductionConsume数据失败", e);
+        }
+
+    }
+
+    private void insertProductionConsume(ProductionConsumeDTO productionConsumeDTO) {
+
+        ProductionConsume productionConsume = new ProductionConsume();
+        if (productionConsumeDTO == null || productionConsumeDTO.getDeviceTitle() == null) {
+            throw new IllegalArgumentException("生产消耗数据或者设备名称不能为空!");
+        }
+
+        // 获取 deviceTitle 中的键值对
+        Map<String, String> deviceTitleMap = productionConsumeDTO.getDeviceTitle();
+
+        if (deviceTitleMap == null || deviceTitleMap.isEmpty()) {
+            throw new IllegalArgumentException("生产消耗数据中的设备信息不能为空!");
+        }
+
+        for (Map.Entry<String, String> entry : deviceTitleMap.entrySet()) {
+            String regionTitle = entry.getKey(); // 对应 RegionTitle
+            String deviceTitle = entry.getValue(); // 对应 DeviceTitle
+
+            LambdaQueryWrapper<ProductionConsume> consumeQueryWrapper = new LambdaQueryWrapper<ProductionConsume>()
+                    .eq(ProductionConsume::getRegionTitle, regionTitle)
+                    .eq(ProductionConsume::getDeviceTitle, deviceTitle)
+                    .orderByDesc(ProductionConsume::getCreateTime)
+                    .last("limit 1");
+
+            ProductionConsume lastConsume = productionConsumeMapper.selectOne(consumeQueryWrapper);
+
+            productionConsume.setCreateTime(productionConsumeDTO.getCreateTime());
+            productionConsume.setActivePower(productionConsumeDTO.getActivePower());
+            productionConsume.setRegionTitle(regionTitle);
+            productionConsume.setDeviceTitle(deviceTitle);
+
+            // 计算 consumePower
+            if (lastConsume != null && productionConsumeDTO.getActivePower() != null && lastConsume.getActivePower() != null) {
+                productionConsume.setConsumePower(productionConsumeDTO.getActivePower().subtract(lastConsume.getActivePower()));
+            } else if (lastConsume == null) {
+                productionConsume.setConsumePower(BigDecimal.ZERO);
+            }
+
+            // 插入数据
+            baseMapper.insert(productionConsume);
+        }
+    }
+}

+ 52 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/utils/ProductPostgreSQLUtil.java

@@ -0,0 +1,52 @@
+package org.jeecg.modules.utils;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class ProductPostgreSQLUtil {
+
+    private static HikariDataSource dataSource;
+
+    static {
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl("jdbc:postgresql://192.168.1.253:5432/eiop?useUnicode=true&ssl=false");
+        config.setUsername("lgjt01");
+        config.setPassword("lgjt01");
+        config.setDriverClassName("org.postgresql.Driver");
+        config.setMaximumPoolSize(10); // 最大连接数
+        config.setMinimumIdle(5);      // 最小空闲连接数
+        config.setIdleTimeout(30000);  // 空闲连接超时时间(毫秒)
+        config.setConnectionTimeout(30000); // 连接超时时间(毫秒)
+        config.setMaxLifetime(1800000); // 连接最大生命周期(毫秒)
+
+        dataSource = new HikariDataSource(config);
+    }
+
+    public static Connection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    public static void close(Connection connection) {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // 关闭连接池
+    public static void shutdown() {
+        if (dataSource != null) {
+            dataSource.close();
+        }
+    }
+
+    private ProductPostgreSQLUtil() {
+        // 私有构造函数,防止实例化
+    }
+}

+ 165 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/watch/ProductPostgreSQLWatch.java

@@ -0,0 +1,165 @@
+package org.jeecg.modules.watch;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.common.util.DateUtils;
+import org.jeecg.modules.productionConsume.entity.ProductionConsume;
+import org.jeecg.modules.productionConsume.entity.ProductionDevice;
+import org.jeecg.modules.productionConsume.mapper.ProductionConsumeMapper;
+import org.jeecg.modules.productionConsume.mapper.ProductionDeviceMapper;
+import org.jeecg.modules.utils.PostgreSQLUtil;
+import org.jeecg.modules.utils.ProductPostgreSQLUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Timestamp;
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Slf4j
+@EnableAsync
+@Component
+public class ProductPostgreSQLWatch {
+
+    @Autowired
+    ProductionConsumeMapper productionConsumeMapper;
+
+    @Autowired
+    ProductionDeviceMapper productionDeviceMapper;
+
+    @Async
+    @Scheduled(fixedDelay = 1000 * 60 * 10)
+    public void getProductionData() {
+        Connection connection = null;
+        PreparedStatement statement = null;
+        ResultSet resultSet = null;
+        log.info("抓取生产消耗数据");
+        try {
+            // 建立链接
+            connection = ProductPostgreSQLUtil.getConnection();
+            ;
+            boolean isClosed = connection.isClosed();
+            LambdaQueryWrapper<ProductionDevice> deviceLambdaQueryWrapper = new LambdaQueryWrapper<>();
+            List<ProductionDevice> devices = productionDeviceMapper.selectList(deviceLambdaQueryWrapper);
+            // 遍历所有设备
+            for (ProductionDevice device : devices) {
+                // 查询 productionConsume 表中 该设备下的createTime 最新的一条记录
+                LambdaQueryWrapper<ProductionConsume> productionLambdaQueryWrapper = new LambdaQueryWrapper<>();
+                productionLambdaQueryWrapper.eq(ProductionConsume::getDeviceTitle, device.getDeviceTitle());
+                productionLambdaQueryWrapper
+                        .orderByDesc(ProductionConsume::getCreateTime) // 按 createTime 降序排序
+                        .last("LIMIT 1"); // 只取一条记录
+                // 执行查询
+                ProductionConsume latestData = productionConsumeMapper.selectOne(productionLambdaQueryWrapper);
+                // 获取最新记录的 createTime
+                Date latestCreateTime1 = null;
+                if (latestData != null) {
+                    latestCreateTime1 = latestData.getCreateTime();
+                } else {
+                    // 如果没有找到记录,设置为当前时间前30分钟
+                    Calendar calendar = Calendar.getInstance();
+                    calendar.add(Calendar.MINUTE, -30); // 当前时间减去30分钟
+                    latestCreateTime1 = calendar.getTime();
+                }
+
+                // 使用 split 方法分割字符串
+                String[] strArray = device.getDeviceVariable().split(",");
+
+                 // 将数组转换为 List
+                List<String> list = Arrays.asList(strArray);
+
+                 // 将 List 转换为正确的 SQL IN 子句格式
+                String deviceSnInClause = list.stream()
+                        .map(String::trim) // 去掉可能的空格
+                        .map(sn -> "'" + sn + "'") // 每个值加上单引号
+                        .collect(Collectors.joining(",")); // 用逗号拼接
+
+                // 日期转字符串
+                String formattedTime = DateUtils.date2Str(latestCreateTime1, DateUtils.datetimeFormat.get());
+
+                // 构造 SQL
+                String sql = String.format("select key,ts,upload_time,device_sn from sys_device_delivery_t\n" +
+                        "where device_sn in (%s) and \"key\" = 'ep' and upload_time > '%s' " +
+                        "order by upload_time asc", deviceSnInClause, formattedTime);
+                statement = connection.prepareStatement(sql);
+                resultSet = statement.executeQuery();
+
+                // 解析结果集
+                List<Map<String, Object>> resultList = new ArrayList<>();
+                while (resultSet.next()) {
+                    Map<String, Object> row = new HashMap<>();
+                    row.put("key", resultSet.getString("key"));
+                    row.put("ts", resultSet.getBigDecimal("ts"));
+                    row.put("upload_time", resultSet.getTimestamp("upload_time"));
+                    row.put("device_sn", resultSet.getString("device_sn"));
+                    resultList.add(row);
+                }
+
+                // 按 upload_time 分组
+                Map<Timestamp, List<Map<String, Object>>> groupedData = resultList.stream()
+                        .collect(Collectors.groupingBy(row -> (Timestamp) row.get("upload_time")));
+
+                // 对分组后的数据进行处理
+                for (Map.Entry<Timestamp, List<Map<String, Object>>> entry : groupedData.entrySet()) {
+                    ProductionConsume productionConsume = new ProductionConsume();
+                    productionConsume.setRegionTitle(device.getDeviceRegion());
+                    productionConsume.setDeviceTitle(device.getDeviceTitle());
+
+                    Timestamp uploadTime = entry.getKey();
+                    List<Map<String, Object>> group = entry.getValue();
+                    productionConsume.setCreateTime(uploadTime);
+
+                    // 查询最新记录
+                    LambdaQueryWrapper<ProductionConsume> consumeQueryWrapper = new LambdaQueryWrapper<ProductionConsume>()
+                            .eq(ProductionConsume::getRegionTitle, device.getDeviceRegion())
+                            .eq(ProductionConsume::getDeviceTitle, device.getDeviceTitle())
+                            .orderByDesc(ProductionConsume::getCreateTime)
+                            .last("limit 1");
+
+                    ProductionConsume lastConsume = productionConsumeMapper.selectOne(consumeQueryWrapper);
+
+                    log.info("处理分组数据: upload_time = {}, 数据量 = {}", uploadTime, group.size());
+
+                    // 累加 ts 的值,保留为 BigDecimal 类型
+                    BigDecimal totalTs = group.stream()
+                            .map(row -> (BigDecimal) row.get("ts")) // 提取 ts 值为 BigDecimal
+                            .reduce(BigDecimal.ZERO, BigDecimal::add); // 累加
+
+
+                    // 将累加后的值设置到 activePower
+                    productionConsume.setActivePower(totalTs.setScale(2, RoundingMode.HALF_UP));
+
+
+                    // 计算 consumePower
+                    if (lastConsume != null && totalTs != null && lastConsume.getActivePower() != null) {
+                        productionConsume.setConsumePower(totalTs.subtract(lastConsume.getActivePower()).setScale(2, RoundingMode.HALF_UP));
+                    } else if (lastConsume == null) {
+                        productionConsume.setConsumePower(BigDecimal.ZERO);
+                    }
+
+                    // 保存或处理生产消耗记录
+                    productionConsumeMapper.insert(productionConsume);
+                }
+
+                // 关闭连接
+                resultSet.close();
+                statement.close();
+            }
+        } catch (Exception e) {
+            // 关闭连接
+            log.info(e.getMessage());
+        } finally {
+            PostgreSQLUtil.close(connection);
+        }
+    }
+
+
+}