from utils.statepoint import * from utils.mqttdata import * from utils.s7data import * from models.data_sender import * from models.mysql_data import MysqlData, MysqlDataSizing import logging, datetime, queue, requests def _get_heat_init_strand(heat_no, retry = 3): url = f"http://192.168.0.119:7005/actualControl/billetActual/heatNoStats?heatNo={heat_no}" while retry: try: response = requests.get(url) result = response.json() if result["success"]: return result["result"]["strandTotal"] except Exception as e: warnings.warn(str(e)) finally: retry -= 1 return [0] * 8 class Heat: def __init__(self, ccm_no, heat_no, start_pour_time: datetime.datetime = None, init_strands_status: list[int] = [0] * 8): self.ccm_no = str(ccm_no) self.heat_no = str(heat_no) self.start_pour_time = start_pour_time self.using_event = threading.Event() self.strands_req = init_strands_status self.total = sum(init_strands_status) self.billet_union_count = {} self.resource_lock = threading.Lock() @classmethod def from_dict(cls, src_dict: dict): return cls( src_dict["ccmNo"], src_dict["heatNo"], src_dict["sendTime"] if "sendTime" in src_dict and src_dict["sendTime"] else None ) def to_dict(self): return { 'ccmNo': self.ccm_no, 'heatNo': self.heat_no, 'sendTime': self.start_pour_time.strftime("%Y-%m-%d %H:%M:%S") } def begin_use(self): self.using_event.set() def is_poured(self) -> bool: if self.start_pour_time: return True return False def is_using(self) -> bool: return self.using_event.is_set() def init_strands_status(self, init_status: list[int]): if self.is_using(): raise ValueError("无法对已开始出坯的炉进行流初始化") with self.resource_lock: self.strands_req = init_status self.total = sum(init_status) def claim_strand_seq(self, strand_no) -> tuple[int]: if not self.is_using(): return 0 index = strand_no - 1 with self.resource_lock: self.total += 1 self.strands_req[index] += 1 return self.strands_req[index], self.total def claim_union_seq(self, length: int) -> int: if not self.is_using(): return 0 with self.resource_lock: if length not in self.billet_union_count: self.billet_union_count[length] = 0 self.billet_union_count[length] += 1 return self.billet_union_count[length] class Billet: def __init__(self, cutting_time: datetime.datetime = None): if cutting_time: self.start_cutting_time: datetime.datetime = cutting_time else: self.start_cutting_time: datetime.datetime = datetime.datetime.now() self.heat: Heat = None self.stop_cutting_time: datetime.datetime = None self.strand_no: int = None self._strand_seq: int = None self._heat_seq: int = None self.sizing = None self.drawing_speed = None @property def billet_no(self): if self.heat and self.strand_no and self.strand_seq: return self.heat_no + self.ccm_no + str(self.strand_no) + f"{self.strand_seq:02d}" @property def ccm_no(self): if self.heat: return self.heat.ccm_no @property def heat_no(self): if self.heat: return self.heat.heat_no @property def strand_seq(self): return self._strand_seq @property def heat_seq(self): return self._heat_seq def batch_strand(self, strand_no = None) -> bool: if strand_no: self.strand_no = strand_no if not (self.strand_no and self.heat): return False res = self.heat.claim_strand_seq(self.strand_no) if res: self._strand_seq, self._heat_seq = res return True return False class BatchHeat: def __init__(self, ccm_no, manual_change_heat_sig: Statepoint, mysql_heat_point: Statepoint, mqtt_heat_point: Statepoint, ladle_weight_point_1: Statepoint, ladle_weight_point_2: Statepoint, sender: Sender, logger: logging.Logger, queue_max = 5): self.ccm_no = ccm_no self.manual_change_heat_sig = manual_change_heat_sig self.mysql_heat_point = mysql_heat_point self.mqtt_heat_point = mqtt_heat_point self.ladle_weight_point_1 = ladle_weight_point_1 self.ladle_weight_point_2 = ladle_weight_point_2 self.sender = sender self.logger = logger self.last_enqueue = () self.heats_queue = queue.PriorityQueue(queue_max) self.heat_info_shared = {} self.heat_info_lock = threading.Lock() self.current_heat: Heat = None self._thread: threading.Thread = None self._thread_event = threading.Event() self.change_heat_lock = threading.Lock() self.manual_change_heat_sig.set_excite_action(self.manual_change_heat_action) self.mysql_heat_point.set_excite_action(self.heat_arrive_in_action) self.mqtt_heat_point.set_excite_action(self.heat_start_pour_action) self.current_heat_point = Statepoint(0) def heat_arrive_in_action(self): # 维护内部状态 self.mysql_heat_point.set_state(False) # 获取炉次到站信息 heat_no, into_time = self.mysql_heat_point.data time_for_check = into_time + datetime.timedelta(minutes=12) self.heat_info_shared[heat_no] = {} self.logger.debug(f"[Counter]检测到炉次进站{heat_no}") # 大包重量选择算法 ladle_weight = max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data) # 等待诺德开浇信号判断 while self.heat_info_shared[heat_no] == {}: # 如果超时,根据入站时间预测开浇时间 if datetime.datetime.now() > time_for_check: with self.heat_info_lock: if "heatNo" not in self.heat_info_shared[heat_no]: self.heat_info_shared[heat_no]["ccmNo"] = self.ccm_no self.heat_info_shared[heat_no]["heatNo"] = heat_no self.heat_info_shared[heat_no]["sendTime"] = into_time + datetime.timedelta(minutes=5) break time.sleep(2) # 标志是否为铸机开机的第一次开浇 first_heat_flag = self.current_heat == None and self.heats_queue.empty() # 写入日志 if first_heat_flag: self.logger.info(f'[Counter]首次开浇:{heat_no}') else: self.logger.info(f'[Counter]炉次开浇:{heat_no}') # 维护浇铸队列 try: self.last_enqueue = (self.heat_info_shared[heat_no]["sendTime"], Heat.from_dict(self.heat_info_shared[heat_no])) self.heats_queue.put(self.last_enqueue, True, 3) except TimeoutError as e: self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),存在异常:{e}") return # 使用sender向外发送信号 self.sender.begin_pour(self.heat_info_shared[heat_no], ladle_weight) self.heat_info_shared.pop(heat_no) def heat_start_pour_action(self): heat_info = self.mqtt_heat_point.data heat_no = str(heat_info['heatNo']) if "sendTime" in heat_info: heat_info["sendTime"] = datetime.datetime.strptime(heat_info["sendTime"], "%Y-%m-%d %H:%M:%S") if heat_no in self.heat_info_shared: with self.heat_info_lock: self.heat_info_shared[heat_no] = heat_info self.logger.debug(f'[Counter]收到诺德开浇信号:{heat_no}') elif self.last_enqueue[-1].heat_no == heat_no: self.logger.warning(f"[Counter]诺德开浇信号晚到{heat_no}") else: self.logger.info(f'[Counter]开浇信号表示倒浇操作:{heat_no}') # 维护浇铸队列 try: self.last_enqueue = (heat_info["sendTime"], Heat.from_dict(heat_info)) self.heats_queue.put(self.last_enqueue, True, 3) except TimeoutError as e: self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),无法插入{heat_no},异常:{e}") return def manual_change_heat_action(self): with self.change_heat_lock: heat_info = self.manual_change_heat_sig.data if self.current_heat and heat_info['heatNo'] == self.current_heat.heat_no: return if "startPourTime" in heat_info: heat_info["sendTime"] = datetime.datetime.strptime(heat_info["startPourTime"], "%Y-%m-%d %H:%M:%S") self.logger.info(f"手动换炉触发:{self.current_heat.heat_no if self.current_heat else "未知"}->{heat_info['heatNo']}") self.sender.begin_pour(heat_info, max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data)) self.current_heat = Heat.from_dict(heat_info) self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no)) self.current_heat.begin_use() self.current_heat_point.inject(int(self.current_heat.heat_no)) def loop_forever(self): try: while self._thread_event.is_set(): if not self.heats_queue.empty() and self.heats_queue.queue[0][0] + datetime.timedelta(minutes=10) <= datetime.datetime.now(): with self.change_heat_lock: _, heat = self.heats_queue.get() if self.current_heat: self.logger.debug(f"[Counter]炉次划分{self.current_heat.heat_no}->{heat.heat_no}") else: self.logger.debug(f"[Counter]连铸机出坯启动,炉次为{heat.heat_no}") self.current_heat = heat self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no)) self.current_heat.begin_use() self.current_heat_point.inject(int(self.current_heat.heat_no)) except Exception as e: self._thread_event.clear() self.logger.error(f"换炉线程意外退出:{e}") def loop_start(self): if self._thread and self._thread.is_alive(): if self._thread_event.is_set(): return raise ChildProcessError("换炉线程异常持续运行") self._thread = threading.Thread(target=self.loop_forever) self._thread_event.set() self._thread.start() def loop_stop(self): self._thread_event.clear() if self._thread and self._thread.is_alive(): self._thread.join(3) def get_current_heat_info(self) -> dict: # 当换炉进行中时,获取炉次信息阻塞等待 with self.change_heat_lock: return self.current_heat def __del__(self): self.loop_stop() class BatchBillet: def __init__(self, heat: BatchHeat, cutting_sig_point_list: list[Statepoint], short_sizing_point_list: list[Statepoint], sizing_point_list: list[Statepoint], drawing_speed_point_list: list[Statepoint], logger: logging.Logger): self.heat = heat self.cutting_sig_point_list = cutting_sig_point_list self.short_sizing_point_list = short_sizing_point_list self.sizing_point_list = sizing_point_list self.drawing_speed_point_list = drawing_speed_point_list self.logger = logger self.output_billet_queues = [queue.Queue(2) for _ in range(8)] self.last_enqueue: list[Billet] = [None for _ in range(8)] self._backfill_on_off = threading.Event() self.dspeed_integrate_point_list = None for i in range(8): self.cutting_sig_point_list[i].set_excite_action(lambda index=i: self.cutting_action(index)) def cutting_action(self, index: int): # 先获取实时信息 sizing = self.sizing_point_list[index].data drawing_speed = self.drawing_speed_point_list[index].data # 维护拉速回填机制,顺便利用拉速判断短尺 if self._backfill_on_off.is_set(): predict_length = self.dspeed_integrate_point_list[index].data self.dspeed_integrate_point_list[index].data = 0 predict_length *= 1000 if predict_length < 3000: predict_length += sizing self.logger.debug(f"[Counter]{index+1}流钢坯预测长度为{predict_length}") if abs(self.short_sizing_point_list[index].data - predict_length) < abs(sizing - predict_length): sizing = self.short_sizing_point_list[index].data self.logger.info(f"[Counter]{index+1}流钢坯开始切割") billet = Billet() billet.sizing = sizing billet.drawing_speed = drawing_speed billet.strand_no = index + 1 billet.heat = self.heat.get_current_heat_info() if billet.heat: self.trans_billet(billet) else: self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息") def trans_billet(self, billet: Billet): try: self.output_billet_queues[billet.strand_no-1].put(billet, True, 10) self.last_enqueue[billet.strand_no-1] = billet except Exception as e: self.logger.error(f"{billet.strand_no}流队列加入钢坯失败:{e}") def set_dspeed_backfill(self, dspeed_integrate_point_list: list[Statepoint]): if self.dspeed_integrate_point_list: raise ValueError("拉速补偿数据点不可重复设置") self.dspeed_integrate_point_list = dspeed_integrate_point_list.copy() for i in range(8): self.dspeed_integrate_point_list[i].set_convertor(lambda data, index=i: self.dspeed_convertor(data, index)) self.dspeed_integrate_point_list[i].data = 0 self.dspeed_integrate_point_list[i].set_excite_action(lambda index=i: self.dspeed_backfill_action(index)) def open_dspeed_backfill(self): if not self.drawing_speed_point_list: raise ValueError("未设置拉速补偿数据点") if self._backfill_on_off.is_set(): return self._backfill_on_off.set() for i in self.drawing_speed_point_list: i.data = 0 i.set_state(0) i.allow_update() def close_dspeed_backfill(self): self._backfill_on_off.clear() if self.drawing_speed_point_list: for i in self.drawing_speed_point_list: i.allow_update(0) def dspeed_convertor(self, data, index): return data * 1000 >= self.sizing_point_list[index].data def dspeed_backfill_action(self, index: int): # 维护内部状态 self.dspeed_integrate_point_list[index].data = 0 self.dspeed_integrate_point_list[index].set_state(False) in_time = datetime.datetime.now() while datetime.datetime.now() - in_time < datetime.timedelta(seconds=20): if self.last_enqueue[index] and datetime.datetime.now() - self.last_enqueue[index].start_cutting_time < datetime.timedelta(seconds=90): return time.sleep(0.5) self.logger.info(f"[Counter]{index+1}流拉速判据回填钢坯") billet = Billet(in_time) billet.sizing = self.sizing_point_list[index].data billet.drawing_speed = self.drawing_speed_point_list[index].data billet.strand_no = index + 1 # 炉次信息可能因换炉阻塞,上面先获取实时信息 billet.heat = self.heat.get_current_heat_info() if billet.heat: self.trans_billet(billet) else: self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息") class Batcher: @property def data_mqtt(self): if self._data_mqtt.cli == None: raise ValueError('The MQTT connection to MES has not been initialized.') return self._data_mqtt @property def data_s7(self): if self._data_s7 == None: raise ValueError('The S7 connection to the casting machine PLC has not been initialized.') return self._data_s7 @property def sender(self): if self._sender == None: raise ValueError('The sender has not been set.') return self._sender def __init__(self, data_mqtt: Mqttdata, data_s7: S7data, data_web: Mqttdata, ccmNo, logger: logging.Logger, sender: Sender, data_nuo: MysqlData, data_qbc: MysqlDataSizing): self._data_mqtt = data_mqtt self._data_s7 = data_s7 self.data_web = data_web self._sender = sender self.logger = logger self.ccm_no = ccmNo self.data_nuo = data_nuo self.data_qbc = data_qbc self.logger.info(f"[Counter]分炉分坯模块:{ccmNo}号机模块启动") self.ladle_weight_1 = self.data_s7.make_point('大包重量1') self.ladle_weight_2 = self.data_s7.make_point('大包重量2') self.begin_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#开浇信号') self.end_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#停浇信号') self.nuo_heat_point = self.data_nuo.make_point(f"{ccmNo}#到站信息") self.manual_change_heat_sig = data_web.make_point(f"{ccmNo}#手动换炉") self.begin_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[0]') for i in range(8)] self.end_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[1]') for i in range(8)] if ccmNo == '5': self.length_cutting = [self.data_qbc.make_point(f'{i+1}流定尺') for i in range(8)] elif ccmNo == '6': self.length_cutting = [self.data_s7.make_point(f'L{i+1}定尺') for i in range(8)] self.short_length_cutting = [self.data_s7.make_point(f'L{i+1}短尺') for i in range(8)] self.drawing_speed = [self.data_s7.make_point(f'L{i+1}拉速') for i in range(8)] # 拉速补偿点位 self.dspeed_backfill_point = [self.data_s7.make_point(f"L{i+1}拉速", Integration_speed_mpmin) for i in range(8)] # 主逻辑对象 self.batch_heat = BatchHeat(self.ccm_no, self.manual_change_heat_sig, self.nuo_heat_point, self.begin_pour_sig, self.ladle_weight_1, self.ladle_weight_2, self.sender, self.logger) self.batch_heat.loop_start() self.batch_billet = BatchBillet(self.batch_heat, self.begin_cutting_sig, self.short_length_cutting, self.length_cutting, self.drawing_speed, self.logger) self.batch_billet.set_dspeed_backfill(self.dspeed_backfill_point) self.batch_billet.open_dspeed_backfill()