PostgreSQLWatch.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package org.jeecg.modules.watch;
  2. import com.alibaba.fastjson.JSON;
  3. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  4. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.jeecg.common.util.DateUtils;
  7. import org.jeecg.modules.device.entity.DeviceInformation;
  8. import org.jeecg.modules.device.mapper.DeviceInformationMapper;
  9. import org.jeecg.modules.device.mapper.DeviceRegionMapper;
  10. import org.jeecg.modules.devicePoint.entity.DevicePoint;
  11. import org.jeecg.modules.devicePoint.mapper.DevicePointMapper;
  12. import org.jeecg.modules.gatherData.entity.FpgGatherData;
  13. import org.jeecg.modules.gatherData.mapper.FpgGatherDataMapper;
  14. import org.jeecg.modules.peaksAndValleysTimeConfig.entity.SystemVariable;
  15. import org.jeecg.modules.peaksAndValleysTimeConfig.service.ISystemVariableService;
  16. import org.jeecg.modules.utils.PostgreSQLUtil;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.data.mongodb.core.MongoTemplate;
  19. import org.springframework.scheduling.annotation.EnableAsync;
  20. import org.springframework.scheduling.annotation.Scheduled;
  21. import org.springframework.stereotype.Component;
  22. import java.math.BigDecimal;
  23. import java.sql.*;
  24. import java.util.Date;
  25. import java.util.List;
  26. import java.util.stream.Collectors;
  27. @Slf4j
  28. @EnableAsync
  29. @Component
  30. public class PostgreSQLWatch {
  31. @Autowired
  32. DeviceRegionMapper deviceRegionMapper;
  33. @Autowired
  34. DeviceInformationMapper deviceInformationMapper;
  35. @Autowired
  36. DevicePointMapper devicePointMapper;
  37. @Autowired
  38. FpgGatherDataMapper fpgGatherDataMapper;
  39. @Autowired
  40. ISystemVariableService systemVariableService;
  41. @Autowired
  42. MongoTemplate mongoTemplate;
  43. @Scheduled(cron = "0 0/4 * * * *")
  44. public void getTsData() {
  45. Connection connection = null;
  46. PreparedStatement statement = null;
  47. ResultSet resultSet = null;
  48. log.info("抓取采集数据");
  49. try {
  50. // 建立链接
  51. connection = PostgreSQLUtil.getConnection();;
  52. boolean isClosed = connection.isClosed();
  53. LambdaQueryWrapper<DeviceInformation> informationLambdaQueryWrapper = new LambdaQueryWrapper<>();
  54. List<DeviceInformation> devices = deviceInformationMapper.selectList(informationLambdaQueryWrapper);
  55. // 遍历所有设备
  56. for (DeviceInformation device : devices) {
  57. LambdaQueryWrapper<DevicePoint> devicePointLambdaQueryWrapper = new LambdaQueryWrapper<>();
  58. devicePointLambdaQueryWrapper.eq(DevicePoint::getDeviceId, device.getId());
  59. List<DevicePoint> devicePoints = devicePointMapper.selectList(devicePointLambdaQueryWrapper);
  60. // 查询 FpgGatherData 表中 该设备下的createTime 最新的一条记录
  61. LambdaQueryWrapper<FpgGatherData> fpgGatherDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
  62. fpgGatherDataLambdaQueryWrapper.eq(FpgGatherData::getDeviceInformationId, device.getId());
  63. fpgGatherDataLambdaQueryWrapper
  64. .orderByDesc(FpgGatherData::getCreateTime) // 按 createTime 降序排序
  65. .last("LIMIT 1"); // 只取一条记录
  66. // 执行查询
  67. FpgGatherData latestData = fpgGatherDataMapper.selectOne(fpgGatherDataLambdaQueryWrapper);
  68. // 获取最新记录的 createTime
  69. Date latestCreateTime1 = null;
  70. if (latestData != null) {
  71. latestCreateTime1 = latestData.getCreateTime();
  72. } else {
  73. // 如果没有找到记录,给一个默认时间
  74. latestCreateTime1 = new Date(0); // 默认时间设置为 1970-01-01 00:00:00
  75. }
  76. // 根据采集到的数据 创建时间,跟峰平谷配置的时间,判断属于哪个时段(尖、峰、平、谷)
  77. QueryWrapper<SystemVariable> queryWrapper = new QueryWrapper<>();
  78. queryWrapper.eq("variable_address", "sys_run_current_limit");
  79. queryWrapper.eq("status", 0);
  80. SystemVariable systemVariable = systemVariableService.getOne(queryWrapper);
  81. // 生成PointCode集合
  82. List<String> pointCodeList = devicePoints.stream().map(DevicePoint::getPointCode).collect(Collectors.toList());
  83. List<String> pointIdList = devicePoints.stream().map(DevicePoint::getId).collect(Collectors.toList());
  84. // pointCodeList转换String
  85. String pointCodeListStr = String.join("','", pointCodeList);
  86. String pointIdListStrs = String.join(",", pointIdList);
  87. // 日期转字符串
  88. String formattedTime = DateUtils.date2Str(latestCreateTime1, DateUtils.datetimeFormat.get());
  89. String sql = String.format("SELECT * FROM ts_value WHERE tagname IN ('%s') AND timestamp > '%s' ORDER BY timestamp ASC", pointCodeListStr, formattedTime);
  90. statement = connection.prepareStatement(sql);
  91. resultSet = statement.executeQuery();
  92. // 遍历查询到的实时数据
  93. while (resultSet.next()) {
  94. DevicePoint point = devicePointMapper.selectOne(
  95. new LambdaQueryWrapper<DevicePoint>()
  96. .eq(DevicePoint::getPointCode, resultSet.getString("tagname") )
  97. );
  98. log.info(resultSet.getString("tagname") + "////////" + resultSet.getString("timestamp") + "////////" + resultSet.getString("value"));
  99. Date timestamp = DateUtils.str2Date(resultSet.getString("timestamp"), DateUtils.datetimeFormat.get());
  100. BigDecimal value = new BigDecimal(resultSet.getString("value"));
  101. // 检查FpgGatherData 数据库中是否存在相同 DeviceInformationId 和 CreateTime 的记录
  102. FpgGatherData fpgGatherData = fpgGatherDataMapper.selectOne(
  103. new LambdaQueryWrapper<FpgGatherData>()
  104. .eq(FpgGatherData::getDeviceInformationId, point.getDeviceId())
  105. .eq(FpgGatherData::getCreateTime, timestamp)
  106. );
  107. if (fpgGatherData == null) { // 直接新增
  108. FpgGatherData fpgGatherInsert = new FpgGatherData();
  109. fpgGatherInsert.setCreateTime(timestamp);
  110. // 处理电流+电压
  111. if ("Ia".equals(point.getCurrentPower())) {
  112. fpgGatherInsert.setRunCurrent(value);
  113. } else if ("P".equals(point.getCurrentPower())) {
  114. fpgGatherInsert.setActivePower(value);
  115. }
  116. fpgGatherInsert.setDevicePointId(pointIdListStrs);
  117. fpgGatherInsert.setDeviceInformationId(point.getDeviceId());
  118. fpgGatherInsert.setDeviceRegionId(device.getDeviceRegionId());
  119. log.info("新增----数据处理---");
  120. log.info(JSON.toJSONString(fpgGatherInsert));
  121. fpgGatherDataMapper.insert(fpgGatherInsert);
  122. } else { // 编辑处理
  123. // 处理数据点类型和值的累计
  124. if ("Ia".equals(point.getCurrentPower())) {
  125. BigDecimal runCurrent = fpgGatherData.getRunCurrent() == null ? value : fpgGatherData.getRunCurrent().add(value);
  126. fpgGatherData.setRunCurrent(runCurrent);
  127. } else if ("P".equals(point.getCurrentPower())) {
  128. BigDecimal activePower = fpgGatherData.getActivePower() == null ? value : fpgGatherData.getActivePower().add(value);
  129. fpgGatherData.setActivePower(activePower);
  130. }
  131. fpgGatherData.setId(fpgGatherData.getId());
  132. fpgGatherDataMapper.updateById(fpgGatherData);
  133. }
  134. // 如果 systemVariable 为空,则不执行后续操作 更新设备状态
  135. if ("Ia".equals(point.getCurrentPower()) && systemVariable == null) {
  136. log.info("{}{}", "fpg_close系统变量未配置正常状态的运行电流限制!", JSON.toJSON(fpgGatherData));
  137. } else if("Ia".equals(point.getCurrentPower())) {
  138. DeviceInformation deviceInformation = deviceInformationMapper.selectOne(
  139. new LambdaQueryWrapper<DeviceInformation>()
  140. .eq(DeviceInformation::getId, point.getDeviceId())
  141. );
  142. // 比较大小
  143. int result = compareStringWithBigDecimal(systemVariable.getDefaultValue(), value);
  144. if (result > 0) {
  145. deviceInformation.setStatus("1");
  146. } else if (result < 0) {
  147. deviceInformation.setStatus("0");
  148. }
  149. // 只有在 systemVariable 不为空时才进行更新操作
  150. deviceInformationMapper.updateById(deviceInformation);
  151. }
  152. }
  153. // 关闭连接
  154. resultSet.close();
  155. statement.close();
  156. }
  157. } catch (Exception e) {
  158. // 关闭连接
  159. log.info(e.getMessage());
  160. } finally {
  161. PostgreSQLUtil.close(connection);
  162. }
  163. }
  164. public static int compareStringWithBigDecimal(String str, BigDecimal bd) {
  165. // 将 String 转换为 BigDecimal
  166. BigDecimal strAsBigDecimal = new BigDecimal(str);
  167. // 使用 BigDecimal 的 compareTo 方法比较
  168. return strAsBigDecimal.compareTo(bd);
  169. }
  170. // public static void main(String[] args) {
  171. // Connection connection = null;
  172. // PreparedStatement preparedStatement = null;
  173. // ResultSet resultSet = null;
  174. // try {
  175. // int i = 0;
  176. // connection = PostgreSQLUtil.getConnection();
  177. // String sql = "SELECT * FROM ts_value WHERE tagname = 'YC.10330' AND timestamp > '2024-12-03 11:20:01' ORDER BY timestamp ASC";
  178. // preparedStatement = connection.prepareStatement(sql);
  179. // resultSet = preparedStatement.executeQuery();
  180. // while (resultSet.next()) {
  181. // i++;
  182. // log.info("{}{}", "查询的数据结果:", "时间:" + resultSet.getString("timestamp") + "电流值" + resultSet.getString("value"));
  183. // }
  184. // log.info("{}{}",">>>>>>>>>>>>>>>>>>查询PG总条数:", i);
  185. // } catch (SQLException e) {
  186. // log.info("{}",">>>>>>>>>>>>>>>>>>查询PG总条数:");
  187. // } finally {
  188. //
  189. // }
  190. // }
  191. }