package org.jeecg.modules.watch; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.util.DateUtils; import org.jeecg.modules.device.entity.DeviceInformation; import org.jeecg.modules.device.mapper.DeviceInformationMapper; import org.jeecg.modules.device.mapper.DeviceRegionMapper; import org.jeecg.modules.devicePoint.entity.DevicePoint; import org.jeecg.modules.devicePoint.mapper.DevicePointMapper; import org.jeecg.modules.gatherData.entity.FpgGatherData; import org.jeecg.modules.gatherData.mapper.FpgGatherDataMapper; import org.jeecg.modules.peaksAndValleysTimeConfig.entity.SystemVariable; import org.jeecg.modules.peaksAndValleysTimeConfig.service.ISystemVariableService; import org.jeecg.modules.utils.PostgreSQLUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.sql.*; import java.util.Date; import java.util.List; import java.util.stream.Collectors; @Slf4j @EnableAsync @Component public class PostgreSQLWatch { @Autowired DeviceRegionMapper deviceRegionMapper; @Autowired DeviceInformationMapper deviceInformationMapper; @Autowired DevicePointMapper devicePointMapper; @Autowired FpgGatherDataMapper fpgGatherDataMapper; @Autowired ISystemVariableService systemVariableService; @Autowired MongoTemplate mongoTemplate; @Scheduled(cron = "0 0/4 * * * *") public void getTsData() { Connection connection = null; PreparedStatement statement = null; ResultSet resultSet = null; log.info("抓取采集数据"); try { // 建立链接 connection = PostgreSQLUtil.getConnection();; boolean isClosed = connection.isClosed(); LambdaQueryWrapper informationLambdaQueryWrapper = new LambdaQueryWrapper<>(); List devices = deviceInformationMapper.selectList(informationLambdaQueryWrapper); // 遍历所有设备 for (DeviceInformation device : devices) { LambdaQueryWrapper devicePointLambdaQueryWrapper = new LambdaQueryWrapper<>(); devicePointLambdaQueryWrapper.eq(DevicePoint::getDeviceId, device.getId()); List devicePoints = devicePointMapper.selectList(devicePointLambdaQueryWrapper); // 查询 FpgGatherData 表中 该设备下的createTime 最新的一条记录 LambdaQueryWrapper fpgGatherDataLambdaQueryWrapper = new LambdaQueryWrapper<>(); fpgGatherDataLambdaQueryWrapper.eq(FpgGatherData::getDeviceInformationId, device.getId()); fpgGatherDataLambdaQueryWrapper .orderByDesc(FpgGatherData::getCreateTime) // 按 createTime 降序排序 .last("LIMIT 1"); // 只取一条记录 // 执行查询 FpgGatherData latestData = fpgGatherDataMapper.selectOne(fpgGatherDataLambdaQueryWrapper); // 获取最新记录的 createTime Date latestCreateTime1 = null; if (latestData != null) { latestCreateTime1 = latestData.getCreateTime(); } else { // 如果没有找到记录,给一个默认时间 latestCreateTime1 = new Date(0); // 默认时间设置为 1970-01-01 00:00:00 } // 根据采集到的数据 创建时间,跟峰平谷配置的时间,判断属于哪个时段(尖、峰、平、谷) QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("variable_address", "sys_run_current_limit"); queryWrapper.eq("status", 0); SystemVariable systemVariable = systemVariableService.getOne(queryWrapper); // 生成PointCode集合 List pointCodeList = devicePoints.stream().map(DevicePoint::getPointCode).collect(Collectors.toList()); List pointIdList = devicePoints.stream().map(DevicePoint::getId).collect(Collectors.toList()); // pointCodeList转换String String pointCodeListStr = String.join("','", pointCodeList); String pointIdListStrs = String.join(",", pointIdList); // 日期转字符串 String formattedTime = DateUtils.date2Str(latestCreateTime1, DateUtils.datetimeFormat.get()); String sql = String.format("SELECT * FROM ts_value WHERE tagname IN ('%s') AND timestamp > '%s' ORDER BY timestamp ASC", pointCodeListStr, formattedTime); statement = connection.prepareStatement(sql); resultSet = statement.executeQuery(); // 遍历查询到的实时数据 while (resultSet.next()) { DevicePoint point = devicePointMapper.selectOne( new LambdaQueryWrapper() .eq(DevicePoint::getPointCode, resultSet.getString("tagname") ) ); log.info(resultSet.getString("tagname") + "////////" + resultSet.getString("timestamp") + "////////" + resultSet.getString("value")); Date timestamp = DateUtils.str2Date(resultSet.getString("timestamp"), DateUtils.datetimeFormat.get()); BigDecimal value = new BigDecimal(resultSet.getString("value")); // 检查FpgGatherData 数据库中是否存在相同 DeviceInformationId 和 CreateTime 的记录 FpgGatherData fpgGatherData = fpgGatherDataMapper.selectOne( new LambdaQueryWrapper() .eq(FpgGatherData::getDeviceInformationId, point.getDeviceId()) .eq(FpgGatherData::getCreateTime, timestamp) ); if (fpgGatherData == null) { // 直接新增 FpgGatherData fpgGatherInsert = new FpgGatherData(); fpgGatherInsert.setCreateTime(timestamp); // 处理电流+电压 if ("Ia".equals(point.getCurrentPower())) { fpgGatherInsert.setRunCurrent(value); } else if ("P".equals(point.getCurrentPower())) { fpgGatherInsert.setActivePower(value); } fpgGatherInsert.setDevicePointId(pointIdListStrs); fpgGatherInsert.setDeviceInformationId(point.getDeviceId()); fpgGatherInsert.setDeviceRegionId(device.getDeviceRegionId()); log.info("新增----数据处理---"); log.info(JSON.toJSONString(fpgGatherInsert)); fpgGatherDataMapper.insert(fpgGatherInsert); } else { // 编辑处理 // 处理数据点类型和值的累计 if ("Ia".equals(point.getCurrentPower())) { BigDecimal runCurrent = fpgGatherData.getRunCurrent() == null ? value : fpgGatherData.getRunCurrent().add(value); fpgGatherData.setRunCurrent(runCurrent); } else if ("P".equals(point.getCurrentPower())) { BigDecimal activePower = fpgGatherData.getActivePower() == null ? value : fpgGatherData.getActivePower().add(value); fpgGatherData.setActivePower(activePower); } fpgGatherData.setId(fpgGatherData.getId()); fpgGatherDataMapper.updateById(fpgGatherData); } // 如果 systemVariable 为空,则不执行后续操作 更新设备状态 if ("Ia".equals(point.getCurrentPower()) && systemVariable == null) { log.info("{}{}", "fpg_close系统变量未配置正常状态的运行电流限制!", JSON.toJSON(fpgGatherData)); } else if("Ia".equals(point.getCurrentPower())) { DeviceInformation deviceInformation = deviceInformationMapper.selectOne( new LambdaQueryWrapper() .eq(DeviceInformation::getId, point.getDeviceId()) ); // 比较大小 int result = compareStringWithBigDecimal(systemVariable.getDefaultValue(), value); if (result > 0) { deviceInformation.setStatus("1"); } else if (result < 0) { deviceInformation.setStatus("0"); } // 只有在 systemVariable 不为空时才进行更新操作 deviceInformationMapper.updateById(deviceInformation); } } // 关闭连接 resultSet.close(); statement.close(); } } catch (Exception e) { // 关闭连接 log.info(e.getMessage()); } finally { PostgreSQLUtil.close(connection); } } public static int compareStringWithBigDecimal(String str, BigDecimal bd) { // 将 String 转换为 BigDecimal BigDecimal strAsBigDecimal = new BigDecimal(str); // 使用 BigDecimal 的 compareTo 方法比较 return strAsBigDecimal.compareTo(bd); } // public static void main(String[] args) { // Connection connection = null; // PreparedStatement preparedStatement = null; // ResultSet resultSet = null; // try { // int i = 0; // connection = PostgreSQLUtil.getConnection(); // String sql = "SELECT * FROM ts_value WHERE tagname = 'YC.10330' AND timestamp > '2024-12-03 11:20:01' ORDER BY timestamp ASC"; // preparedStatement = connection.prepareStatement(sql); // resultSet = preparedStatement.executeQuery(); // while (resultSet.next()) { // i++; // log.info("{}{}", "查询的数据结果:", "时间:" + resultSet.getString("timestamp") + "电流值" + resultSet.getString("value")); // } // log.info("{}{}",">>>>>>>>>>>>>>>>>>查询PG总条数:", i); // } catch (SQLException e) { // log.info("{}",">>>>>>>>>>>>>>>>>>查询PG总条数:"); // } finally { // // } // } }