| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- from utils.statepoint import Statepoint
- from models.data_sender import Sender
- import numpy as np
- from scipy import interpolate, integrate
- from collections import deque
- from models.cip_data import CIPData
- from utils.s7data import S7data
- import datetime, logging, time, threading, queue
- def interp1d(x, y, logger: logging.Logger):
- if x[-1] < datetime.datetime.now().timestamp() - 300:
- logger.error("连续5分钟未采集到变化数值,表示为-1")
- return lambda data: -1
- if len(x) < 4:
- logger.warning("采集到的数量低于4个,表示为平均值")
- avg = np.mean(y)
- return lambda data: avg
- try:
- return interpolate.interp1d(x, y, kind='cubic')
- except Exception as e:
- logger.error(f"三次样条插值失败:{e}")
- return lambda data: -1
- class BufferPoint(Statepoint):
- def __init__(self, initvalue = None, initstate = False, maxlen: int | None = 3000):
- super().__init__(deque(maxlen = maxlen), initstate)
- def inject(self, data):
- self.data.append((data, datetime.datetime.now().timestamp()))
- def get_buffer(self):
- res = self.data.copy()
- last = res[-1][0]
- res.append((last, datetime.datetime.now().timestamp() + 0.001))
- return res
- class billet_data_gatherer:
- def __init__(self, dspeed_point: BufferPoint, cutting_sig_point: Statepoint, sizing_point: Statepoint, flow_rate_point_list: list[BufferPoint],
- logger: logging.Logger, strand_no: int, result_queue: queue.Queue):
- self.dspeed_point = dspeed_point
- self.cutting_sig_point = cutting_sig_point
- self.sizing_point = sizing_point
- self.flow_rate_point_list = flow_rate_point_list
- self.logger = logger
- self.strand_no = strand_no
- self.result_queue = result_queue
- self.MOLD_TO_CUTTER_DISTANCE = 28
- self.CRITICAL_ZONE_LENGTH = 12
- self.cutting_sig_point.set_excite_action(self.cutting_action)
-
- def cutting_action(self):
- cutting_time = datetime.datetime.now().timestamp()
- sizing = self.sizing_point.data / 1000
- self.logger.debug(f"{self.strand_no}流开始切割")
- time.sleep(30)
- dspeed_buffer: deque = self.dspeed_point.get_buffer()
- flow_rate_buffer_list: list[deque] = [self.flow_rate_point_list[i].get_buffer() for i in range(5)]
- if len(dspeed_buffer) < 10:
- self.logger.debug(f"{self.strand_no}流已统计数据量不足,无法计算")
- return
- data_tuple, time_tuple = zip(*dspeed_buffer)
- x = np.array(time_tuple)
- y = np.array(data_tuple) / 60
- vt_func = interp1d(x, y, self.logger)
- entry_time = self._binary_search_start(vt_func, cutting_time, sizing + self.MOLD_TO_CUTTER_DISTANCE)
- if entry_time == None:
- self.logger.debug(f"{self.strand_no}流已统计数据量不足,无法计算")
- return
-
- exit_time = self._binary_search_end(vt_func, entry_time, sizing + self.CRITICAL_ZONE_LENGTH)
- dspeed_avg = (sizing + self.CRITICAL_ZONE_LENGTH) / (exit_time - entry_time) * 60
- self.create_data(cutting_time, entry_time, exit_time, self.flow_rate_total(flow_rate_buffer_list, entry_time, exit_time), dspeed_avg)
-
- def _binary_search_start(self, func, upper_limit, target):
- left = func.x.min()
- right = func.x.max()
- if self._get_distance(func, left, upper_limit) < target:
- return
- while abs(right - left) > 0.01:
- mid = (left + right) / 2
- if self._get_distance(func, mid, upper_limit) >= target:
- left = mid
- else:
- right = mid
-
- return (left + right) / 2
-
- def _binary_search_end(self, func, lower_limit, target):
- left = func.x.min()
- right = func.x.max()
- while abs(right - left) > 0.01:
- mid = (left + right) / 2
- if self._get_distance(func, lower_limit, mid) >= target:
- right = mid
- else:
- left = mid
-
- return (left + right) / 2
- def _get_distance(self, vt_func, lower, upper):
- return integrate.quad(vt_func, lower, upper)[0]
-
- def flow_rate_total(self, deque_list: list[deque], start_time, end_time):
- res = []
- for dequei in deque_list:
- data_tuple, time_tuple = zip(*dequei)
- x = np.array(time_tuple)
- y = np.array(data_tuple) / 3600
- vt_func = interp1d(x, y, self.logger)
- total = integrate.quad(vt_func, start_time, end_time)[0]
- res.append(total)
- return sum(res)
-
- def create_data(self, cutting_time, entry_time, exit_time, water_total, dspeed_avg):
- self.logger.debug(f"{self.strand_no}流钢坯计算结果:")
- self.logger.debug(f'\t切割时间:{datetime.datetime.fromtimestamp(cutting_time).strftime("%Y-%m-%d %H:%M:%S")}')
- self.logger.debug(f'\t进入时间:{datetime.datetime.fromtimestamp(entry_time).strftime("%Y-%m-%d %H:%M:%S")}')
- self.logger.debug(f'\t离开时间:{datetime.datetime.fromtimestamp(exit_time).strftime("%Y-%m-%d %H:%M:%S")}')
- self.logger.debug(f'\t水量总计:{water_total}')
- self.logger.debug(f'\t平均拉速:{dspeed_avg}')
- self.result_queue.put((self.strand_no, float(cutting_time), float(entry_time), float(exit_time), float(water_total), float(dspeed_avg)))
- class SteelFit:
- def __init__(self, s7_data_20: S7data, s7_data_215: S7data, cip_data: CIPData, sender: Sender, logger: logging.Logger):
- self.water_temperature_buffer: BufferPoint = cip_data.make_point("5#二冷水总管温度", BufferPoint)
- self.water_pressure_buffer: BufferPoint = cip_data.make_point("5#二冷水总管压力", BufferPoint)
- self.steel_temperature_buffer: BufferPoint = s7_data_20.make_point("中间包连续测温温度", BufferPoint)
- self.water_temperature_difference_buffer: BufferPoint = cip_data.make_point("5#结晶器水温差", BufferPoint)
- self.dspeed_buffer = [s7_data_20.make_point(f"{i}流结晶器拉速", BufferPoint) for i in range(1, 9)]
- self.cutting_sig_point = [s7_data_215.make_point(f"L{i}切割信号[0]") for i in range(1, 9)]
- self.sizing_point = [s7_data_20.make_point(f"{i}流定尺") for i in range(1, 9)]
- self.flow_rate_point_list = [[cip_data.make_point(f"5#水流量-{i}流-{j}段", BufferPoint) for j in range(1, 6)] for i in range(1, 9)]
- self.sender = sender
- self.logger = logger
- self.task_queue = queue.Queue()
- self.billet_data_gatherer_list = [
- billet_data_gatherer(
- self.dspeed_buffer[i],
- self.cutting_sig_point[i],
- self.sizing_point[i],
- self.flow_rate_point_list[i],
- logger,
- i + 1,
- self.task_queue
- )
- for i in range(8)
- ]
-
- self.thread_run = True
- self.thread = threading.Thread(target=self.loop_process)
- self.thread.start()
- def loop_process(self):
- while self.thread_run:
- try:
- task_tuple = self.task_queue.get(True, 1)
- tmp_dict = {}
- tmp_dict["strand_no"] = task_tuple[0]
- tmp_dict["cutting_time"] = datetime.datetime.fromtimestamp(task_tuple[1])
- tmp_dict["entry_time"] = datetime.datetime.fromtimestamp(task_tuple[2])
- tmp_dict["exit_time"] = datetime.datetime.fromtimestamp(task_tuple[3])
-
- cal_res = self.cal_data(task_tuple[2], task_tuple[3])
- tmp_dict["water_temperature"] = cal_res[0]
- tmp_dict["water_pressure"] = cal_res[1]
- tmp_dict["water_volume"] = task_tuple[4]
- tmp_dict["water_pressure_sd"] = cal_res[2]
- tmp_dict["steel_temperature"] = cal_res[3]
- tmp_dict["drawing_speed"] = task_tuple[5]
- tmp_dict["water_temperature_difference"] = cal_res[4]
- self.sender.upload_billet(tmp_dict)
- except queue.Empty:
- pass
- except Exception as e:
- self.logger.error(f"铸机数据计算过程中出现意外:{e}")
- def cal_data(self, entry_time, exit_time):
- wt = self.interval_avg(self.water_temperature_buffer.get_buffer(), entry_time, exit_time)
- wp = self.interval_avg(self.water_pressure_buffer.get_buffer(), entry_time, exit_time)
- wps = self.interval_sd(self.water_pressure_buffer.get_buffer(), entry_time, exit_time)
- st = self.interval_avg(self.steel_temperature_buffer.get_buffer(), entry_time, exit_time)
- wtd = self.interval_avg(self.water_temperature_difference_buffer.get_buffer(), entry_time, exit_time)
- return (float(wt), float(wp), float(wps), float(st), float(wtd))
- def interval_avg(self, buffer, left, right):
- data_tuple, time_tuple = zip(*buffer)
- x = np.array(time_tuple)
- y = np.array(data_tuple)
- func = interp1d(x, y, self.logger)
- inte = integrate.quad(func, left, right)[0]
- return inte / (right - left)
- def interval_sd(self, buffer, left, right):
- data_tuple, time_tuple = zip(*buffer)
- x = np.array(time_tuple)
- y = np.array(data_tuple)
- func = interp1d(x, y, self.logger)
- inte = integrate.quad(func, left, right)[0]
- avg = inte / (right - left)
- func2 = lambda x: (func(x) - avg) ** 2
- inte2 = integrate.quad(func2, left, right)[0]
- avg2 = inte2 / (right - left)
- return avg2 ** 0.5
- if __name__ == "__main__":
- from utils.s7data import S7Client, S7data
- from utils.logger import Logger
- # 配置S7连接
- s7_1 = S7Client()
- s7_1.connect("172.16.1.20", 0, 0)
- data_1 = S7data("conf/s7@172.16.1.20.csv")
- data_1.set_S7Client(s7_1)
- data_1.auto_update_group()
-
- s7_2 = S7Client()
- s7_2.connect("172.16.1.21", 0, 0)
- data_2 = S7data("conf/s7@172.16.1.21.csv")
- data_2.set_S7Client(s7_2)
- data_2.auto_update_group()
- s7_3 = S7Client()
- s7_3.connect("192.168.1.215", 0, 0)
- data_3 = S7data("conf/s7@192.168.1.215.csv")
- data_3.set_S7Client(s7_3)
- data_3.auto_update_group()
- # 配置CIP连接
- cip_data = CIPData("192.168.3.100")
- cip_data.start_update()
-
- # 配置日志模块
- logger = Logger('test')
- logger.screen_on()
- class C:
- def upload_billet(self, arg_dict: dict):
- pass
- # 钢坯拟合模块
- steel_fit = SteelFit(data_1, data_3, cip_data, C(), logger)
|