steel_fit.py 11 KB


  1. from utils.statepoint import Statepoint
  2. from models.data_sender import Sender
  3. import numpy as np
  4. from scipy import interpolate, integrate
  5. from collections import deque
  6. from models.cip_data import CIPData
  7. from utils.s7data import S7data
  8. import datetime, logging, time, threading, queue
  9. def interp1d(x, y, logger: logging.Logger):
  10. if x[-1] < datetime.datetime.now().timestamp() - 300:
  11. logger.error("连续5分钟未采集到变化数值,表示为-1")
  12. return lambda data: -1
  13. if len(x) < 4:
  14. logger.warning("采集到的数量低于4个,表示为平均值")
  15. avg = np.mean(y)
  16. return lambda data: avg
  17. try:
  18. return interpolate.interp1d(x, y, kind='cubic')
  19. except Exception as e:
  20. logger.error(f"三次样条插值失败:{e}")
  21. return lambda data: -1
  22. class BufferPoint(Statepoint):
  23. def __init__(self, initvalue = None, initstate = False, maxlen: int | None = 3000):
  24. super().__init__(deque(maxlen = maxlen), initstate)
  25. def inject(self, data):
  26. self.data.append((data, datetime.datetime.now().timestamp()))
  27. def get_buffer(self):
  28. res = self.data.copy()
  29. last = res[-1][0]
  30. res.append((last, datetime.datetime.now().timestamp() + 0.001))
  31. return res
  32. class billet_data_gatherer:
  33. def __init__(self, dspeed_point: BufferPoint, cutting_sig_point: Statepoint, sizing_point: Statepoint, flow_rate_point_list: list[BufferPoint],
  34. logger: logging.Logger, strand_no: int, result_queue: queue.Queue):
  35. self.dspeed_point = dspeed_point
  36. self.cutting_sig_point = cutting_sig_point
  37. self.sizing_point = sizing_point
  38. self.flow_rate_point_list = flow_rate_point_list
  39. self.logger = logger
  40. self.strand_no = strand_no
  41. self.result_queue = result_queue
  42. self.MOLD_TO_CUTTER_DISTANCE = 28
  43. self.CRITICAL_ZONE_LENGTH = 12
  44. self.cutting_sig_point.set_excite_action(self.cutting_action)
  45. def cutting_action(self):
  46. cutting_time = datetime.datetime.now().timestamp()
  47. sizing = self.sizing_point.data / 1000
  48. self.logger.debug(f"{self.strand_no}流开始切割")
  49. time.sleep(30)
  50. dspeed_buffer: deque = self.dspeed_point.get_buffer()
  51. flow_rate_buffer_list: list[deque] = [self.flow_rate_point_list[i].get_buffer() for i in range(5)]
  52. if len(dspeed_buffer) < 10:
  53. self.logger.debug(f"{self.strand_no}流已统计数据量不足,无法计算")
  54. return
  55. data_tuple, time_tuple = zip(*dspeed_buffer)
  56. x = np.array(time_tuple)
  57. y = np.array(data_tuple) / 60
  58. vt_func = interp1d(x, y, self.logger)
  59. entry_time = self._binary_search_start(vt_func, cutting_time, sizing + self.MOLD_TO_CUTTER_DISTANCE)
  60. if entry_time == None:
  61. self.logger.debug(f"{self.strand_no}流已统计数据量不足,无法计算")
  62. return
  63. exit_time = self._binary_search_end(vt_func, entry_time, sizing + self.CRITICAL_ZONE_LENGTH)
  64. dspeed_avg = (sizing + self.CRITICAL_ZONE_LENGTH) / (exit_time - entry_time) * 60
  65. self.create_data(cutting_time, entry_time, exit_time, self.flow_rate_total(flow_rate_buffer_list, entry_time, exit_time), dspeed_avg)
  66. def _binary_search_start(self, func, upper_limit, target):
  67. left = func.x.min()
  68. right = func.x.max()
  69. if self._get_distance(func, left, upper_limit) < target:
  70. return
  71. while abs(right - left) > 0.01:
  72. mid = (left + right) / 2
  73. if self._get_distance(func, mid, upper_limit) >= target:
  74. left = mid
  75. else:
  76. right = mid
  77. return (left + right) / 2
  78. def _binary_search_end(self, func, lower_limit, target):
  79. left = func.x.min()
  80. right = func.x.max()
  81. while abs(right - left) > 0.01:
  82. mid = (left + right) / 2
  83. if self._get_distance(func, lower_limit, mid) >= target:
  84. right = mid
  85. else:
  86. left = mid
  87. return (left + right) / 2
  88. def _get_distance(self, vt_func, lower, upper):
  89. return integrate.quad(vt_func, lower, upper)[0]
  90. def flow_rate_total(self, deque_list: list[deque], start_time, end_time):
  91. res = []
  92. for dequei in deque_list:
  93. data_tuple, time_tuple = zip(*dequei)
  94. x = np.array(time_tuple)
  95. y = np.array(data_tuple) / 3600
  96. vt_func = interp1d(x, y, self.logger)
  97. total = integrate.quad(vt_func, start_time, end_time)[0]
  98. res.append(total)
  99. return sum(res)
  100. def create_data(self, cutting_time, entry_time, exit_time, water_total, dspeed_avg):
  101. self.logger.debug(f"{self.strand_no}流钢坯计算结果:")
  102. self.logger.debug(f'\t切割时间:{datetime.datetime.fromtimestamp(cutting_time).strftime("%Y-%m-%d %H:%M:%S")}')
  103. self.logger.debug(f'\t进入时间:{datetime.datetime.fromtimestamp(entry_time).strftime("%Y-%m-%d %H:%M:%S")}')
  104. self.logger.debug(f'\t离开时间:{datetime.datetime.fromtimestamp(exit_time).strftime("%Y-%m-%d %H:%M:%S")}')
  105. self.logger.debug(f'\t水量总计:{water_total}')
  106. self.logger.debug(f'\t平均拉速:{dspeed_avg}')
  107. self.result_queue.put((self.strand_no, float(cutting_time), float(entry_time), float(exit_time), float(water_total), float(dspeed_avg)))
  108. class SteelFit:
  109. def __init__(self, s7_data_20: S7data, s7_data_215: S7data, cip_data: CIPData, sender: Sender, logger: logging.Logger):
  110. self.water_temperature_buffer: BufferPoint = cip_data.make_point("5#二冷水总管温度", BufferPoint)
  111. self.water_pressure_buffer: BufferPoint = cip_data.make_point("5#二冷水总管压力", BufferPoint)
  112. self.steel_temperature_buffer: BufferPoint = s7_data_20.make_point("中间包连续测温温度", BufferPoint)
  113. self.water_temperature_difference_buffer: BufferPoint = cip_data.make_point("5#结晶器水温差", BufferPoint)
  114. self.dspeed_buffer = [s7_data_20.make_point(f"{i}流结晶器拉速", BufferPoint) for i in range(1, 9)]
  115. self.cutting_sig_point = [s7_data_215.make_point(f"L{i}切割信号[0]") for i in range(1, 9)]
  116. self.sizing_point = [s7_data_20.make_point(f"{i}流定尺") for i in range(1, 9)]
  117. self.flow_rate_point_list = [[cip_data.make_point(f"5#水流量-{i}流-{j}段", BufferPoint) for j in range(1, 6)] for i in range(1, 9)]
  118. self.sender = sender
  119. self.logger = logger
  120. self.task_queue = queue.Queue()
  121. self.billet_data_gatherer_list = [
  122. billet_data_gatherer(
  123. self.dspeed_buffer[i],
  124. self.cutting_sig_point[i],
  125. self.sizing_point[i],
  126. self.flow_rate_point_list[i],
  127. logger,
  128. i + 1,
  129. self.task_queue
  130. )
  131. for i in range(8)
  132. ]
  133. self.thread_run = True
  134. self.thread = threading.Thread(target=self.loop_process)
  135. self.thread.start()
  136. def loop_process(self):
  137. while self.thread_run:
  138. try:
  139. task_tuple = self.task_queue.get(True, 1)
  140. tmp_dict = {}
  141. tmp_dict["strand_no"] = task_tuple[0]
  142. tmp_dict["cutting_time"] = datetime.datetime.fromtimestamp(task_tuple[1])
  143. tmp_dict["entry_time"] = datetime.datetime.fromtimestamp(task_tuple[2])
  144. tmp_dict["exit_time"] = datetime.datetime.fromtimestamp(task_tuple[3])
  145. cal_res = self.cal_data(task_tuple[2], task_tuple[3])
  146. tmp_dict["water_temperature"] = cal_res[0]
  147. tmp_dict["water_pressure"] = cal_res[1]
  148. tmp_dict["water_volume"] = task_tuple[4]
  149. tmp_dict["water_pressure_sd"] = cal_res[2]
  150. tmp_dict["steel_temperature"] = cal_res[3]
  151. tmp_dict["drawing_speed"] = task_tuple[5]
  152. tmp_dict["water_temperature_difference"] = cal_res[4]
  153. self.sender.upload_billet(tmp_dict)
  154. except queue.Empty:
  155. pass
  156. except Exception as e:
  157. self.logger.error(f"铸机数据计算过程中出现意外:{e}")
  158. def cal_data(self, entry_time, exit_time):
  159. wt = self.interval_avg(self.water_temperature_buffer.get_buffer(), entry_time, exit_time)
  160. wp = self.interval_avg(self.water_pressure_buffer.get_buffer(), entry_time, exit_time)
  161. wps = self.interval_sd(self.water_pressure_buffer.get_buffer(), entry_time, exit_time)
  162. st = self.interval_avg(self.steel_temperature_buffer.get_buffer(), entry_time, exit_time)
  163. wtd = self.interval_avg(self.water_temperature_difference_buffer.get_buffer(), entry_time, exit_time)
  164. return (float(wt), float(wp), float(wps), float(st), float(wtd))
  165. def interval_avg(self, buffer, left, right):
  166. data_tuple, time_tuple = zip(*buffer)
  167. x = np.array(time_tuple)
  168. y = np.array(data_tuple)
  169. func = interp1d(x, y, self.logger)
  170. inte = integrate.quad(func, left, right)[0]
  171. return inte / (right - left)
  172. def interval_sd(self, buffer, left, right):
  173. data_tuple, time_tuple = zip(*buffer)
  174. x = np.array(time_tuple)
  175. y = np.array(data_tuple)
  176. func = interp1d(x, y, self.logger)
  177. inte = integrate.quad(func, left, right)[0]
  178. avg = inte / (right - left)
  179. func2 = lambda x: (func(x) - avg) ** 2
  180. inte2 = integrate.quad(func2, left, right)[0]
  181. avg2 = inte2 / (right - left)
  182. return avg2 ** 0.5
  183. if __name__ == "__main__":
  184. from utils.s7data import S7Client, S7data
  185. from utils.logger import Logger
  186. # 配置S7连接
  187. s7_1 = S7Client()
  188. s7_1.connect("172.16.1.20", 0, 0)
  189. data_1 = S7data("conf/s7@172.16.1.20.csv")
  190. data_1.set_S7Client(s7_1)
  191. data_1.auto_update_group()
  192. s7_2 = S7Client()
  193. s7_2.connect("172.16.1.21", 0, 0)
  194. data_2 = S7data("conf/s7@172.16.1.21.csv")
  195. data_2.set_S7Client(s7_2)
  196. data_2.auto_update_group()
  197. s7_3 = S7Client()
  198. s7_3.connect("192.168.1.215", 0, 0)
  199. data_3 = S7data("conf/s7@192.168.1.215.csv")
  200. data_3.set_S7Client(s7_3)
  201. data_3.auto_update_group()
  202. # 配置CIP连接
  203. cip_data = CIPData("192.168.3.100")
  204. cip_data.start_update()
  205. # 配置日志模块
  206. logger = Logger('test')
  207. logger.screen_on()
  208. class C:
  209. def upload_billet(self, arg_dict: dict):
  210. pass
  211. # 钢坯拟合模块
  212. steel_fit = SteelFit(data_1, data_3, cip_data, C(), logger)