| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 |
- from utils.statepoint import *
- from utils.mqttdata import *
- from utils.s7data import *
- from models.data_sender import *
- from models.mysql_data import MysqlData, MysqlDataSizing
- import logging, datetime, queue, requests
- def _get_heat_init_strand(heat_no, retry = 3):
- url = f"http://192.168.0.119:7005/actualControl/billetActual/heatNoStats?heatNo={heat_no}"
- while retry:
- try:
- response = requests.get(url)
- result = response.json()
- if result["success"]:
- return result["result"]["strandTotal"]
- except Exception as e:
- warnings.warn(str(e))
- finally:
- retry -= 1
- return [0] * 8
- class Heat:
- def __init__(self, ccm_no, heat_no, start_pour_time: datetime.datetime = None, init_strands_status: list[int] = [0] * 8):
- self.ccm_no = str(ccm_no)
- self.heat_no = str(heat_no)
- self.start_pour_time = start_pour_time
- self.using_event = threading.Event()
- self.strands_req = init_strands_status
- self.total = sum(init_strands_status)
- self.billet_union_count = {}
- self.resource_lock = threading.Lock()
- @classmethod
- def from_dict(cls, src_dict: dict):
- return cls(
- src_dict["ccmNo"],
- src_dict["heatNo"],
- src_dict["sendTime"] if "sendTime" in src_dict and src_dict["sendTime"] else None
- )
-
- def to_dict(self):
- return {
- 'ccmNo': self.ccm_no,
- 'heatNo': self.heat_no,
- 'sendTime': self.start_pour_time.strftime("%Y-%m-%d %H:%M:%S")
- }
-
- def begin_use(self):
- self.using_event.set()
- def is_poured(self) -> bool:
- if self.start_pour_time:
- return True
- return False
-
- def is_using(self) -> bool:
- return self.using_event.is_set()
-
- def init_strands_status(self, init_status: list[int]):
- if self.is_using():
- raise ValueError("无法对已开始出坯的炉进行流初始化")
- with self.resource_lock:
- self.strands_req = init_status
- self.total = sum(init_status)
-
- def claim_strand_seq(self, strand_no) -> tuple[int]:
- if not self.is_using():
- return 0
- index = strand_no - 1
- with self.resource_lock:
- self.total += 1
- self.strands_req[index] += 1
- return self.strands_req[index], self.total
-
- def claim_union_seq(self, length: int) -> int:
- if not self.is_using():
- return 0
- with self.resource_lock:
- if length not in self.billet_union_count:
- self.billet_union_count[length] = 0
- self.billet_union_count[length] += 1
- return self.billet_union_count[length]
- class Billet:
- def __init__(self, cutting_time: datetime.datetime = None):
- if cutting_time:
- self.start_cutting_time: datetime.datetime = cutting_time
- else:
- self.start_cutting_time: datetime.datetime = datetime.datetime.now()
- self.heat: Heat = None
- self.stop_cutting_time: datetime.datetime = None
- self.strand_no: int = None
- self._strand_seq: int = None
- self._heat_seq: int = None
- self.sizing = None
- self.drawing_speed = None
- @property
- def billet_no(self):
- if self.heat and self.strand_no and self.strand_seq:
- return self.heat_no + self.ccm_no + str(self.strand_no) + f"{self.strand_seq:02d}"
- @property
- def ccm_no(self):
- if self.heat:
- return self.heat.ccm_no
- @property
- def heat_no(self):
- if self.heat:
- return self.heat.heat_no
- @property
- def strand_seq(self):
- return self._strand_seq
-
- @property
- def heat_seq(self):
- return self._heat_seq
-
- def batch_strand(self, strand_no = None) -> bool:
- if strand_no:
- self.strand_no = strand_no
- if not (self.strand_no and self.heat):
- return False
- res = self.heat.claim_strand_seq(self.strand_no)
- if res:
- self._strand_seq, self._heat_seq = res
- return True
-
- return False
- class BatchHeat:
- def __init__(self, ccm_no, manual_change_heat_sig: Statepoint,
- mysql_heat_point: Statepoint, mqtt_heat_point: Statepoint,
- ladle_weight_point_1: Statepoint, ladle_weight_point_2: Statepoint,
- sender: Sender, logger: logging.Logger, queue_max = 5):
- self.ccm_no = ccm_no
- self.manual_change_heat_sig = manual_change_heat_sig
- self.mysql_heat_point = mysql_heat_point
- self.mqtt_heat_point = mqtt_heat_point
- self.ladle_weight_point_1 = ladle_weight_point_1
- self.ladle_weight_point_2 = ladle_weight_point_2
- self.sender = sender
- self.logger = logger
- self.last_enqueue = ()
- self.heats_queue = queue.PriorityQueue(queue_max)
- self.heat_info_shared = {}
- self.heat_info_lock = threading.Lock()
- self.current_heat: Heat = None
- self._thread: threading.Thread = None
- self._thread_event = threading.Event()
- self.change_heat_lock = threading.Lock()
- self.manual_change_heat_sig.set_excite_action(self.manual_change_heat_action)
- self.mysql_heat_point.set_excite_action(self.heat_arrive_in_action)
- self.mqtt_heat_point.set_excite_action(self.heat_start_pour_action)
- self.current_heat_point = Statepoint(0)
- def heat_arrive_in_action(self):
- # 维护内部状态
- self.mysql_heat_point.set_state(False)
- # 获取炉次到站信息
- heat_no, into_time = self.mysql_heat_point.data
- time_for_check = into_time + datetime.timedelta(minutes=12)
- self.heat_info_shared[heat_no] = {}
- self.logger.debug(f"[Counter]检测到炉次进站{heat_no}")
- # 大包重量选择算法
- ladle_weight = max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data)
- # 等待诺德开浇信号判断
- while self.heat_info_shared[heat_no] == {}:
- # 如果超时,根据入站时间预测开浇时间
- if datetime.datetime.now() > time_for_check:
- with self.heat_info_lock:
- if "heatNo" not in self.heat_info_shared[heat_no]:
- self.heat_info_shared[heat_no]["ccmNo"] = self.ccm_no
- self.heat_info_shared[heat_no]["heatNo"] = heat_no
- self.heat_info_shared[heat_no]["sendTime"] = into_time + datetime.timedelta(minutes=5)
- break
- time.sleep(2)
- # 标志是否为铸机开机的第一次开浇
- first_heat_flag = self.current_heat == None and self.heats_queue.empty()
- # 写入日志
- if first_heat_flag:
- self.logger.info(f'[Counter]首次开浇:{heat_no}')
- else:
- self.logger.info(f'[Counter]炉次开浇:{heat_no}')
- # 维护浇铸队列
- try:
- self.last_enqueue = (self.heat_info_shared[heat_no]["sendTime"], Heat.from_dict(self.heat_info_shared[heat_no]))
- self.heats_queue.put(self.last_enqueue, True, 3)
- except TimeoutError as e:
- self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),存在异常:{e}")
- return
- # 使用sender向外发送信号
- self.sender.begin_pour(self.heat_info_shared[heat_no], ladle_weight)
- self.heat_info_shared.pop(heat_no)
- def heat_start_pour_action(self):
- heat_info = self.mqtt_heat_point.data
- heat_no = str(heat_info['heatNo'])
- if "sendTime" in heat_info:
- heat_info["sendTime"] = datetime.datetime.strptime(heat_info["sendTime"], "%Y-%m-%d %H:%M:%S")
- if heat_no in self.heat_info_shared:
- with self.heat_info_lock:
- self.heat_info_shared[heat_no] = heat_info
- self.logger.debug(f'[Counter]收到诺德开浇信号:{heat_no}')
- elif self.last_enqueue[-1].heat_no == heat_no:
- self.logger.warning(f"[Counter]诺德开浇信号晚到{heat_no}")
- else:
- self.logger.info(f'[Counter]开浇信号表示倒浇操作:{heat_no}')
- # 维护浇铸队列
- try:
- self.last_enqueue = (heat_info["sendTime"], Heat.from_dict(heat_info))
- self.heats_queue.put(self.last_enqueue, True, 3)
- except TimeoutError as e:
- self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),无法插入{heat_no},异常:{e}")
- return
-
- def manual_change_heat_action(self):
- with self.change_heat_lock:
- heat_info = self.manual_change_heat_sig.data
- if self.current_heat and heat_info['heatNo'] == self.current_heat.heat_no:
- return
-
- if "startPourTime" in heat_info:
- heat_info["sendTime"] = datetime.datetime.strptime(heat_info["startPourTime"], "%Y-%m-%d %H:%M:%S")
- self.logger.info(f"手动换炉触发:{self.current_heat.heat_no if self.current_heat else "未知"}->{heat_info['heatNo']}")
- self.sender.begin_pour(heat_info, max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data))
- self.current_heat = Heat.from_dict(heat_info)
- self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no))
- self.current_heat.begin_use()
- self.current_heat_point.inject(int(self.current_heat.heat_no))
- def loop_forever(self):
- try:
- while self._thread_event.is_set():
- if not self.heats_queue.empty() and self.heats_queue.queue[0][0] + datetime.timedelta(minutes=10) <= datetime.datetime.now():
- with self.change_heat_lock:
- _, heat = self.heats_queue.get()
- if self.current_heat:
- self.logger.debug(f"[Counter]炉次划分{self.current_heat.heat_no}->{heat.heat_no}")
- else:
- self.logger.debug(f"[Counter]连铸机出坯启动,炉次为{heat.heat_no}")
- self.current_heat = heat
- self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no))
- self.current_heat.begin_use()
- self.current_heat_point.inject(int(self.current_heat.heat_no))
- except Exception as e:
- self._thread_event.clear()
- self.logger.error(f"换炉线程意外退出:{e}")
-
- def loop_start(self):
- if self._thread and self._thread.is_alive():
- if self._thread_event.is_set():
- return
- raise ChildProcessError("换炉线程异常持续运行")
- self._thread = threading.Thread(target=self.loop_forever)
- self._thread_event.set()
- self._thread.start()
- def loop_stop(self):
- self._thread_event.clear()
- if self._thread and self._thread.is_alive():
- self._thread.join(3)
- def get_current_heat_info(self) -> dict:
- # 当换炉进行中时,获取炉次信息阻塞等待
- with self.change_heat_lock:
- return self.current_heat
-
- def __del__(self):
- self.loop_stop()
-
- class BatchBillet:
- def __init__(self, heat: BatchHeat,
- cutting_sig_point_list: list[Statepoint], short_sizing_point_list: list[Statepoint],
- sizing_point_list: list[Statepoint], drawing_speed_point_list: list[Statepoint],
- logger: logging.Logger):
- self.heat = heat
- self.cutting_sig_point_list = cutting_sig_point_list
- self.short_sizing_point_list = short_sizing_point_list
- self.sizing_point_list = sizing_point_list
- self.drawing_speed_point_list = drawing_speed_point_list
- self.logger = logger
- self.output_billet_queues = [queue.Queue(2) for _ in range(8)]
- self.last_enqueue: list[Billet] = [None for _ in range(8)]
- self._backfill_on_off = threading.Event()
- self.dspeed_integrate_point_list = None
- for i in range(8):
- self.cutting_sig_point_list[i].set_excite_action(lambda index=i: self.cutting_action(index))
- def cutting_action(self, index: int):
- # 先获取实时信息
- sizing = self.sizing_point_list[index].data
- drawing_speed = self.drawing_speed_point_list[index].data
- # 维护拉速回填机制,顺便利用拉速判断短尺
- if self._backfill_on_off.is_set():
- predict_length = self.dspeed_integrate_point_list[index].data
- self.dspeed_integrate_point_list[index].data = 0
- predict_length *= 1000
- if predict_length < 3000:
- predict_length += sizing
- self.logger.debug(f"[Counter]{index+1}流钢坯预测长度为{predict_length}")
- if abs(self.short_sizing_point_list[index].data - predict_length) < abs(sizing - predict_length):
- sizing = self.short_sizing_point_list[index].data
- self.logger.info(f"[Counter]{index+1}流钢坯开始切割")
- billet = Billet()
- billet.sizing = sizing
- billet.drawing_speed = drawing_speed
- billet.strand_no = index + 1
- billet.heat = self.heat.get_current_heat_info()
- if billet.heat:
- self.trans_billet(billet)
- else:
- self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息")
- def trans_billet(self, billet: Billet):
- try:
- self.output_billet_queues[billet.strand_no-1].put(billet, True, 10)
- self.last_enqueue[billet.strand_no-1] = billet
- except Exception as e:
- self.logger.error(f"{billet.strand_no}流队列加入钢坯失败:{e}")
- def set_dspeed_backfill(self, dspeed_integrate_point_list: list[Statepoint]):
- if self.dspeed_integrate_point_list:
- raise ValueError("拉速补偿数据点不可重复设置")
- self.dspeed_integrate_point_list = dspeed_integrate_point_list.copy()
- for i in range(8):
- self.dspeed_integrate_point_list[i].set_convertor(lambda data, index=i: self.dspeed_convertor(data, index))
- self.dspeed_integrate_point_list[i].data = 0
- self.dspeed_integrate_point_list[i].set_excite_action(lambda index=i: self.dspeed_backfill_action(index))
- def open_dspeed_backfill(self):
- if not self.drawing_speed_point_list:
- raise ValueError("未设置拉速补偿数据点")
- if self._backfill_on_off.is_set():
- return
- self._backfill_on_off.set()
- for i in self.drawing_speed_point_list:
- i.data = 0
- i.set_state(0)
- i.allow_update()
- def close_dspeed_backfill(self):
- self._backfill_on_off.clear()
- if self.drawing_speed_point_list:
- for i in self.drawing_speed_point_list:
- i.allow_update(0)
- def dspeed_convertor(self, data, index):
- return data * 1000 >= self.sizing_point_list[index].data
- def dspeed_backfill_action(self, index: int):
- # 维护内部状态
- self.dspeed_integrate_point_list[index].data = 0
- self.dspeed_integrate_point_list[index].set_state(False)
-
- in_time = datetime.datetime.now()
- while datetime.datetime.now() - in_time < datetime.timedelta(seconds=20):
- if self.last_enqueue[index] and datetime.datetime.now() - self.last_enqueue[index].start_cutting_time < datetime.timedelta(seconds=90):
- return
- time.sleep(0.5)
-
- self.logger.info(f"[Counter]{index+1}流拉速判据回填钢坯")
- billet = Billet(in_time)
- billet.sizing = self.sizing_point_list[index].data
- billet.drawing_speed = self.drawing_speed_point_list[index].data
- billet.strand_no = index + 1
- # 炉次信息可能因换炉阻塞,上面先获取实时信息
- billet.heat = self.heat.get_current_heat_info()
- if billet.heat:
- self.trans_billet(billet)
- else:
- self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息")
- class Batcher:
- @property
- def data_mqtt(self):
- if self._data_mqtt.cli == None:
- raise ValueError('The MQTT connection to MES has not been initialized.')
- return self._data_mqtt
-
- @property
- def data_s7(self):
- if self._data_s7 == None:
- raise ValueError('The S7 connection to the casting machine PLC has not been initialized.')
- return self._data_s7
-
- @property
- def sender(self):
- if self._sender == None:
- raise ValueError('The sender has not been set.')
- return self._sender
- def __init__(self, data_mqtt: Mqttdata, data_s7: S7data, data_web: Mqttdata, ccmNo, logger: logging.Logger, sender: Sender, data_nuo: MysqlData, data_qbc: MysqlDataSizing):
- self._data_mqtt = data_mqtt
- self._data_s7 = data_s7
- self.data_web = data_web
- self._sender = sender
- self.logger = logger
- self.ccm_no = ccmNo
- self.data_nuo = data_nuo
- self.data_qbc = data_qbc
- self.logger.info(f"[Counter]分炉分坯模块:{ccmNo}号机模块启动")
- self.ladle_weight_1 = self.data_s7.make_point('大包重量1')
- self.ladle_weight_2 = self.data_s7.make_point('大包重量2')
- self.begin_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#开浇信号')
- self.end_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#停浇信号')
- self.nuo_heat_point = self.data_nuo.make_point(f"{ccmNo}#到站信息")
- self.manual_change_heat_sig = data_web.make_point(f"{ccmNo}#手动换炉")
- self.begin_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[0]') for i in range(8)]
- self.end_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[1]') for i in range(8)]
- if ccmNo == '5':
- self.length_cutting = [self.data_qbc.make_point(f'{i+1}流定尺') for i in range(8)]
- elif ccmNo == '6':
- self.length_cutting = [self.data_s7.make_point(f'L{i+1}定尺') for i in range(8)]
- self.short_length_cutting = [self.data_s7.make_point(f'L{i+1}短尺') for i in range(8)]
- self.drawing_speed = [self.data_s7.make_point(f'L{i+1}拉速') for i in range(8)]
- # 拉速补偿点位
- self.dspeed_backfill_point = [self.data_s7.make_point(f"L{i+1}拉速", Integration_speed_mpmin) for i in range(8)]
-
- # 主逻辑对象
- self.batch_heat = BatchHeat(self.ccm_no, self.manual_change_heat_sig,
- self.nuo_heat_point, self.begin_pour_sig,
- self.ladle_weight_1, self.ladle_weight_2,
- self.sender, self.logger)
- self.batch_heat.loop_start()
-
- self.batch_billet = BatchBillet(self.batch_heat, self.begin_cutting_sig, self.short_length_cutting,
- self.length_cutting, self.drawing_speed, self.logger)
- self.batch_billet.set_dspeed_backfill(self.dspeed_backfill_point)
- self.batch_billet.open_dspeed_backfill()
|