package org.jeecg.modules.watch; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import lombok.extern.slf4j.Slf4j; import org.jeecg.modules.device.entity.DeviceInformation; import org.jeecg.modules.device.mapper.DeviceInformationMapper; import org.jeecg.modules.device.mapper.DeviceRegionMapper; import org.jeecg.modules.devicePoint.entity.DevicePoint; import org.jeecg.modules.devicePoint.mapper.DevicePointMapper; import org.jeecg.modules.gatherData.entity.FpgGatherData; import org.jeecg.modules.gatherData.entity.FpgGatherDataMongo; import org.jeecg.modules.gatherData.mapper.FpgGatherDataMapper; import org.jeecg.modules.peaksAndValleysTimeConfig.entity.SystemVariable; import org.jeecg.modules.peaksAndValleysTimeConfig.service.ISystemVariableService; import org.jeecg.modules.utils.ConnectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @Slf4j @EnableAsync @Component public class MysqlWatch { @Autowired DeviceRegionMapper deviceRegionMapper; @Autowired DeviceInformationMapper deviceInformationMapper; @Autowired DevicePointMapper devicePointMapper; @Autowired FpgGatherDataMapper fpgGatherDataMapper; @Autowired ISystemVariableService systemVariableService; @Autowired MongoTemplate mongoTemplate; @Scheduled(cron = "0 0/1 * * * ?") public void getTsData() { Connection connection = null; Statement statement = null; ResultSet resultSet = null; try { connection = ConnectionUtils.getConnection(); // 用于存放合并后的 FpgGatherData,键为 "DeviceInformationId-CreateTime" Map mergedDataMap = new HashMap<>(); LambdaQueryWrapper informationLambdaQueryWrapper = new LambdaQueryWrapper<>(); List devices = deviceInformationMapper.selectList(informationLambdaQueryWrapper); // 遍历所有设备 for (DeviceInformation device : devices) { LambdaQueryWrapper devicePointLambdaQueryWrapper = new LambdaQueryWrapper<>(); devicePointLambdaQueryWrapper.eq(DevicePoint::getDeviceId, device.getId()); List devicePoints = devicePointMapper.selectList(devicePointLambdaQueryWrapper); // 查询 FpgGatherData 表中 createTime 最新的一条记录 LambdaQueryWrapper fpgGatherDataLambdaQueryWrapper = new LambdaQueryWrapper<>(); fpgGatherDataLambdaQueryWrapper .orderByDesc(FpgGatherData::getCreateTime) // 按 createTime 降序排序 .last("LIMIT 1"); // 只取一条记录 // 执行查询 FpgGatherData latestData = fpgGatherDataMapper.selectOne(fpgGatherDataLambdaQueryWrapper); // 获取最新记录的 createTime Date latestCreateTime1 = null; if (latestData != null) { latestCreateTime1 = latestData.getCreateTime(); log.info("最新的 createTime: " + latestCreateTime1); } else { // 如果没有找到记录,给一个默认时间 latestCreateTime1 = new Date(0); // 默认时间设置为 1970-01-01 00:00:00 log.info("没有找到任何记录,使用默认时间: " + latestCreateTime1); } // 遍历设备点 for (DevicePoint point : devicePoints) { // 假设 latestCreateTime 是从 FpgGatherData 中获取的最新 createTime Date latestCreateTime = latestCreateTime1; // 格式化 latestCreateTime 为 "yyyy-MM-dd HH:mm:ss" 格式的字符串 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String formattedTime = dateFormat.format(latestCreateTime); String sql = "SELECT * FROM ts_value WHERE tagname='" + point.getPointCode() + "' AND timestamp > '" + formattedTime + "' ORDER BY timestamp DESC"; statement = connection.createStatement(); resultSet = statement.executeQuery(sql); // 遍历查询到的实时数据 while (resultSet.next()) { Date timestamp = dateFormat.parse(resultSet.getString("timestamp")); BigDecimal value = new BigDecimal(resultSet.getString("value")); // 生成分组键 String groupKey = device.getId() + "-" + dateFormat.format(timestamp); // 获取或创建 FpgGatherData FpgGatherData fpgGatherData = mergedDataMap.getOrDefault(groupKey, new FpgGatherData()); fpgGatherData.setDeviceInformationId(device.getId()); fpgGatherData.setDeviceRegionId(device.getDeviceRegionId()); fpgGatherData.setCreateTime(timestamp); fpgGatherData.setUpdateTime(timestamp); // 处理 DevicePointId 的拼接 String devicePointIdValue = fpgGatherData.getDevicePointId(); String pointIdStr = point.getId().toString(); if ("Ia".equals(point.getCurrentPower())) { // "Ia" 的 pointId 放在最前 devicePointIdValue = (devicePointIdValue == null || devicePointIdValue.isEmpty()) ? pointIdStr : pointIdStr + (devicePointIdValue.contains(pointIdStr) ? "" : "," + devicePointIdValue); } else if ("P".equals(point.getCurrentPower())) { // "P" 的 pointId 放在最后 devicePointIdValue = (devicePointIdValue == null || devicePointIdValue.isEmpty()) ? pointIdStr : (devicePointIdValue.contains(pointIdStr) ? devicePointIdValue : devicePointIdValue + "," + pointIdStr); } fpgGatherData.setDevicePointId(devicePointIdValue); // 处理数据点类型和值的累计 if ("Ia".equals(point.getCurrentPower())) { fpgGatherData.setRunCurrent( (fpgGatherData.getRunCurrent() == null ? BigDecimal.ZERO : fpgGatherData.getRunCurrent()).add(value) ); } else if ("P".equals(point.getCurrentPower())) { fpgGatherData.setActivePower( (fpgGatherData.getActivePower() == null ? BigDecimal.ZERO : fpgGatherData.getActivePower()).add(value) ); } // 更新到 Map mergedDataMap.put(groupKey, fpgGatherData); } } } // 保存合并数据到数据库 for (FpgGatherData data : mergedDataMap.values()) { // 检查数据库中是否存在相同 DeviceInformationId 和 CreateTime 的记录 FpgGatherData existingData = fpgGatherDataMapper.selectOne( new LambdaQueryWrapper() .eq(FpgGatherData::getDeviceInformationId, data.getDeviceInformationId()) .eq(FpgGatherData::getCreateTime, data.getCreateTime()) ); // 根据采集到的数据 创建时间,跟峰平谷配置的时间,判断属于哪个时段(尖、峰、平、谷) QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("variable_address", "sys_run_current_limit"); queryWrapper.eq("status", 0); SystemVariable systemVariable = systemVariableService.getOne(queryWrapper); // 如果 systemVariable 为空,则不执行后续操作 if (systemVariable == null) { log.info("{}{}", "fpg_close系统变量未配置正常状态的运行电流限制!", JSON.toJSON(data)); } else { DeviceInformation deviceInformation = deviceInformationMapper.selectOne( new LambdaQueryWrapper() .eq(DeviceInformation::getId, data.getDeviceInformationId()) ); // 比较大小 int result = compareStringWithBigDecimal(systemVariable.getDefaultValue(), data.getRunCurrent()); if (result > 0) { deviceInformation.setStatus("1"); } else if (result < 0) { deviceInformation.setStatus("0"); } // 只有在 systemVariable 不为空时才进行更新操作 deviceInformationMapper.updateById(deviceInformation); } // 如果不存在则插入 if (existingData == null) { fpgGatherDataMapper.insert(data); } // 转为 MongoDB 对象并同步 FpgGatherDataMongo mongoData = new FpgGatherDataMongo(); mongoData.setDeviceInformationId(data.getDeviceInformationId()); mongoData.setDeviceRegionId(data.getDeviceRegionId()); mongoData.setCreateTime(data.getCreateTime()); mongoData.setUpdateTime(data.getUpdateTime()); mongoData.setRunCurrent(data.getRunCurrent()); mongoData.setActivePower(data.getActivePower()); Query query = new Query(); query.addCriteria(Criteria.where("deviceInformationId").is(data.getDeviceInformationId())) .addCriteria(Criteria.where("createTime").is(data.getCreateTime())); // 如果 MongoDB 中不存在记录则插入 FpgGatherDataMongo existingMongoData = mongoTemplate.findOne(query, FpgGatherDataMongo.class, "fpgGatherData"); if (existingMongoData == null) { mongoTemplate.save(mongoData, "fpgGatherData"); } } } catch (Exception e) { throw new RuntimeException(e); } finally { ConnectionUtils.closeConnection(connection); ConnectionUtils.closeStatement(statement); ConnectionUtils.closeResultSet(resultSet); } } public static int compareStringWithBigDecimal(String str, BigDecimal bd) { // 将 String 转换为 BigDecimal BigDecimal strAsBigDecimal = new BigDecimal(str); // 使用 BigDecimal 的 compareTo 方法比较 return strAsBigDecimal.compareTo(bd); } }