MysqlWatch.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package org.jeecg.modules.watch;
  import com.alibaba.fastjson.JSON;
  import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  import lombok.extern.slf4j.Slf4j;
  import org.jeecg.modules.device.entity.DeviceInformation;
  import org.jeecg.modules.device.mapper.DeviceInformationMapper;
  import org.jeecg.modules.device.mapper.DeviceRegionMapper;
  import org.jeecg.modules.devicePoint.entity.DevicePoint;
  import org.jeecg.modules.devicePoint.mapper.DevicePointMapper;
  import org.jeecg.modules.gatherData.entity.FpgGatherData;
  import org.jeecg.modules.gatherData.entity.FpgGatherDataMongo;
  import org.jeecg.modules.gatherData.mapper.FpgGatherDataMapper;
  import org.jeecg.modules.peaksAndValleysTimeConfig.entity.SystemVariable;
  import org.jeecg.modules.peaksAndValleysTimeConfig.service.ISystemVariableService;
  import org.jeecg.modules.utils.ConnectionUtils;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.data.mongodb.core.MongoTemplate;
  import org.springframework.data.mongodb.core.query.Criteria;
  import org.springframework.data.mongodb.core.query.Query;
  import org.springframework.scheduling.annotation.EnableAsync;
  import org.springframework.scheduling.annotation.Scheduled;
  import org.springframework.stereotype.Component;
  import java.math.BigDecimal;
  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.sql.ResultSet;
  import java.sql.Statement;
  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  33. @Slf4j
  34. @EnableAsync
  35. @Component
  36. public class MysqlWatch {
  37. @Autowired
  38. DeviceRegionMapper deviceRegionMapper;
  39. @Autowired
  40. DeviceInformationMapper deviceInformationMapper;
  41. @Autowired
  42. DevicePointMapper devicePointMapper;
  43. @Autowired
  44. FpgGatherDataMapper fpgGatherDataMapper;
  45. @Autowired
  46. ISystemVariableService systemVariableService;
  47. @Autowired
  48. MongoTemplate mongoTemplate;
  49. @Scheduled(cron = "0 0/1 * * * ?")
  50. public void getTsData() {
  51. Connection connection = null;
  52. Statement statement = null;
  53. ResultSet resultSet = null;
  54. try {
  55. connection = ConnectionUtils.getConnection();
  56. // 用于存放合并后的 FpgGatherData,键为 "DeviceInformationId-CreateTime"
  57. Map<String, FpgGatherData> mergedDataMap = new HashMap<>();
  58. LambdaQueryWrapper<DeviceInformation> informationLambdaQueryWrapper = new LambdaQueryWrapper<>();
  59. List<DeviceInformation> devices = deviceInformationMapper.selectList(informationLambdaQueryWrapper);
  60. // 遍历所有设备
  61. for (DeviceInformation device : devices) {
  62. LambdaQueryWrapper<DevicePoint> devicePointLambdaQueryWrapper = new LambdaQueryWrapper<>();
  63. devicePointLambdaQueryWrapper.eq(DevicePoint::getDeviceId, device.getId());
  64. List<DevicePoint> devicePoints = devicePointMapper.selectList(devicePointLambdaQueryWrapper);
  65. // 查询 FpgGatherData 表中 createTime 最新的一条记录
  66. LambdaQueryWrapper<FpgGatherData> fpgGatherDataLambdaQueryWrapper = new LambdaQueryWrapper<>();
  67. fpgGatherDataLambdaQueryWrapper
  68. .orderByDesc(FpgGatherData::getCreateTime) // 按 createTime 降序排序
  69. .last("LIMIT 1"); // 只取一条记录
  70. // 执行查询
  71. FpgGatherData latestData = fpgGatherDataMapper.selectOne(fpgGatherDataLambdaQueryWrapper);
  72. // 获取最新记录的 createTime
  73. Date latestCreateTime1 = null;
  74. if (latestData != null) {
  75. latestCreateTime1 = latestData.getCreateTime();
  76. log.info("最新的 createTime: " + latestCreateTime1);
  77. } else {
  78. // 如果没有找到记录,给一个默认时间
  79. latestCreateTime1 = new Date(0); // 默认时间设置为 1970-01-01 00:00:00
  80. log.info("没有找到任何记录,使用默认时间: " + latestCreateTime1);
  81. }
  82. // 遍历设备点
  83. for (DevicePoint point : devicePoints) {
  84. // 假设 latestCreateTime 是从 FpgGatherData 中获取的最新 createTime
  85. Date latestCreateTime = latestCreateTime1;
  86. // 格式化 latestCreateTime 为 "yyyy-MM-dd HH:mm:ss" 格式的字符串
  87. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  88. String formattedTime = dateFormat.format(latestCreateTime);
  89. String sql = "SELECT * FROM ts_value WHERE tagname='" + point.getPointCode() +
  90. "' AND timestamp > '" + formattedTime + "' ORDER BY timestamp DESC";
  91. statement = connection.createStatement();
  92. resultSet = statement.executeQuery(sql);
  93. // 遍历查询到的实时数据
  94. while (resultSet.next()) {
  95. Date timestamp = dateFormat.parse(resultSet.getString("timestamp"));
  96. BigDecimal value = new BigDecimal(resultSet.getString("value"));
  97. // 生成分组键
  98. String groupKey = device.getId() + "-" + dateFormat.format(timestamp);
  99. // 获取或创建 FpgGatherData
  100. FpgGatherData fpgGatherData = mergedDataMap.getOrDefault(groupKey, new FpgGatherData());
  101. fpgGatherData.setDeviceInformationId(device.getId());
  102. fpgGatherData.setDeviceRegionId(device.getDeviceRegionId());
  103. fpgGatherData.setCreateTime(timestamp);
  104. fpgGatherData.setUpdateTime(timestamp);
  105. // 处理 DevicePointId 的拼接
  106. String devicePointIdValue = fpgGatherData.getDevicePointId();
  107. String pointIdStr = point.getId().toString();
  108. if ("Ia".equals(point.getCurrentPower())) {
  109. // "Ia" 的 pointId 放在最前
  110. devicePointIdValue = (devicePointIdValue == null || devicePointIdValue.isEmpty())
  111. ? pointIdStr
  112. : pointIdStr + (devicePointIdValue.contains(pointIdStr) ? "" : "," + devicePointIdValue);
  113. } else if ("P".equals(point.getCurrentPower())) {
  114. // "P" 的 pointId 放在最后
  115. devicePointIdValue = (devicePointIdValue == null || devicePointIdValue.isEmpty())
  116. ? pointIdStr
  117. : (devicePointIdValue.contains(pointIdStr) ? devicePointIdValue : devicePointIdValue + "," + pointIdStr);
  118. }
  119. fpgGatherData.setDevicePointId(devicePointIdValue);
  120. // 处理数据点类型和值的累计
  121. if ("Ia".equals(point.getCurrentPower())) {
  122. fpgGatherData.setRunCurrent(
  123. (fpgGatherData.getRunCurrent() == null ? BigDecimal.ZERO : fpgGatherData.getRunCurrent()).add(value)
  124. );
  125. } else if ("P".equals(point.getCurrentPower())) {
  126. fpgGatherData.setActivePower(
  127. (fpgGatherData.getActivePower() == null ? BigDecimal.ZERO : fpgGatherData.getActivePower()).add(value)
  128. );
  129. }
  130. // 更新到 Map
  131. mergedDataMap.put(groupKey, fpgGatherData);
  132. }
  133. }
  134. }
  135. // 保存合并数据到数据库
  136. for (FpgGatherData data : mergedDataMap.values()) {
  137. // 检查数据库中是否存在相同 DeviceInformationId 和 CreateTime 的记录
  138. FpgGatherData existingData = fpgGatherDataMapper.selectOne(
  139. new LambdaQueryWrapper<FpgGatherData>()
  140. .eq(FpgGatherData::getDeviceInformationId, data.getDeviceInformationId())
  141. .eq(FpgGatherData::getCreateTime, data.getCreateTime())
  142. );
  143. // 根据采集到的数据 创建时间,跟峰平谷配置的时间,判断属于哪个时段(尖、峰、平、谷)
  144. QueryWrapper<SystemVariable> queryWrapper = new QueryWrapper<>();
  145. queryWrapper.eq("variable_address", "sys_run_current_limit");
  146. queryWrapper.eq("status", 0);
  147. SystemVariable systemVariable = systemVariableService.getOne(queryWrapper);
  148. // 如果 systemVariable 为空,则不执行后续操作
  149. if (systemVariable == null) {
  150. log.info("{}{}", "fpg_close系统变量未配置正常状态的运行电流限制!", JSON.toJSON(data));
  151. } else {
  152. DeviceInformation deviceInformation = deviceInformationMapper.selectOne(
  153. new LambdaQueryWrapper<DeviceInformation>()
  154. .eq(DeviceInformation::getId, data.getDeviceInformationId())
  155. );
  156. // 比较大小
  157. int result = compareStringWithBigDecimal(systemVariable.getDefaultValue(), data.getRunCurrent());
  158. if (result > 0) {
  159. deviceInformation.setStatus("1");
  160. } else if (result < 0) {
  161. deviceInformation.setStatus("0");
  162. }
  163. // 只有在 systemVariable 不为空时才进行更新操作
  164. deviceInformationMapper.updateById(deviceInformation);
  165. }
  166. // 如果不存在则插入
  167. if (existingData == null) {
  168. fpgGatherDataMapper.insert(data);
  169. }
  170. // 转为 MongoDB 对象并同步
  171. FpgGatherDataMongo mongoData = new FpgGatherDataMongo();
  172. mongoData.setDeviceInformationId(data.getDeviceInformationId());
  173. mongoData.setDeviceRegionId(data.getDeviceRegionId());
  174. mongoData.setCreateTime(data.getCreateTime());
  175. mongoData.setUpdateTime(data.getUpdateTime());
  176. mongoData.setRunCurrent(data.getRunCurrent());
  177. mongoData.setActivePower(data.getActivePower());
  178. Query query = new Query();
  179. query.addCriteria(Criteria.where("deviceInformationId").is(data.getDeviceInformationId()))
  180. .addCriteria(Criteria.where("createTime").is(data.getCreateTime()));
  181. // 如果 MongoDB 中不存在记录则插入
  182. FpgGatherDataMongo existingMongoData = mongoTemplate.findOne(query, FpgGatherDataMongo.class, "fpgGatherData");
  183. if (existingMongoData == null) {
  184. mongoTemplate.save(mongoData, "fpgGatherData");
  185. }
  186. }
  187. } catch (Exception e) {
  188. throw new RuntimeException(e);
  189. } finally {
  190. ConnectionUtils.closeConnection(connection);
  191. ConnectionUtils.closeStatement(statement);
  192. ConnectionUtils.closeResultSet(resultSet);
  193. }
  194. }
  195. public static int compareStringWithBigDecimal(String str, BigDecimal bd) {
  196. // 将 String 转换为 BigDecimal
  197. BigDecimal strAsBigDecimal = new BigDecimal(str);
  198. // 使用 BigDecimal 的 compareTo 方法比较
  199. return strAsBigDecimal.compareTo(bd);
  200. }
  201. }