Quellcode durchsuchen

峰平谷通过异步抓取电流功率等数据源信息

lingpeng.li vor 6 Monaten
Ursprung
Commit
424eadb18e

+ 23 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/gatherData/entity/FpgGatherDataMongo.java

@@ -0,0 +1,23 @@
+package org.jeecg.modules.gatherData.entity;
+
+
+import lombok.Data;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.math.BigDecimal;
+import java.util.Date;
+
+@Data
+@Document(collection = "fpgGatherData")
+public class FpgGatherDataMongo {
+
+    private String deviceInformationId;
+    private String deviceRegionId;
+    private String devicePointId;
+    private Date createTime;
+    private Date updateTime;
+    private BigDecimal runCurrent;
+    private BigDecimal activePower;
+    private String fpgModelState;
+    private String eventWarnState;
+}

+ 64 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/utils/ConnectionUtils.java

@@ -0,0 +1,64 @@
+package org.jeecg.modules.utils;
+
+import java.sql.*;
+
+/**
+ * @author llp
+ * date: 2024-11-29 13:20
+ * Description: PostgreSQL 连接工具类
+ */
+public class ConnectionUtils {
+
+    // PostgreSQL JDBC 连接URL
+    private static final String URL = "jdbc:postgresql://182.22.0.182:10102/history?useUnicode=true&ssl=false";
+    private static final String USERNAME = "postgres"; // PostgreSQL 用户名
+    private static final String PASSWORD = "5159273CFBC5";     // PostgreSQL 密码
+
+    /**
+     * 获取连接
+     * */
+    public static Connection getConnection() throws Exception {
+        // 加载 PostgreSQL JDBC 驱动
+        Class.forName("org.postgresql.Driver");
+        return DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * 关闭连接
+     * */
+    public static void closeConnection(Connection conn) {
+        try {
+            if (conn != null) {
+                conn.close();
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 关闭 SQL 执行对象
+     * */
+    public static void closeStatement(Statement stmt) {
+        try {
+            if (stmt != null) {
+                stmt.close();
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 关闭结果集
+     * */
+    public static void closeResultSet(ResultSet rs) {
+        try {
+            if (rs != null) {
+                rs.close();
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 198 - 0
jeecg-module-gather/src/main/java/org/jeecg/modules/watch/MysqlWatch.java

@@ -0,0 +1,198 @@
+package org.jeecg.modules.watch;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.modules.device.entity.DeviceInformation;
+import org.jeecg.modules.device.mapper.DeviceInformationMapper;
+import org.jeecg.modules.device.mapper.DeviceRegionMapper;
+import org.jeecg.modules.devicePoint.entity.DevicePoint;
+import org.jeecg.modules.devicePoint.mapper.DevicePointMapper;
+import org.jeecg.modules.gatherData.entity.FpgGatherData;
+import org.jeecg.modules.gatherData.entity.FpgGatherDataMongo;
+import org.jeecg.modules.gatherData.mapper.FpgGatherDataMapper;
+import org.jeecg.modules.utils.ConnectionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@EnableAsync
+@Component
+public class MysqlWatch {
+
+    @Autowired
+    DeviceRegionMapper deviceRegionMapper;
+
+    @Autowired
+    DeviceInformationMapper deviceInformationMapper;
+
+    @Autowired
+    DevicePointMapper devicePointMapper;
+
+    @Autowired
+    FpgGatherDataMapper fpgGatherDataMapper;
+
+    @Autowired
+    MongoTemplate mongoTemplate;
+
+
+    @Scheduled(cron = "0 0/1 * * * ?")
+    public void getTsData() {
+        Connection connection = null;
+        Statement statement = null;
+        ResultSet resultSet = null;
+        try {
+            connection = ConnectionUtils.getConnection();
+
+            // 用于存放合并后的 FpgGatherData,键为 "DeviceInformationId-CreateTime"
+            Map<String, FpgGatherData> mergedDataMap = new HashMap<>();
+
+            LambdaQueryWrapper<DeviceInformation> informationLambdaQueryWrapper = new LambdaQueryWrapper<>();
+            List<DeviceInformation> devices = deviceInformationMapper.selectList(informationLambdaQueryWrapper);
+
+            // 遍历所有设备
+            for (DeviceInformation device : devices) {
+                LambdaQueryWrapper<DevicePoint> devicePointLambdaQueryWrapper = new LambdaQueryWrapper<>();
+                devicePointLambdaQueryWrapper.eq(DevicePoint::getDeviceId, device.getId());
+                List<DevicePoint> devicePoints = devicePointMapper.selectList(devicePointLambdaQueryWrapper);
+
+                // 查询 FpgGatherData 表中 createTime 最新的一条记录
+                LambdaQueryWrapper<FpgGatherData> fpgGatherDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
+                fpgGatherDataLambdaQueryWrapper
+                        .orderByDesc(FpgGatherData::getCreateTime) // 按 createTime 降序排序
+                        .last("LIMIT 1"); // 只取一条记录
+
+                // 执行查询
+                FpgGatherData latestData = fpgGatherDataMapper.selectOne(fpgGatherDataLambdaQueryWrapper);
+
+                // 获取最新记录的 createTime
+                Date latestCreateTime1 = null;
+                if (latestData != null) {
+                    latestCreateTime1 = latestData.getCreateTime();
+                    System.out.println("最新的 createTime: " + latestCreateTime1);
+                } else {
+                    // 如果没有找到记录,给一个默认时间
+                    latestCreateTime1 = new Date(0); // 默认时间设置为 1970-01-01 00:00:00
+                    System.out.println("没有找到任何记录,使用默认时间: " + latestCreateTime1);
+                }
+
+
+                // 遍历设备点
+                for (DevicePoint point : devicePoints) {
+
+                    // 假设 latestCreateTime 是从 FpgGatherData 中获取的最新 createTime
+                    Date latestCreateTime = latestCreateTime1;
+
+                   // 格式化 latestCreateTime 为 "yyyy-MM-dd HH:mm:ss" 格式的字符串
+                    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                    String formattedTime = dateFormat.format(latestCreateTime);
+                    String sql = "SELECT * FROM ts_value WHERE tagname='" + point.getPointCode() +
+                            "' AND timestamp > '" + formattedTime + "' ORDER BY timestamp DESC";
+                    statement = connection.createStatement();
+                    resultSet = statement.executeQuery(sql);
+
+                    // 遍历查询到的实时数据
+                    while (resultSet.next()) {
+                        Date timestamp = dateFormat.parse(resultSet.getString("timestamp"));
+                        BigDecimal value = new BigDecimal(resultSet.getString("value"));
+
+                        // 生成分组键
+                        String groupKey = device.getId() + "-" + dateFormat.format(timestamp);
+
+                        // 获取或创建 FpgGatherData
+                        FpgGatherData fpgGatherData = mergedDataMap.getOrDefault(groupKey, new FpgGatherData());
+                        fpgGatherData.setDeviceInformationId(device.getId());
+                        fpgGatherData.setDeviceRegionId(device.getDeviceRegionId());
+                        fpgGatherData.setCreateTime(timestamp);
+                        fpgGatherData.setUpdateTime(timestamp);
+
+                        // 处理 DevicePointId 的拼接
+                        String devicePointIdValue = fpgGatherData.getDevicePointId();
+                        String pointIdStr = point.getId().toString();
+                        if ("Ia".equals(point.getCurrentPower())) {
+                            // "Ia" 的 pointId 放在最前
+                            devicePointIdValue = (devicePointIdValue == null || devicePointIdValue.isEmpty())
+                                    ? pointIdStr
+                                    : pointIdStr + (devicePointIdValue.contains(pointIdStr) ? "" : "," + devicePointIdValue);
+                        } else if ("P".equals(point.getCurrentPower())) {
+                            // "P" 的 pointId 放在最后
+                            devicePointIdValue = (devicePointIdValue == null || devicePointIdValue.isEmpty())
+                                    ? pointIdStr
+                                    : (devicePointIdValue.contains(pointIdStr) ? devicePointIdValue : devicePointIdValue + "," + pointIdStr);
+                        }
+                        fpgGatherData.setDevicePointId(devicePointIdValue);
+
+                        // 处理数据点类型和值的累计
+                        if ("Ia".equals(point.getCurrentPower())) {
+                            fpgGatherData.setRunCurrent(
+                                    (fpgGatherData.getRunCurrent() == null ? BigDecimal.ZERO : fpgGatherData.getRunCurrent()).add(value)
+                            );
+                        } else if ("P".equals(point.getCurrentPower())) {
+                            fpgGatherData.setActivePower(
+                                    (fpgGatherData.getActivePower() == null ? BigDecimal.ZERO : fpgGatherData.getActivePower()).add(value)
+                            );
+                        }
+
+                        // 更新到 Map
+                        mergedDataMap.put(groupKey, fpgGatherData);
+                    }
+                }
+            }
+
+            // 保存合并数据到数据库
+            for (FpgGatherData data : mergedDataMap.values()) {
+                // 检查数据库中是否存在相同 DeviceInformationId 和 CreateTime 的记录
+                FpgGatherData existingData = fpgGatherDataMapper.selectOne(
+                        new LambdaQueryWrapper<FpgGatherData>()
+                                .eq(FpgGatherData::getDeviceInformationId, data.getDeviceInformationId())
+                                .eq(FpgGatherData::getCreateTime, data.getCreateTime())
+                );
+
+                // 如果不存在则插入
+                if (existingData == null) {
+                    fpgGatherDataMapper.insert(data);
+                }
+
+                // 转为 MongoDB 对象并同步
+                FpgGatherDataMongo mongoData = new FpgGatherDataMongo();
+                mongoData.setDeviceInformationId(data.getDeviceInformationId());
+                mongoData.setDeviceRegionId(data.getDeviceRegionId());
+                mongoData.setCreateTime(data.getCreateTime());
+                mongoData.setUpdateTime(data.getUpdateTime());
+                mongoData.setRunCurrent(data.getRunCurrent());
+                mongoData.setActivePower(data.getActivePower());
+
+                Query query = new Query();
+                query.addCriteria(Criteria.where("deviceInformationId").is(data.getDeviceInformationId()))
+                        .addCriteria(Criteria.where("createTime").is(data.getCreateTime()));
+
+                // 如果 MongoDB 中不存在记录则插入
+                FpgGatherDataMongo existingMongoData = mongoTemplate.findOne(query, FpgGatherDataMongo.class, "fpgGatherData");
+                if (existingMongoData == null) {
+                    mongoTemplate.save(mongoData, "fpgGatherData");
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            ConnectionUtils.closeConnection(connection);
+            ConnectionUtils.closeStatement(statement);
+            ConnectionUtils.closeResultSet(resultSet);
+        }
+    }
+
+}