123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- package org.jeecg.modules.watch;
- import com.alibaba.fastjson.JSON;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import lombok.extern.slf4j.Slf4j;
- import org.jeecg.common.util.DateUtils;
- import org.jeecg.modules.device.entity.DeviceInformation;
- import org.jeecg.modules.device.mapper.DeviceInformationMapper;
- import org.jeecg.modules.device.mapper.DeviceRegionMapper;
- import org.jeecg.modules.devicePoint.entity.DevicePoint;
- import org.jeecg.modules.devicePoint.mapper.DevicePointMapper;
- import org.jeecg.modules.gatherData.entity.FpgGatherData;
- import org.jeecg.modules.gatherData.mapper.FpgGatherDataMapper;
- import org.jeecg.modules.peaksAndValleysTimeConfig.entity.SystemVariable;
- import org.jeecg.modules.peaksAndValleysTimeConfig.service.ISystemVariableService;
- import org.jeecg.modules.utils.PostgreSQLUtil;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.mongodb.core.MongoTemplate;
- import org.springframework.scheduling.annotation.EnableAsync;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import java.math.BigDecimal;
- import java.sql.*;
- import java.util.Date;
- import java.util.List;
- import java.util.stream.Collectors;
- @Slf4j
- @EnableAsync
- @Component
- public class PostgreSQLWatch {
- @Autowired
- DeviceRegionMapper deviceRegionMapper;
- @Autowired
- DeviceInformationMapper deviceInformationMapper;
- @Autowired
- DevicePointMapper devicePointMapper;
- @Autowired
- FpgGatherDataMapper fpgGatherDataMapper;
- @Autowired
- ISystemVariableService systemVariableService;
- @Autowired
- MongoTemplate mongoTemplate;
- // @Scheduled(cron = "0 */5 * * * ?")
- public void getTsData() {
- Connection connection = null;
- PreparedStatement statement = null;
- ResultSet resultSet = null;
- log.info("抓取采集数据");
- try {
- // 建立链接
- connection = PostgreSQLUtil.getConnection();;
- boolean isClosed = connection.isClosed();
- LambdaQueryWrapper<DeviceInformation> informationLambdaQueryWrapper = new LambdaQueryWrapper<>();
- List<DeviceInformation> devices = deviceInformationMapper.selectList(informationLambdaQueryWrapper);
- // 遍历所有设备
- for (DeviceInformation device : devices) {
- LambdaQueryWrapper<DevicePoint> devicePointLambdaQueryWrapper = new LambdaQueryWrapper<>();
- devicePointLambdaQueryWrapper.eq(DevicePoint::getDeviceId, device.getId());
- List<DevicePoint> devicePoints = devicePointMapper.selectList(devicePointLambdaQueryWrapper);
- // 查询 FpgGatherData 表中 该设备下的createTime 最新的一条记录
- LambdaQueryWrapper<FpgGatherData> fpgGatherDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
- fpgGatherDataLambdaQueryWrapper.eq(FpgGatherData::getDeviceInformationId, device.getId());
- fpgGatherDataLambdaQueryWrapper
- .orderByDesc(FpgGatherData::getCreateTime) // 按 createTime 降序排序
- .last("LIMIT 1"); // 只取一条记录
- // 执行查询
- FpgGatherData latestData = fpgGatherDataMapper.selectOne(fpgGatherDataLambdaQueryWrapper);
- // 获取最新记录的 createTime
- Date latestCreateTime1 = null;
- if (latestData != null) {
- latestCreateTime1 = latestData.getCreateTime();
- } else {
- // 如果没有找到记录,给一个默认时间
- latestCreateTime1 = new Date(0); // 默认时间设置为 1970-01-01 00:00:00
- }
- // 根据采集到的数据 创建时间,跟峰平谷配置的时间,判断属于哪个时段(尖、峰、平、谷)
- QueryWrapper<SystemVariable> queryWrapper = new QueryWrapper<>();
- queryWrapper.eq("variable_address", "sys_run_current_limit");
- queryWrapper.eq("status", 0);
- SystemVariable systemVariable = systemVariableService.getOne(queryWrapper);
- // 生成PointCode集合
- List<String> pointCodeList = devicePoints.stream().map(DevicePoint::getPointCode).collect(Collectors.toList());
- List<String> pointIdList = devicePoints.stream().map(DevicePoint::getId).collect(Collectors.toList());
- // pointCodeList转换String
- String pointCodeListStr = String.join("','", pointCodeList);
- String pointIdListStrs = String.join(",", pointIdList);
- // 日期转字符串
- String formattedTime = DateUtils.date2Str(latestCreateTime1, DateUtils.datetimeFormat.get());
- String sql = String.format("SELECT * FROM ts_value WHERE tagname IN ('%s') AND timestamp > '%s' ORDER BY timestamp ASC", pointCodeListStr, formattedTime);
- statement = connection.prepareStatement(sql);
- resultSet = statement.executeQuery();
- // 遍历查询到的实时数据
- while (resultSet.next()) {
- DevicePoint point = devicePointMapper.selectOne(
- new LambdaQueryWrapper<DevicePoint>()
- .eq(DevicePoint::getPointCode, resultSet.getString("tagname") )
- );
- log.info(resultSet.getString("tagname") + "////////" + resultSet.getString("timestamp") + "////////" + resultSet.getString("value"));
- Date timestamp = DateUtils.str2Date(resultSet.getString("timestamp"), DateUtils.datetimeFormat.get());
- BigDecimal value = new BigDecimal(resultSet.getString("value"));
- // 检查FpgGatherData 数据库中是否存在相同 DeviceInformationId 和 CreateTime 的记录
- FpgGatherData fpgGatherData = fpgGatherDataMapper.selectOne(
- new LambdaQueryWrapper<FpgGatherData>()
- .eq(FpgGatherData::getDeviceInformationId, point.getDeviceId())
- .eq(FpgGatherData::getCreateTime, timestamp)
- );
- if (fpgGatherData == null) { // 直接新增
- FpgGatherData fpgGatherInsert = new FpgGatherData();
- fpgGatherInsert.setCreateTime(timestamp);
- // 处理电流+电压
- if ("Ia".equals(point.getCurrentPower())) {
- fpgGatherInsert.setRunCurrent(value);
- } else if ("P".equals(point.getCurrentPower())) {
- fpgGatherInsert.setActivePower(value);
- }
- fpgGatherInsert.setDevicePointId(pointIdListStrs);
- fpgGatherInsert.setDeviceInformationId(point.getDeviceId());
- fpgGatherInsert.setDeviceRegionId(device.getDeviceRegionId());
- log.info("新增----数据处理---");
- log.info(JSON.toJSONString(fpgGatherInsert));
- fpgGatherDataMapper.insert(fpgGatherInsert);
- } else { // 编辑处理
- // 处理数据点类型和值的累计
- if ("Ia".equals(point.getCurrentPower())) {
- BigDecimal runCurrent = fpgGatherData.getRunCurrent() == null ? value : fpgGatherData.getRunCurrent().add(value);
- fpgGatherData.setRunCurrent(runCurrent);
- } else if ("P".equals(point.getCurrentPower())) {
- BigDecimal activePower = fpgGatherData.getActivePower() == null ? value : fpgGatherData.getActivePower().add(value);
- fpgGatherData.setActivePower(activePower);
- }
- fpgGatherData.setId(fpgGatherData.getId());
- fpgGatherDataMapper.updateById(fpgGatherData);
- }
- // 如果 systemVariable 为空,则不执行后续操作 更新设备状态
- if ("Ia".equals(point.getCurrentPower()) && systemVariable == null) {
- log.info("{}{}", "fpg_close系统变量未配置正常状态的运行电流限制!", JSON.toJSON(fpgGatherData));
- } else if("Ia".equals(point.getCurrentPower())) {
- DeviceInformation deviceInformation = deviceInformationMapper.selectOne(
- new LambdaQueryWrapper<DeviceInformation>()
- .eq(DeviceInformation::getId, point.getDeviceId())
- );
- // 比较大小
- int result = compareStringWithBigDecimal(systemVariable.getDefaultValue(), value);
- if (result > 0) {
- deviceInformation.setStatus("1");
- } else if (result < 0) {
- deviceInformation.setStatus("0");
- }
- // 只有在 systemVariable 不为空时才进行更新操作
- deviceInformationMapper.updateById(deviceInformation);
- }
- }
- // 关闭连接
- resultSet.close();
- statement.close();
- }
- } catch (Exception e) {
- // 关闭连接
- log.info(e.getMessage());
- } finally {
- PostgreSQLUtil.close(connection);
- }
- }
- public static int compareStringWithBigDecimal(String str, BigDecimal bd) {
- // 将 String 转换为 BigDecimal
- BigDecimal strAsBigDecimal = new BigDecimal(str);
- // 使用 BigDecimal 的 compareTo 方法比较
- return strAsBigDecimal.compareTo(bd);
- }
- // public static void main(String[] args) {
- // Connection connection = null;
- // PreparedStatement preparedStatement = null;
- // ResultSet resultSet = null;
- // try {
- // int i = 0;
- // connection = PostgreSQLUtil.getConnection();
- // String sql = "SELECT * FROM ts_value WHERE tagname = 'YC.10330' AND timestamp > '2024-12-03 11:20:01' ORDER BY timestamp ASC";
- // preparedStatement = connection.prepareStatement(sql);
- // resultSet = preparedStatement.executeQuery();
- // while (resultSet.next()) {
- // i++;
- // log.info("{}{}", "查询的数据结果:", "时间:" + resultSet.getString("timestamp") + "电流值" + resultSet.getString("value"));
- // }
- // log.info("{}{}",">>>>>>>>>>>>>>>>>>查询PG总条数:", i);
- // } catch (SQLException e) {
- // log.info("{}",">>>>>>>>>>>>>>>>>>查询PG总条数:");
- // } finally {
- //
- // }
- // }
- }
|