Эх сурвалжийг харах

9.24提交稳定运行版本(非正式)

oldwine 2 сар өмнө
parent
commit
48a3cc6ae0

+ 78 - 20
main.py

@@ -1,5 +1,5 @@
-from models.billet_counter import Counter
 from models.data_sender import Sender
+from models.batcher import Batcher
 from models.billet_trace_pusher import Trace_pusher
 from utils.s7data import S7data, S7Client
 from utils.mqttdata import Mqttdata, MqttClient
@@ -8,20 +8,25 @@ from models.data_forward import *
 from models.parking import Parking
 from models.overhead_crane import Crane
 from models.data_checker import Checker
-from models.billet_stacks import Stack_manager, Billet_stack
+from models.billet_stacks import Stack_manager
 from models.jiaoban import Banci
-import pymysql
+from models.mysql_data import MysqlData, MysqlDataSizing
+from dbutils.pooled_db import PooledDB
+from models.s7_writer import S7Writer
+import pymysql, random, logging
+
+_debug = True
 
 ##############################################################
 # 日志配置
 
 logger_5 = Logger('5#')
 logger_5.file_on_with_rotation('logs/5#log.log')
-# logger_5.screen_on()
+logger_5.screen_on(logging.INFO)
 
 logger_6 = Logger('6#')
 logger_6.file_on_with_rotation('logs/6#log.log')
-# logger_6.screen_on()
+logger_6.screen_on(logging.INFO)
 
 logger_sender = Logger('sender')
 logger_sender.file_on_with_rotation('logs/sender_log.log')
@@ -29,8 +34,11 @@ logger_sender.file_on_with_rotation('logs/sender_log.log')
 
 logger_trace = Logger('trace')
 logger_trace.file_on_with_rotation('logs/trace_log.log')
-# logger_trace.screen_on()
+logger_trace.screen_on(logging.INFO)
 
+logger_mysql = Logger('mysql')
+logger_mysql.file_on_with_rotation('logs/mysql.log')
+# logger_trace.screen_on()
 
 ##############################################################
 # S7数据源配置
@@ -54,8 +62,13 @@ logger_6.info('[PREPARE]6#机PLC连接成功')
 
 ##############################################################
 # MQTT数据源配置
+_debug_random_str = ""
+random_char_lib = "abcdefghijklmnopqrstuvwxyz0123456789"
+if _debug:
+    _debug_random_str = '_debug_' + ''.join(random.choices(random_char_lib, k = 10))
 
-mqtt_mes = MqttClient('python-mqtt-biller_digitalization_test333', 'readonly', '1qazxsw@')
+mqtt_mes_clientid = 'python-mqtt-mes-billet' + _debug_random_str
+mqtt_mes = MqttClient(mqtt_mes_clientid, 'readonly', '1qazxsw@')
 mqtt_mes.connect('192.168.12.201', 1883)
 mqtt_mes.loop_start()
 data_mes = Mqttdata()
@@ -63,7 +76,8 @@ data_mes.set_mqtt_client(mqtt_mes)
 logger_5.info('[PREPARE]与MES使用MQTT连接成功')
 logger_6.info('[PREPARE]与MES使用MQTT连接成功')
 
-mqtt_web = MqttClient('python-mqtt-992_test333')
+mqtt_web_clientid = 'python-mqtt-web-billet' + _debug_random_str
+mqtt_web = MqttClient(mqtt_web_clientid)
 mqtt_web.connect('192.168.0.119', 1883)
 mqtt_web.loop_start()
 data_web = Mqttdata()
@@ -76,30 +90,60 @@ logger_sender.info('[PREPARE]与WEB业务平台使用MQTT连接成功')
 
 db = pymysql.connect(host='localhost', port=3306, user='root', password='1qaz2wsx@..', database='steel_production_db')
 
+nuo_db = PooledDB(
+    creator=pymysql,
+    maxconnections=5,
+    mincached=1,
+    blocking=True,
+    host='192.168.12.201',
+    user='gpszh',
+    password='lg123456',
+    database='nuo_scheduling_lm2',
+    charset='utf8mb4'
+)
+
+data_nuo = MysqlData(nuo_db, logger_mysql)
+
+qbc_db = PooledDB(
+    creator=pymysql,
+    maxconnections=5,
+    mincached=1,
+    blocking=True,
+    host='192.168.1.211',
+    user='zgzt',
+    password='zgzt1234',
+    database='steelmaking_data',
+    charset='utf8mb4'
+)
+
+data_qbc = MysqlDataSizing(qbc_db, logger_mysql)
+
 
 ##############################################################
 # 数据发送服务
 
 sender = Sender(logger_sender)
-#sender.set_mqtt_client(mqtt_web)
-#sender.set_mysql_client(db)
 
-# debug设置
-sender.http_flag = False
-sender.mysql_flag = True
+if _debug:
+    sender.http_flag = False
+    sender.mysql_flag = False
+else:
+    sender.set_mqtt_client(mqtt_web)
+    sender.set_mysql_client(db)
+    sender.mysql_flag = True
 
 
 ##############################################################
 # 分炉分坯服务
 
+flfp_5 = Batcher(data_mes, data_5, data_web, "5", logger_5, sender, data_nuo, data_qbc)
+flfp_6 = Batcher(data_mes, data_6, data_web, "6", logger_6, sender, data_nuo, data_qbc)
+
 position_5 = [10650, 11600, 12830, 13924, 15237, 16440, 17757, 18935]
 position_6 = [8084, 9303, 10618, 11998, 13040, 14409, 15584, 16853]
 
-pusher_5 = Trace_pusher(data_5, logger_5, sender, position_5, data_web.make_point('5#手动换炉'), True)
-pusher_6 = Trace_pusher(data_6, logger_6, sender, position_6, data_web.make_point('6#手动换炉'), hostmove_flag=True)
-
-flfp_5 = Counter(data_mes, data_5, 5, logger_5, sender, pusher_5)
-flfp_6 = Counter(data_mes, data_6, 6, logger_6, sender, pusher_6)
+pusher_5 = Trace_pusher(flfp_5.batch_billet.output_billet_queues, data_5, logger_5, sender, position_5, True)
+pusher_6 = Trace_pusher(flfp_6.batch_billet.output_billet_queues, data_6, logger_6, sender, position_6, hostmove_flag=True)
 
 
 ##############################################################
@@ -126,7 +170,8 @@ crane = Crane(data_5, pusher_5, pusher_6, parking, stack_manager, sender, logger
 # 数据警报服务
 
 checker = Checker(data_5, data_6, logger_sender)
-checker.async_start_check()
+if not _debug:
+    checker.async_start_check()
 
 
 ##############################################################
@@ -143,4 +188,17 @@ forward_5 = Forward(data_5, sender, True, '5')
 forward_5.start_auto_forward()
 
 forward_6 = Forward(data_6, sender, False, '6')
-forward_6.start_auto_forward()
+forward_6.start_auto_forward()
+
+
+##############################################################
+# PLC写入服务
+
+if not _debug:
+    s7_5w = S7Client()
+    s7_5w.connect('192.168.1.215', 0, 0)
+
+    s7_writer = S7Writer(s7_5w)
+    s7_writer.add_task("dint", 420, 34, flfp_5.batch_heat.current_heat_point)
+
+    s7_writer.loop_start()

+ 463 - 0
models/batcher.py

@@ -0,0 +1,463 @@
+from utils.statepoint import *
+from utils.mqttdata import *
+from utils.s7data import *
+from models.data_sender import *
+from models.mysql_data import MysqlData, MysqlDataSizing
+import logging, datetime, queue, requests
+
+def _get_heat_init_strand(heat_no, retry = 3):
+    url = f"http://192.168.0.119:7005/actualControl/billetActual/heatNoStats?heatNo={heat_no}"
+    while retry:
+        try:
+            response = requests.get(url)
+            result = response.json()
+            if result["success"]:
+                return result["result"]["strandTotal"]
+        except Exception as e:
+            warnings.warn(str(e))
+        finally:
+            retry -= 1
+    return [0] * 8
+
+class Heat:
+    def __init__(self, ccm_no, heat_no, start_pour_time: datetime.datetime = None, init_strands_status: list[int] = [0] * 8):
+        self.ccm_no = str(ccm_no)
+        self.heat_no = str(heat_no)
+        self.start_pour_time = start_pour_time
+
+        self.using_event = threading.Event()
+
+        self.strands_req = init_strands_status
+        self.total = sum(init_strands_status)
+        self.billet_union_count = {}
+        self.resource_lock = threading.Lock()
+
+    @classmethod
+    def from_dict(cls, src_dict: dict):
+        return cls(
+            src_dict["ccmNo"],
+            src_dict["heatNo"],
+            src_dict["sendTime"] if "sendTime" in src_dict and src_dict["sendTime"] else None
+        )
+    
+    def to_dict(self):
+        return {
+            'ccmNo': self.ccm_no,
+            'heatNo': self.heat_no,
+            'sendTime': self.start_pour_time.strftime("%Y-%m-%d %H:%M:%S")
+        }
+    
+    def begin_use(self):
+        self.using_event.set()
+
+    def is_poured(self) -> bool:
+        if self.start_pour_time:
+            return True
+        return False
+    
+    def is_using(self) -> bool:
+        return self.using_event.is_set()
+    
+    def init_strands_status(self, init_status: list[int]):
+        if self.is_using():
+            raise ValueError("无法对已开始出坯的炉进行流初始化")
+        with self.resource_lock:
+            self.strands_req = init_status
+            self.total = sum(init_status)
+    
+    def claim_strand_seq(self, strand_no) -> tuple[int]:
+        if not self.is_using():
+            return 0
+        index = strand_no - 1
+        with self.resource_lock:
+            self.total += 1
+            self.strands_req[index] += 1
+            return self.strands_req[index], self.total
+        
+    def claim_union_seq(self, length: int) -> int:
+        if not self.is_using():
+            return 0
+        with self.resource_lock:
+            if length not in self.billet_union_count:
+                self.billet_union_count[length] = 0
+            self.billet_union_count[length] += 1
+            return self.billet_union_count[length]
+
+class Billet:
+    def __init__(self, cutting_time: datetime.datetime = None):
+        if cutting_time:
+            self.start_cutting_time: datetime.datetime = cutting_time
+        else:
+            self.start_cutting_time: datetime.datetime = datetime.datetime.now()
+
+        self.heat: Heat = None
+        self.stop_cutting_time: datetime.datetime = None
+        self.strand_no: int = None
+        self._strand_seq: int = None
+        self._heat_seq: int = None
+        self.sizing = None
+        self.drawing_speed = None
+
+    @property
+    def billet_no(self):
+        if self.heat and self.strand_no and self.strand_seq:
+            return self.heat_no + self.ccm_no + str(self.strand_no) + f"{self.strand_seq:02d}"
+
+    @property
+    def ccm_no(self):
+        if self.heat:
+            return self.heat.ccm_no
+
+    @property
+    def heat_no(self):
+        if self.heat:
+            return self.heat.heat_no
+
+    @property
+    def strand_seq(self):
+        return self._strand_seq
+    
+    @property
+    def heat_seq(self):
+        return self._heat_seq
+    
+    def batch_strand(self, strand_no = None) -> bool:
+        if strand_no:
+            self.strand_no = strand_no
+        if not (self.strand_no and self.heat):
+            return False
+        res = self.heat.claim_strand_seq(self.strand_no)
+        if res:
+            self._strand_seq, self._heat_seq = res
+            return True
+        
+        return False
+
+class BatchHeat:
+    def __init__(self, ccm_no, manual_change_heat_sig: Statepoint,
+                 mysql_heat_point: Statepoint, mqtt_heat_point: Statepoint,
+                 ladle_weight_point_1: Statepoint, ladle_weight_point_2: Statepoint,
+                 sender: Sender, logger: logging.Logger, queue_max = 5):
+        self.ccm_no = ccm_no
+        self.manual_change_heat_sig = manual_change_heat_sig
+        self.mysql_heat_point = mysql_heat_point
+        self.mqtt_heat_point = mqtt_heat_point
+        self.ladle_weight_point_1 = ladle_weight_point_1
+        self.ladle_weight_point_2 = ladle_weight_point_2
+        self.sender = sender
+        self.logger = logger
+
+        self.last_enqueue = ()
+        self.heats_queue = queue.PriorityQueue(queue_max)
+        self.heat_info_shared = {}
+        self.heat_info_lock = threading.Lock()
+
+        self.current_heat: Heat = None
+        self._thread: threading.Thread = None
+        self._thread_event = threading.Event()
+        self.change_heat_lock = threading.Lock()
+
+        self.manual_change_heat_sig.set_excite_action(self.manual_change_heat_action)
+        self.mysql_heat_point.set_excite_action(self.heat_arrive_in_action)
+        self.mqtt_heat_point.set_excite_action(self.heat_start_pour_action)
+
+        self.current_heat_point = Statepoint(0)
+
+    def heat_arrive_in_action(self):
+        # 维护内部状态
+        self.mysql_heat_point.set_state(False)
+
+        # 获取炉次到站信息
+        heat_no, into_time = self.mysql_heat_point.data
+        time_for_check = into_time + datetime.timedelta(minutes=12)
+        self.heat_info_shared[heat_no] = {}
+
+        self.logger.debug(f"[Counter]检测到炉次进站{heat_no}")
+
+        # 大包重量选择算法
+        ladle_weight = max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data)
+
+        # 等待诺德开浇信号判断
+        while self.heat_info_shared[heat_no] == {}:
+            # 如果超时,根据入站时间预测开浇时间
+            if datetime.datetime.now() > time_for_check:
+                with self.heat_info_lock:
+                    if "heatNo" not in self.heat_info_shared[heat_no]:
+                        self.heat_info_shared[heat_no]["ccmNo"] = self.ccm_no
+                        self.heat_info_shared[heat_no]["heatNo"] = heat_no
+                        self.heat_info_shared[heat_no]["sendTime"] = into_time + datetime.timedelta(minutes=5)
+                break
+            time.sleep(2)
+
+        # 标志是否为铸机开机的第一次开浇
+        first_heat_flag = self.current_heat == None and self.heats_queue.empty()
+
+        # 写入日志
+        if first_heat_flag:
+            self.logger.info(f'[Counter]首次开浇:{heat_no}')
+        else:
+            self.logger.info(f'[Counter]炉次开浇:{heat_no}')
+
+        # 维护浇铸队列
+        try:
+            self.last_enqueue = (self.heat_info_shared[heat_no]["sendTime"], Heat.from_dict(self.heat_info_shared[heat_no]))
+            self.heats_queue.put(self.last_enqueue, True, 3)
+        except TimeoutError as e:
+            self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),存在异常:{e}")
+            return
+
+        # 使用sender向外发送信号
+        self.sender.begin_pour(self.heat_info_shared[heat_no], ladle_weight)
+        self.heat_info_shared.pop(heat_no)
+
+    def heat_start_pour_action(self):
+        heat_info = self.mqtt_heat_point.data
+        heat_no = str(heat_info['heatNo'])
+        if "sendTime" in heat_info:
+            heat_info["sendTime"] = datetime.datetime.strptime(heat_info["sendTime"], "%Y-%m-%d %H:%M:%S")
+
+        if heat_no in self.heat_info_shared:
+            with self.heat_info_lock:
+                self.heat_info_shared[heat_no] = heat_info
+            self.logger.debug(f'[Counter]收到诺德开浇信号:{heat_no}')
+        elif self.last_enqueue[-1].heat_no == heat_no:
+            self.logger.warning(f"[Counter]诺德开浇信号晚到{heat_no}")
+        else:
+            self.logger.info(f'[Counter]开浇信号表示倒浇操作:{heat_no}')
+            # 维护浇铸队列
+            try:
+                self.last_enqueue = (heat_info["sendTime"], Heat.from_dict(heat_info))
+                self.heats_queue.put(self.last_enqueue, True, 3)
+            except TimeoutError as e:
+                self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),无法插入{heat_no},异常:{e}")
+                return
+            
+    def manual_change_heat_action(self):
+        with self.change_heat_lock:
+            heat_info = self.manual_change_heat_sig.data
+            if self.current_heat and heat_info['heatNo'] == self.current_heat.heat_no:
+                return
+            
+            if "startPourTime" in heat_info:
+                heat_info["sendTime"] = datetime.datetime.strptime(heat_info["startPourTime"], "%Y-%m-%d %H:%M:%S")
+
+            self.logger.info(f"手动换炉触发:{self.current_heat.heat_no if self.current_heat else "未知"}->{heat_info['heatNo']}")
+
+            self.sender.begin_pour(heat_info, max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data))
+
+            self.current_heat = Heat.from_dict(heat_info)
+            self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no))
+            self.current_heat.begin_use()
+            self.current_heat_point.inject(int(self.current_heat.heat_no))
+
+    def loop_forever(self):
+        try:
+            while self._thread_event.is_set():
+                if not self.heats_queue.empty() and self.heats_queue.queue[0][0] + datetime.timedelta(minutes=10) <= datetime.datetime.now():
+                    with self.change_heat_lock:
+                        _, heat = self.heats_queue.get()
+                        if self.current_heat:
+                            self.logger.debug(f"[Counter]炉次划分{self.current_heat.heat_no}->{heat.heat_no}")
+                        else:
+                            self.logger.debug(f"[Counter]连铸机出坯启动,炉次为{heat.heat_no}")
+                        self.current_heat = heat
+                        self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no))
+                        self.current_heat.begin_use()
+                        self.current_heat_point.inject(int(self.current_heat.heat_no))
+        except Exception as e:
+            self._thread_event.clear()
+            self.logger.error(f"换炉线程意外退出:{e}")
+    
+    def loop_start(self):
+        if self._thread and self._thread.is_alive():
+            if self._thread_event.is_set():
+                return
+            raise ChildProcessError("换炉线程异常持续运行")
+        self._thread = threading.Thread(target=self.loop_forever)
+        self._thread_event.set()
+        self._thread.start()
+
+    def loop_stop(self):
+        self._thread_event.clear()
+        if self._thread and self._thread.is_alive():
+            self._thread.join(3)
+
+    def get_current_heat_info(self) -> dict:
+        # 当换炉进行中时,获取炉次信息阻塞等待
+        with self.change_heat_lock:
+            return self.current_heat
+        
+    def __del__(self):
+        self.loop_stop()
+        
+class BatchBillet:
+    def __init__(self, heat: BatchHeat,
+                 cutting_sig_point_list: list[Statepoint], short_sizing_point_list: list[Statepoint],
+                 sizing_point_list: list[Statepoint], drawing_speed_point_list: list[Statepoint],
+                 logger: logging.Logger):
+        self.heat = heat
+        self.cutting_sig_point_list = cutting_sig_point_list
+        self.short_sizing_point_list = short_sizing_point_list
+        self.sizing_point_list = sizing_point_list
+        self.drawing_speed_point_list = drawing_speed_point_list
+        self.logger = logger
+
+        self.output_billet_queues = [queue.Queue(2) for _ in range(8)]
+        self.last_enqueue: list[Billet] = [None for _ in range(8)]
+
+        self._backfill_on_off = threading.Event()
+        self.dspeed_integrate_point_list = None
+
+        for i in range(8):
+            self.cutting_sig_point_list[i].set_excite_action(lambda index=i: self.cutting_action(index))
+
+    def cutting_action(self, index: int):
+        # 先获取实时信息
+        sizing = self.sizing_point_list[index].data
+        drawing_speed = self.drawing_speed_point_list[index].data
+
+        # 维护拉速回填机制,顺便利用拉速判断短尺
+        if self._backfill_on_off.is_set():
+            predict_length = self.dspeed_integrate_point_list[index].data
+            self.dspeed_integrate_point_list[index].data = 0
+            predict_length *= 1000
+            if predict_length < 3000:
+                predict_length += sizing
+            self.logger.debug(f"[Counter]{index+1}流钢坯预测长度为{predict_length}")
+            if abs(self.short_sizing_point_list[index].data - predict_length) < abs(sizing - predict_length):
+                sizing = self.short_sizing_point_list[index].data
+
+        self.logger.info(f"[Counter]{index+1}流钢坯开始切割")
+
+        billet = Billet()
+        billet.sizing = sizing
+        billet.drawing_speed = drawing_speed
+        billet.strand_no = index + 1
+        billet.heat = self.heat.get_current_heat_info()
+        if billet.heat:
+            self.trans_billet(billet)
+        else:
+            self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息")
+
+    def trans_billet(self, billet: Billet):
+        try:
+            self.output_billet_queues[billet.strand_no-1].put(billet, True, 10)
+            self.last_enqueue[billet.strand_no-1] = billet
+        except Exception as e:
+            self.logger.error(f"{billet.strand_no}流队列加入钢坯失败:{e}")
+
+    def set_dspeed_backfill(self, dspeed_integrate_point_list: list[Statepoint]):
+        if self.dspeed_integrate_point_list:
+            raise ValueError("拉速补偿数据点不可重复设置")
+        self.dspeed_integrate_point_list = dspeed_integrate_point_list.copy()
+        for i in range(8):
+            self.dspeed_integrate_point_list[i].set_convertor(lambda data, index=i: self.dspeed_convertor(data, index))
+            self.dspeed_integrate_point_list[i].data = 0
+            self.dspeed_integrate_point_list[i].set_excite_action(lambda index=i: self.dspeed_backfill_action(index))
+
+    def open_dspeed_backfill(self):
+        if not self.drawing_speed_point_list:
+            raise ValueError("未设置拉速补偿数据点")
+        if self._backfill_on_off.is_set():
+            return
+        self._backfill_on_off.set()
+        for i in self.drawing_speed_point_list:
+            i.data = 0
+            i.set_state(0)
+            i.allow_update()
+
+    def close_dspeed_backfill(self):
+        self._backfill_on_off.clear()
+        if self.drawing_speed_point_list:
+            for i in self.drawing_speed_point_list:
+                i.allow_update(0)
+
+    def dspeed_convertor(self, data, index):
+        return data * 1000 >= self.sizing_point_list[index].data
+
+    def dspeed_backfill_action(self, index: int):
+        # 维护内部状态
+        self.dspeed_integrate_point_list[index].data = 0
+        self.dspeed_integrate_point_list[index].set_state(False)
+        
+        in_time = datetime.datetime.now()
+        while datetime.datetime.now() - in_time < datetime.timedelta(seconds=20):
+            if self.last_enqueue[index] and datetime.datetime.now() - self.last_enqueue[index].start_cutting_time < datetime.timedelta(seconds=90):
+                return
+            time.sleep(0.5)
+        
+        self.logger.info(f"[Counter]{index+1}流拉速判据回填钢坯")
+        billet = Billet(in_time)
+        billet.sizing = self.sizing_point_list[index].data
+        billet.drawing_speed = self.drawing_speed_point_list[index].data
+        billet.strand_no = index + 1
+        # 炉次信息可能因换炉阻塞,上面先获取实时信息
+        billet.heat = self.heat.get_current_heat_info()
+        if billet.heat:
+            self.trans_billet(billet)
+        else:
+            self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息")
+
+
+class Batcher:
+    @property
+    def data_mqtt(self):
+        if self._data_mqtt.cli == None:
+            raise ValueError('The MQTT connection to MES has not been initialized.')
+        return self._data_mqtt
+    
+    @property
+    def data_s7(self):
+        if self._data_s7 == None:
+            raise ValueError('The S7 connection to the casting machine PLC has not been initialized.')
+        return self._data_s7
+    
+    @property
+    def sender(self):
+        if self._sender == None:
+            raise ValueError('The sender has not been set.')
+        return self._sender
+
+    def __init__(self, data_mqtt: Mqttdata, data_s7: S7data, data_web: Mqttdata, ccmNo, logger: logging.Logger, sender: Sender, data_nuo: MysqlData, data_qbc: MysqlDataSizing):
+        self._data_mqtt = data_mqtt
+        self._data_s7 = data_s7
+        self.data_web = data_web
+        self._sender = sender
+        self.logger = logger
+        self.ccm_no = ccmNo
+        self.data_nuo = data_nuo
+        self.data_qbc = data_qbc
+
+        self.logger.info(f"[Counter]分炉分坯模块:{ccmNo}号机模块启动")
+
+        self.ladle_weight_1 = self.data_s7.make_point('大包重量1')
+        self.ladle_weight_2 = self.data_s7.make_point('大包重量2')
+
+        self.begin_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#开浇信号')
+        self.end_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#停浇信号')
+        self.nuo_heat_point = self.data_nuo.make_point(f"{ccmNo}#到站信息")
+        self.manual_change_heat_sig = data_web.make_point(f"{ccmNo}#手动换炉")
+
+        self.begin_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[0]') for i in range(8)]
+        self.end_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[1]') for i in range(8)]
+        if ccmNo == '5':
+            self.length_cutting = [self.data_qbc.make_point(f'{i+1}流定尺') for i in range(8)]
+        elif ccmNo == '6':
+            self.length_cutting = [self.data_s7.make_point(f'L{i+1}定尺') for i in range(8)]
+        self.short_length_cutting = [self.data_s7.make_point(f'L{i+1}短尺') for i in range(8)]
+        self.drawing_speed = [self.data_s7.make_point(f'L{i+1}拉速') for i in range(8)]
+
+        # 拉速补偿点位
+        self.dspeed_backfill_point = [self.data_s7.make_point(f"L{i+1}拉速", Integration_speed_mpmin) for i in range(8)]
+        
+        # 主逻辑对象
+        self.batch_heat = BatchHeat(self.ccm_no, self.manual_change_heat_sig,
+                                    self.nuo_heat_point, self.begin_pour_sig,
+                                    self.ladle_weight_1, self.ladle_weight_2,
+                                    self.sender, self.logger)
+        self.batch_heat.loop_start()
+        
+        self.batch_billet = BatchBillet(self.batch_heat, self.begin_cutting_sig, self.short_length_cutting,
+                                        self.length_cutting, self.drawing_speed, self.logger)
+        self.batch_billet.set_dspeed_backfill(self.dspeed_backfill_point)
+        self.batch_billet.open_dspeed_backfill()

+ 0 - 301
models/billet_counter.py

@@ -1,301 +0,0 @@
-from utils.statepoint import *
-from utils.mqttdata import *
-from utils.s7data import *
-from models.data_sender import *
-from models.billet_trace_pusher import *
-import logging
-
-_Debug = 0
-
-class Counter:
-    @property
-    def data_mqtt(self):
-        if self._data_mqtt.cli == None:
-            raise ValueError('The MQTT connection to MES has not been initialized.')
-        return self._data_mqtt
-    
-    @property
-    def data_s7(self):
-        if self._data_s7 == None:
-            raise ValueError('The S7 connection to the casting machine PLC has not been initialized.')
-        return self._data_s7
-    
-    @property
-    def sender(self):
-        if self._sender == None:
-            raise ValueError('The sender has not been set.')
-        return self._sender
-        
-    # 数据点定义
-    def create_data_point(self, ccmNo):
-        self.ladle_weight_1 = self.data_s7.make_point('大包重量1')
-        self.ladle_weight_2 = self.data_s7.make_point('大包重量2')
-
-        self.begin_pour = self.data_mqtt.make_point(f'{ccmNo}#开浇信号')
-        self.end_pour = self.data_mqtt.make_point(f'{ccmNo}#停浇信号')
-
-        if _Debug:
-            self.begin_pour_ring = self.data_s7.make_point('浇铸信号[2]')
-
-        self.begin_cutting = []
-        for i in range(8):
-            self.begin_cutting.append(self.data_s7.make_point(f'L{i+1}切割信号[0]'))
-
-        self.end_cutting = []
-        for i in range(8):
-            self.end_cutting.append(self.data_s7.make_point(f'L{i+1}切割信号[1]'))
-
-        self.length_cutting = []
-        for i in range(8):
-            self.length_cutting.append(self.data_s7.make_point(f'L{i+1}定尺'))
-
-        self.drawing_speed = []
-        for i in range(8):
-            self.drawing_speed.append(self.data_s7.make_point(f'L{i+1}拉速'))
-
-
-    def init_data_point(self):
-        # 统一初始化数据点
-        self.begin_pour.allow_update(False)
-        self.begin_pour.set_state(False)
-        self.end_pour.allow_update(False)
-        self.end_pour.set_state(False)
-        for i in range(8):
-            self.begin_cutting[i].allow_update(False)
-            self.begin_cutting[i].set_state(False)
-            self.begin_cutting[i].data = 0
-            self.end_cutting[i].allow_update(False)
-            self.end_cutting[i].set_state(False)
-            self.end_cutting[i].data = 0
-
-        # 数据点逻辑设置
-        self.begin_pour.set_excite_action(self.begin_pour_action)
-        self.end_pour.set_excite_action(self.end_pour_action)
-
-        for i in range(8):
-            self.begin_cutting[i].set_excite_action(lambda i=i: self.begin_cutting_action(i))
-            self.begin_cutting[i].set_keep_time(3000)
-            self.end_cutting[i].set_excite_action(lambda i=i: self.end_cutting_action(i))
-            self.end_cutting[i].set_reset_action(lambda i=i: self.end_cutting[i].allow_update(False))
-
-        # 统一开启数据点
-        self.begin_pour.allow_update()
-        self.end_pour.allow_update()
-        for i in range(8):
-            self.begin_cutting[i].allow_update()
-
-
-    def __init__(self, data_mqtt: Mqttdata, data_s7: S7data, ccmNo, logger: logging.Logger, sender: Sender, pusher_trace: Trace_pusher):
-        # 模块入口、出口、日志定义
-        self._data_mqtt = data_mqtt
-        self._data_s7 = data_s7
-        self._sender = sender
-        self.logger = logger
-        self.send_dst = pusher_trace
-
-        self.logger.info(f"[Counter]分炉分坯模块:{ccmNo}号机模块启动")
-
-        # 配置必须的数据点
-        self.create_data_point(ccmNo)
-        self.init_data_point()
-
-        #分炉分坯功能
-        self.last_cutting_timestamp = 0
-
-        self.strand = [0, 0, 0, 0, 0, 0, 0, 0]
-        self.cutting_state_heat = [{}, {}, {}, {}, {}, {}, {}, {}]
-        self.cutting_state_heat_index = [0, 0, 0, 0, 0, 0, 0, 0]
-        self.total = 0
-        self.limit_count = 0
-        self.limit_target = 24
-        self.limit_flag = False
-        self.lock = threading.Lock()
-        self.old_heat = {}
-        self.new_heat = {}
-        self.last_cutting_strand = 0
-
-        self.send_list = [[], [], [], [], [], [], [], []]
-
-
-    def begin_pour_action(self):
-        # 标志是否为铸机开机的第一次开浇
-        flag = (time.time() - self.last_cutting_timestamp) > 1800
-
-        # 大包重量选择算法
-        ladle_weight = max(self.ladle_weight_1.data, self.ladle_weight_2.data)
-
-        # 写入日志
-        if flag:
-            self.logger.info(f'[Counter]首次开浇:{self.begin_pour.data['heatNo']}')
-        elif self.new_heat == {}:
-            self.logger.info(f'[Counter]炉次开浇:{self.begin_pour.data['heatNo']},当前出坯炉次:{"未知" if self.old_heat == {} else self.old_heat["heatNo"]}')
-        else:
-            self.logger.warning(f'[Counter]炉次开浇:{self.begin_pour.data['heatNo']},炉次{self.new_heat['heatNo']}被覆盖')
-
-        # 维护换炉操作
-        if flag:
-            self.old_heat = self.begin_pour.data
-            self.new_heat = {}
-        else:
-            self.new_heat = self.begin_pour.data
-            self.start_limit()
-
-        # 使用sender向外发送信号
-        self.sender.begin_pour(self.begin_pour.data, ladle_weight)
-
-
-    def end_pour_action(self):
-        # 写入日志
-        self.logger.info(f'[Counter]炉次停浇:{self.end_pour.data["heatNo"]}')
-
-        # 使用sender向外发送信号
-        if self.old_heat:
-            self.sender.end_pour(self.end_pour.data)
-
-
-    def cutting_data_comple(self, i):
-        count = 3
-
-        # 补充计入模块
-        while count:
-            time.sleep(0.5)
-            sizing = self.length_cutting[i].data
-            speed = self.drawing_speed[i].data
-            if 0 < sizing < 30000 and 0 < speed < 10:
-                self.strand_add(i+1, sizing, speed)
-                return None
-            count -= 1
-        
-        time.sleep(0.5)
-        sizing = self.length_cutting[i].data
-        speed = self.drawing_speed[i].data
-        if not (0 < sizing < 30000 and 0 < speed < 10):
-            self.logger.warning(f"[Counter]请注意,定尺{sizing}/拉速{speed}数据持续异常,已按照异常数据发送")
-
-        self.strand_add(i+1, sizing, speed)
-        self.end_cutting[i].allow_update()
-
-    def begin_cutting_action(self, i):
-        # 写入日志
-        # self.logger.info(f'[Counter]{i+1}流:开始切割')
-
-        # 计入模块
-        sizing = self.length_cutting[i].data
-        speed = self.drawing_speed[i].data
-        if 0 < sizing < 30000 and 0 < speed < 10:
-            self.strand_add(i+1, sizing, speed)
-            self.end_cutting[i].allow_update()
-        else:
-            threading.Thread(target=self.cutting_data_comple, args=(i,)).start()
-
-
-
-    def end_cutting_action(self, i):
-        # 写入日志
-        # self.logger.info(f'[Counter]{i+1}流:完成切割')
-
-        # 复位流状态
-        cutting_state_heat = self.cutting_state_heat[i]
-        cutting_state_heat_index = self.cutting_state_heat_index[i]
-        self.cutting_state_heat[i] = {}
-        self.cutting_state_heat_index[i] = 0
-
-        if cutting_state_heat:
-            # 使用sender向外发送信号
-            # self.sender.end_cut(cutting_state_heat, cutting_state_heat_index)
-
-            tmp = self.send_list[i]
-            self.send_list[i] = []
-            tmp[5] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
-            self.send_dst.data_from_casting(i, tmp, True)
-
-            # 检查自己是否是本炉最后一根钢坯
-            if self.last_cutting_strand == i+1:
-                self.last_cutting_strand = 0
-                # self.sender.heat_last(cutting_state_heat)
-
-
-    def strand_add(self, sno, sizing, speed):
-        with self.lock:
-            # 维护辅助时间戳
-            self.last_cutting_timestamp = time.time()
-
-            #维护内部正确性的主要算法
-            if self.limit_flag:
-                self.limit_count += 1
-                if self.limit_count == self.limit_target:
-                    # 此处已经在切割本炉最后一根
-                    if self.old_heat:
-                        self.last_cutting_strand = sno
-                if self.limit_count > self.limit_target:
-                    # 此处已经在切割新炉第一根
-                    self.change_heat()
-                    self.total = 0
-                    self.strand = [0, 0, 0, 0, 0, 0, 0, 0]
-                    self.limit_count = 0
-                    self.limit_flag = False
-            self.strand[sno-1] += 1
-            self.total += 1
-
-            heatData = self.old_heat
-            heatIndex = self.total
-            strandIndex = self.strand[sno-1]
-        
-        if heatData:
-            # 记录当前流状态,帮助停切信号判断钢坯信息
-            self.cutting_state_heat[sno-1] = heatData
-            self.cutting_state_heat_index[sno-1] = heatIndex
-            # 生成坯号,使用sender向外发送信号
-            ccmNo = heatData['ccmNo']
-            billetNo = heatData['heatNo'] + ccmNo + str(sno) + '{:0>2}'.format(strandIndex)
-            # self.logger.info(f"[Counter]{sno}流:{heatData['heatNo']}炉第{heatIndex}根计入系统,坯号:{billetNo}")
-
-            # [坯号, 炉次信息, 定尺, 拉速, 开浇时间, 停浇时间]
-            self.send_list[sno-1] = [billetNo, heatData, sizing, speed, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), '']
-            self.send_dst.data_from_casting(sno-1, self.send_list[sno-1])
-            # self.sender.begin_cut(heatData, billetNo, heatIndex, sizing, speed)
-            # 使用sender发送炉次首次开切信号
-            if heatIndex == 1:
-                pass
-                # self.sender.heat_first(heatData)
-        else:
-            self.send_dst.data_from_casting(sno-1, [f'000000000{sno}00', {}, sizing, speed, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), ''])
-            # self.logger.info(f"[Counter]{sno}流:未知炉第{heatIndex}根,本炉无法计入系统,下一炉开始正常")
-
-
-    def start_limit(self):
-        # 根据实际情况设置本炉总支数
-        with self.lock:
-            if self.old_heat:
-                if self.total % 4 == 1:
-                    self.limit_target = 23
-                elif self.total % 4 == 2:
-                    if self.total >= 28:
-                        self.limit_target = 22
-                    else:
-                        self.limit_target = 26
-                elif self.total % 4 == 3:
-                    self.limit_target = 25
-                else:
-                    self.limit_target = 24
-            else:
-                self.limit_target = 24
-                
-            self.limit_flag = True
-
-
-    def change_heat(self):
-        # 异常情况
-        if not self.new_heat:
-            self.logger.error('[Counter]换炉:失败,无新炉次信息')
-            return None
-        
-        # 写入日志
-        if self.old_heat:
-            self.logger.info(f'[Counter]换炉:{self.old_heat["heatNo"]}->{self.new_heat["heatNo"]}')
-        else:
-            self.logger.info(f'[Counter]换炉:未知炉次->{self.new_heat["heatNo"]}')
-
-        #真正换炉过程
-        self.old_heat = self.new_heat
-        self.new_heat = {}

+ 162 - 406
models/billet_trace_pusher.py

@@ -2,427 +2,203 @@ import logging
 from utils.statepoint import *
 from utils.s7data import *
 from models.data_sender import *
+from models.batcher import *
+import queue
 
 class Trace_pusher:
-    def __init__(self, data_s7: S7data, logger: logging.Logger, sender: Sender, strand_position: list, manual_change_heat_sig: Statepoint, hostsend_flag=False, hostmove_flag=False):
+    def __init__(self, billet_input: list[queue.Queue[Billet]], data_s7: S7data, logger: logging.Logger, sender: Sender, strand_position: list, hostsend_flag=False, hostmove_flag=False):
+        self.billet_input = billet_input
         self.data_s7 = data_s7
         self.logger = logger
         self.sender = sender
-        self.manual_change_heat_sig = manual_change_heat_sig
-        self.strands_cutting = [[], [], [], [], [], [], [], []]
-        self.strands_buffer = [[], [], [], [], [], [], [], []]
-        self.locks = [threading.Lock() for i in range(8)]
         self.strand_position = strand_position
         self.hostsend_flag = hostsend_flag
         self.hostmove_flag = hostmove_flag
-        
-        self.heat_filter_index = 1
-        self.heat_filter = ["00000000"] + ['' for i in range(9)]
-        self.old_heatNo = "00000000"
-        self.current_heatNo = "00000000"
-        self.old_heatData = {}
-        self.current_heatData = {}
-        self.total = 0
-        self.strand = [0, 0, 0, 0, 0, 0, 0, 0]
-        self.count_lock = threading.Lock()
-
-        self.sizing_count_heatNo = "00000000"
-        self.sizing_count = {}
-
-        self.pusher_left_list = []
-        self.pusher_right_list = []
 
         self.bed_left = [[], [], []]
         self.bed_right = [[], [], []]
+        self.cool_bed_lock = threading.Lock()
+
+        # 挡板事件 出坯到推钢区域
+        self.barrier_up_sig = [self.data_s7.make_point(f'L{i+1}挡板') for i in range(8)]
+        self.barrier_checker = [False, False, False, False, False, False, False, False]
+
+        for i in range(8):
+            self.barrier_up_sig[i].set_convertor(lambda data: not bool(data))
+            self.barrier_up_sig[i].set_excite_action(lambda i=i: self.barrier_up_action(i))
+            self.barrier_up_sig[i].set_reset_action(lambda i=i: self.barrier_down_action(i))
 
+        # 推钢事件 推钢到南北小冷床
         self.pusher_left = data_s7.make_point('推钢机激光')
         self.pusher_right = data_s7.make_point('推钢机激光')
         self.pusher_left.hmd_add(0)
         self.pusher_right.hmd_add(0)
-
-        self.manual_change_heat_sig.set_excite_action(self.manual_change_heat)
+        self.pusher_left_list: list[Billet] = []
+        self.pusher_right_list: list[Billet] = []
 
         self.pusher_left.set_convertor(lambda data: data < min(self.strand_position))
         self.pusher_right.set_convertor(lambda data: data > max(self.strand_position))
         self.pusher_left.set_excite_action(lambda: self.arrive_cooling_bed('left'))
         self.pusher_right.set_excite_action(lambda: self.arrive_cooling_bed('right'))
 
-        self.billet_out = [[], [], [], [], [], [], [], []]
-
-        self.length_cutting = []
-        for i in range(8):
-            self.length_cutting.append(self.data_s7.make_point(f'L{i+1}定尺'))
-
-        self.drawing_speed = []
-        for i in range(8):
-            self.drawing_speed.append(self.data_s7.make_point(f'L{i+1}拉速'))
-
-        # 5号机 + 6号机
-        # 拉速积分得到长度
-        self.integration_total = [0] * 8
-        self.integration_lock = threading.Lock()
-        self.billet_out_sig = [self.data_s7.make_point(f"L{i+1}拉速", Integration_speed_mpmin) for i in range(8)]
-        self.speed_to_zero_sig = [self.data_s7.make_point(f"L{i+1}拉速") for i in range(8)]
-        self.billet_position = [
-            self.data_s7.make_point('L1切割信号[1]'),
-            self.data_s7.make_point('L2切割信号[1]'),
-            self.data_s7.make_point('L3切割信号[1]'),
-            self.data_s7.make_point('L4切割信号[1]'),
-            self.data_s7.make_point('L5切割信号[1]'),
-            self.data_s7.make_point('L6切割信号[1]'),
-            self.data_s7.make_point('L7切割信号[1]'),
-            self.data_s7.make_point('L8切割信号[1]')
-        ]
-
-        self.barrier = [
-            self.data_s7.make_point('L1挡板'),
-            self.data_s7.make_point('L2挡板'),
-            self.data_s7.make_point('L3挡板'),
-            self.data_s7.make_point('L4挡板'),
-            self.data_s7.make_point('L5挡板'),
-            self.data_s7.make_point('L6挡板'),
-            self.data_s7.make_point('L7挡板'),
-            self.data_s7.make_point('L8挡板')
-        ]
-
-        self.barrier_checker = [False, False, False, False, False, False, False, False]
-
+        # 棒一热送事件(仅五号机)
         if self.hostsend_flag:
-            self.hostsend_barrier = [
-                self.data_s7.make_point('热送挡板[0]'),
-                self.data_s7.make_point('热送挡板[1]'),
-                self.data_s7.make_point('热送挡板[2]'),
-                self.data_s7.make_point('热送挡板[3]'),
-                self.data_s7.make_point('热送挡板[4]'),
-                self.data_s7.make_point('热送挡板[5]'),
-                self.data_s7.make_point('热送挡板[6]'),
-                self.data_s7.make_point('热送挡板[7]')
-            ]
+            self.hostsend_barrier = [self.data_s7.make_point(f'热送挡板[{i}]') for i in range(8)]
 
+            for i in range(8):
+                self.hostsend_barrier[i].set_convertor(lambda data: not bool(data))
+                self.hostsend_barrier[i].set_excite_action(lambda i=i: self.hostsend_barrier_up_action(i))
+                self.hostsend_barrier[i].set_reset_action(lambda i=i: self.logger.debug(f"[TRACE]{i+1}流热送挡板关闭"))
+
+        # 热送副跨事件(仅六号机)
         if self.hostmove_flag:
             self.hostmove_sig = data_s7.make_point('热送辊道状态[1]')
             self.hostmove_sig.set_excite_action(lambda: self.billet_to_stack("步进冷床", self.get_billet('right')))
 
-        for i in range(8):
-            self.billet_position[i].allow_update(False)
-            self.billet_position[i].set_state(False)
-            self.billet_out_sig[i].allow_update(False)
-            self.billet_out_sig[i].set_state(False)
-            # 6号机 + 5号机
-            # 拉速补充钢坯
-            self.billet_out_sig[i].set_convertor(lambda data, i=i: round(data*1000) >= self.integration_total[i] + self.length_cutting[i].data)
-            self.billet_out_sig[i].set_excite_action(lambda i=i: self.billet_out_action(i))
-            # 拉速为0时表示此时拉速补偿已经不可用
-            self.speed_to_zero_sig[i].set_convertor(lambda data: data == 0)
-            self.speed_to_zero_sig[i].set_excite_action(lambda i=i: self.billet_out_sig[i].allow_update(False))
-            # 正常情况进入跟踪
-            self.billet_position[i].set_excite_action(lambda i=i: self.billet_in_buffer_action(i))
-            self.billet_position[i].allow_update()
-            # 挡板抬起
-            self.barrier[i].allow_update(False)
-            self.barrier[i].set_state(False)
-            self.barrier[i].set_convertor(lambda data: not bool(data))
-            self.barrier[i].set_excite_action(lambda i=i: self.barrier_up_action(i))
-            self.barrier[i].set_reset_action(lambda i=i: self.barrier_down_action(i))
-            self.barrier[i].allow_update()
-            # 5号机直轧挡板抬起
-            if self.hostsend_flag:
-                self.hostsend_barrier[i].allow_update(False)
-                self.hostsend_barrier[i].set_state(False)
-                self.hostsend_barrier[i].set_convertor(lambda data: not bool(data))
-                self.hostsend_barrier[i].set_excite_action(lambda i=i: self.hostsend_barrier_up_action(i))
-                self.hostsend_barrier[i].set_reset_action(lambda i=i: self.logger.debug(f"{i+1}流热送挡板关闭"))
-                self.hostsend_barrier[i].allow_update()
-
-    def integration_start(self, i):
-        if self.speed_to_zero_sig[i].data == 0:
-            return None
-        self.billet_out_sig[i].data = 0
-        self.billet_out_sig[i].state = False
-        self.integration_total[i] = 0
-        self.billet_out_sig[i].allow_update()
-
-    def billet_out_action(self, i):
-        billetNo = self.current_heatNo + '0' + str(i+1) + '99'
-        sizing = self.length_cutting[i].data
-        speed = self.drawing_speed[i].data
-        
-        if self.strands_cutting[i]:
-            return None
-        
-        # [坯号, 炉次信息, 定尺, 拉速, 开切时间, 停切时间]
-        self.billet_out[i] = [billetNo, self.current_heatData, sizing, speed, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), '']
-
-        # 等待20s,若无开切信号,即为漏坯
-        start_time = time.time()
-        while time.time() - start_time < 20:
-            if self.strands_cutting[i]:
-                return None
-            time.sleep(0.5)
-
-        # 漏钢时,积分数据不会被清零,需要累加
-        with self.integration_lock:
-            self.integration_total[i] += sizing
-        self.billet_in_buffer_action(i)
-
-    def billet_in_buffer_action(self, i):
-        with self.locks[i]:
-            if self.strands_cutting[i]:
-                time.sleep(15)
-                self.logger.info(f"[TRACE]{i+1}流新增钢坯存储")
-                self.strands_buffer[i] = self.strands_cutting[i]
-                self.strands_cutting[i] = []
-            elif self.billet_out[i]:
-                self.logger.info(f"[TRACE]{i+1}流无已经开切的钢坯,算法补入")
-                self.strands_buffer[i] = self.billet_out[i]
+    def barrier_up_action(self, i: int):
+        self.logger.debug(f"[TRACE]{i+1}流挡板开启")
+        while self.barrier_up_sig[i].converter(self.barrier_up_sig[i].data):
+            if self.billet_input[i].empty() or datetime.datetime.now() - self.billet_input[i].queue[0].start_cutting_time < datetime.timedelta(seconds=25):
+                time.sleep(0.5)
+                continue
+            billet = self.billet_input[i].get(False)
+            self.barrier_checker[i] = True
+            self.logger.debug(f"[TRACE]{i+1}流钢坯经过挡板")
+            time.sleep(10)
+            if self.strand_position[i] <= self.pusher_left.data:
+                self.logger.info(f"[TRACE]{i+1}流钢坯通过挡板进入推钢区域,在推钢机北侧")
+                self.pusher_left_list.append(billet)
             else:
-                self.logger.info(f"[TRACE]{i+1}流无已经开切的钢坯,但未检测到可补入的钢坯信息")
-            self.billet_out[i] = []
-
-        if self.barrier[i].state and self.barrier_checker[i] == False:
-            self.barrier_up_action(i)
-
-    def barrier_up_action(self, i):
-        time.sleep(1)
-        with self.locks[i]:
-            if self.strands_buffer[i]:
-                self.barrier_checker[i] = True
-                time.sleep(10)
-                billetData = self.strands_buffer[i]
-                self.strands_buffer[i] = []
-                if self.strand_position[i] <= self.pusher_left.data:
-                    self.logger.info(f"[TRACE]{i+1}流钢坯通过挡板进入推钢区域,在推钢机北侧")
-                    self.pusher_left_list.append(billetData)
-                else:
-                    self.logger.info(f"[TRACE]{i+1}流钢坯通过挡板进入推钢区域,在推钢机南侧")
-                    self.pusher_right_list.append(billetData)
-        
-        if self.hostsend_flag and self.hostsend_barrier[i].state:
-            self.hostsend_barrier_up_action(i)
-
+                self.logger.info(f"[TRACE]{i+1}流钢坯通过挡板进入推钢区域,在推钢机南侧")
+                self.pusher_right_list.append(billet)
+            if self.hostsend_flag and self.hostsend_barrier[i].state:
+                self.hostsend_barrier_up_action(i)
+            time.sleep(5)
 
-    def barrier_down_action(self, i):
+    def barrier_down_action(self, i: int):
         if self.barrier_checker[i]:
             self.logger.debug(f"[TRACE]{i+1}流挡板关闭")
         else:
-            self.logger.error(f"[TRACE]{i+1}流挡板关闭,期间无钢坯流出")
+            self.logger.error(f"[TRACE]{i+1}流挡板开启期间无钢坯流出")
         self.barrier_checker[i] = False
-            
-    def hostsend_barrier_up_action(self, i):
-        with self.count_lock:
-            gp_tmp = []
-            if self.strand_position[i] <= self.pusher_left.data:
-                index = -1
-                for j in range(len(self.pusher_left_list)-1, -1, -1):
-                    if self.pusher_left_list[j][0][-3] == str(i+1):
-                        index = j
-                        break
-                if index == -1:
-                    pass
-                    #self.logger.warning(f"[TRACE]推钢机北侧未找到{i+1}流的热送钢坯")
-                else:
-                    gp_tmp = self.pusher_left_list[index]
-                    self.pusher_left_list = self.pusher_left_list[:index] + self.pusher_left_list[index+1:]
-            else:
-                index = -1
-                for j in range(len(self.pusher_right_list)-1, -1, -1):
-                    if self.pusher_right_list[j][0][-3] == str(i+1):
-                        index = j
-                        break
-                if index == -1:
-                    pass
-                    #self.logger.warning(f"[TRACE]推钢机南侧未找到{i+1}流的热送钢坯")
-                else:
-                    gp_tmp = self.pusher_right_list[index]
-                    self.pusher_right_list = self.pusher_right_list[:index] + self.pusher_right_list[index+1:]
-
-            if gp_tmp:
-                # self.logger.info(f"[TRACE]{i+1}流钢坯热送")
-                if not gp_tmp[0].startswith('0') and gp_tmp[0][:8] not in self.heat_filter:
-                    self.change_heat(gp_tmp)
-                if not self.current_heatNo.startswith('0'):
-                    self.hostsend(gp_tmp)
-            
-    def change_heat(self, data):
-        # 换炉代码
-        self.old_heatNo = self.current_heatNo
-        self.old_heatData = self.current_heatData
-        self.current_heatNo = data[0][:8]
-        self.current_heatData = data[1]
-        self.heat_filter[self.heat_filter_index] = self.current_heatNo
-        self.heat_filter_index = (self.heat_filter_index + 1) % 10
-        # 上一炉终止信号在这里发
-        self.total = 0
-        self.strand = [0, 0, 0, 0, 0, 0, 0, 0]
-
-    def manual_change_heat(self):
-        with self.count_lock:
-            tmp = self.manual_change_heat_sig.data
-            if tmp['heatNo'] == self.current_heatNo:
-                return None
-            
-            self.logger.info(f"手动换炉触发:{self.current_heatNo}->{tmp['heatNo']}")
-
-            tmp['sendTime'] = tmp['startPourTime']
-            self.sender.begin_pour(tmp, max(self.data_s7.get_value('大包重量1'), self.data_s7.get_value('大包重量2')))
-
-            self.change_heat([tmp['heatNo'], tmp])
 
     def arrive_cooling_bed(self, direc):
-        with self.count_lock:
-            if direc == 'left':
-                self.logger.debug(f"北侧冷床上推入{len(self.pusher_left_list)}根钢坯")
-                tmp = self.pusher_left_list
-                self.pusher_left_list = []
-
-            elif direc == 'right':
-                self.logger.debug(f"南侧冷床上推入{len(self.pusher_right_list)}根钢坯")
-                tmp = self.pusher_right_list
-                self.pusher_right_list = []
-
-            for i in tmp:
-                if not i[0].startswith('0') and i[0][:8] not in self.heat_filter:
-                    self.change_heat(i)
-                    break
-
-            self.billet_to_bed(tmp, direc)
-
-    def billet_to_bed(self, billets, direc):
         if direc == 'left':
-            if len(billets):
-                self.billet_to_bed_impl(billets, self.bed_left)
-            self.logger.debug(f"北侧冷床目前情况:{len(self.bed_left[0])}根|{len(self.bed_left[1])}根|{len(self.bed_left[2])}根")
+            tmp = self.pusher_left_list
+            self.pusher_left_list = []
+            tmp.sort(key=lambda obj: obj.strand_no)
+            dst = self.bed_left
         elif direc == 'right':
-            if len(billets):
-                self.billet_to_bed_impl(billets, self.bed_right)
-            self.logger.debug(f"南侧冷床目前情况:{len(self.bed_right[2])}根|{len(self.bed_right[1])}根|{len(self.bed_right[0])}根")
-
-
-    def billet_to_bed_impl(self, billets: list, dst: list):
-        i = 0
-        count = 0
-        while i < len(dst) and dst[i]:
-            count += 1
-            i += 1
-
-        if count == 0:
-            dst[0].extend(billets)
-        elif count == 3 and (len(billets) >= 4 or len(dst[count-1]) >= 4 or len(dst[count-1]) + len(billets) > 4):
-            self.logger.error(f"组坯异常!")
-
-            # test 把无法组坯的不正常钢坯 直接上传服务器
-            tmp = dst[0]
-            if len(tmp) < 4:
-                self.billet_union(tmp)
-
-            dst.remove(dst[0])
-            dst.append(billets)
-            count -= 1
-        elif len(billets) >= 4 or len(dst[count-1]) >= 4 or len(dst[count-1]) + len(billets) > 4:
-            dst[count].extend(billets)
+            tmp = self.pusher_right_list
+            self.pusher_right_list = []
+            tmp.sort(key=lambda obj: obj.strand_no, reverse=True)
+            dst = self.bed_right
         else:
-            dst[count-1].extend(billets)
-            count -= 1
+            return
+
+        if tmp:
+            self.billet_to_bed(tmp, dst, True if direc == "right" else False)
+
+    def billet_to_bed(self, billets: list[Billet], dst: list[list[Billet]], union_reverse = False):
+        with self.cool_bed_lock:
+            len_count = len(billets)
+            # 找到第一个空位
+            count = 0
+            while count < len(dst) and dst[count]:
+                count += 1
+
+            if count == 0:
+                dst[0].extend(billets)
+            elif count == len(dst) and (len_count >= 4 or len(dst[count-1]) >= 4 or len(dst[count-1]) + len_count > 4):
+                self.logger.error(f"[TRACE]组坯异常!")
+
+                tmp = dst.pop(0)
+                dst.append(billets)
+
+                # test 把无法组坯的不正常钢坯 直接上传服务器
+                if len(tmp) < 4:
+                    self.billet_union(tmp)
+
+                count -= 1
+            elif len_count >= 4 or len(dst[count-1]) >= 4 or len(dst[count-1]) + len_count > 4:
+                dst[count].extend(billets)
+            else:
+                count -= 1
+                dst[count].extend(billets)
 
-        if len(dst[count]) >= 4:
-            self.billet_union(dst[count])
+            if len(dst[count]) >= 4:
+                self.billet_union(dst[count], union_reverse)
 
+            if not union_reverse:
+                self.logger.info(f"[TRACE]北侧冷床上推入{len_count}根钢坯,目前情况:{len(self.bed_left[0])}根|{len(self.bed_left[1])}根|{len(self.bed_left[2])}根")
+            else:
+                self.logger.info(f"[TRACE]南侧冷床上推入{len_count}根钢坯,目前情况:{len(self.bed_right[2])}根|{len(self.bed_right[1])}根|{len(self.bed_right[0])}根")
 
-    def billet_union(self, billets):
-        ccmNo = self.current_heatData['ccmNo'] if self.current_heatData else '0'
+    def billet_union(self, billets: list[Billet], reverse = False):
+        if reverse:
+            billets.reverse()
 
         if len(billets) < 4:
             for i in billets:
-                strandNo = i[0][9]
-                billetNo = self.current_heatNo + ccmNo + strandNo + '{:0>2}'.format(99)
-                i[0] = billetNo
-                i[1] = self.current_heatData
-                if self.current_heatData:
-                    self.sender.billet_upload(self.current_heatData, billetNo, self.total, i[2], i[3], i[4], i[5], '', error=True)
-            self.logger.warning(f"{self.current_heatNo}炉发送异常钢坯{len(billets)}根")
-            return None
-
-        if self.sizing_count_heatNo != self.current_heatNo:
-            self.sizing_count_heatNo = self.current_heatNo
-            self.sizing_count = {}
-
-        # 组坯时,定尺为此组内定尺最小值
-        sizing = billets[0][2]
-        for i in billets:
-            if i[2] < sizing:
-                sizing = i[2]
+                if i.batch_strand():
+                    self.sender.billet_upload(i.heat.to_dict(), i.heat_no + i.ccm_no + str(i.strand_no) + "99", 0, i.sizing, i.drawing_speed, 
+                                              i.start_cutting_time.strftime("%Y-%m-%d %H:%M:%S"), "", '', error=True)
+            self.logger.warning(f"[TRACE]发送异常钢坯{len(billets)}根")
+            return
 
-        if sizing not in self.sizing_count:
-            self.sizing_count[sizing] = 0
+        # 组坯时,定尺为此组内定尺最小值,炉次为最后一根加入此组的炉次
+        sizing = min(billets, key=lambda obj: obj.sizing).sizing
+        heat = max(billets, key=lambda obj: obj.start_cutting_time).heat
 
-        self.sizing_count[sizing] += 1
-        billet_unionNo = self.current_heatNo + '{:0>5}'.format(int(sizing)) + '{:0>2}'.format(self.sizing_count[sizing])
+        billet_unionNo = heat.heat_no + '{:0>5}'.format(int(sizing)) + '{:0>2}'.format(heat.claim_union_seq(sizing))
 
         billetsNo = []
         for i in billets:
-            strandNo = i[0][9]
-            self.total += 1
-            self.strand[int(strandNo)-1] += 1
-            billetNo = self.current_heatNo + ccmNo + strandNo + '{:0>2}'.format(self.strand[int(strandNo)-1])
-            billetsNo.append(billetNo)
-            i[0] = billetNo
-            i[1] = self.current_heatData
-            i[2] = sizing
-            if self.current_heatData:
-                self.sender.billet_upload(self.current_heatData, billetNo, self.total, i[2], i[3], i[4], i[5], billet_unionNo)
+            i.heat = heat
+            i.batch_strand()
+            billetsNo.append(i.billet_no)
+            self.sender.billet_upload(heat.to_dict(), i.billet_no, i.heat_seq, i.sizing, i.drawing_speed,
+                                      i.start_cutting_time.strftime("%Y-%m-%d %H:%M:%S"), "", billet_unionNo)
         
-        if self.current_heatData:
-            self.sender.billet_union(self.current_heatData, billet_unionNo, billetsNo, sizing)
+        self.sender.billet_union(heat.to_dict(), billet_unionNo, billetsNo, sizing)
 
-        self.logger.info(f"{self.current_heatNo}炉组号{billet_unionNo}钢坯{len(billets)}根:\n    {'、'.join(billetsNo)}")
+        self.logger.info(f"[TRACE]{heat.heat_no}炉组号{billet_unionNo}钢坯{len(billets)}根:\n    {'、'.join(billetsNo)}")
 
+    def hostsend_barrier_up_action(self, i: int):
+        gp_tmp = None
+        if self.strand_position[i] <= self.pusher_left.data:
+            index = -1
+            for j in range(len(self.pusher_left_list)-1, -1, -1):
+                if self.pusher_left_list[j].strand_no == i+1:
+                    index = j
+                    break
+            if index == -1:
+                self.logger.debug(f"[TRACE]推钢机北侧未找到{i+1}流的热送钢坯")
+            else:
+                gp_tmp = self.pusher_left_list[index]
+                self.pusher_left_list = self.pusher_left_list[:index] + self.pusher_left_list[index+1:]
+        else:
+            index = -1
+            for j in range(len(self.pusher_right_list)-1, -1, -1):
+                if self.pusher_right_list[j].strand_no == i+1:
+                    index = j
+                    break
+            if index == -1:
+                self.logger.debug(f"[TRACE]推钢机南侧未找到{i+1}流的热送钢坯")
+            else:
+                gp_tmp = self.pusher_right_list[index]
+                self.pusher_right_list = self.pusher_right_list[:index] + self.pusher_right_list[index+1:]
 
-    def data_from_casting(self, i, data, extend=False):
-        with self.locks[i]:
-            if extend:
-                if self.strands_cutting[i] and self.strands_cutting[i][0] == data[0]:
-                    self.logger.info(f"{i+1}流补充了钢坯停切时间")
-                    self.strands_cutting[i] = data
-                elif self.strands_buffer[i] and self.strands_buffer[i][0] == data[0]:
-                    self.logger.info(f"{i+1}流补充了钢坯停切时间")
-                    self.strands_buffer[i] = data
-                else:
-                    self.logger.warning(f"{i+1}流对已经离开的钢坯补充停切时间,无效")
+        if gp_tmp:
+            self.hostsend(gp_tmp)
 
-            else:
-                with self.integration_lock:
-                    # 验证现在拉速积分是否有效
-                    if self.billet_out_sig[i].permitted_update:
-                        # 有效:检测并把所有短尺的钢坯定尺都改为定尺软件给的
-                        if round(self.billet_out_sig[i].data * 1000) - self.integration_total[i] < 10000:
-                            data[2] = self.data_s7.get_value(f"L{i+1}短尺")
-                        
-                        # 对拉速积分进行切割信号校准
-                        self.billet_out_sig[i].data = 0
-                        self.integration_total[i] = 0
-                    else:
-                        # 无效:开启拉速积分
-                        self.integration_start(i)
-
-                if self.strands_cutting[i]:
-                    self.logger.warning(f"{i+1}流有钢坯开切冲突")
-                    self.strands_cutting[i] = data
-                else:
-                    self.logger.info(f"{i+1}流钢坯开切")
-                    self.strands_cutting[i] = data
-
-    def hostsend(self, i):
-        ccmNo = self.current_heatData['ccmNo']
-        strandNo = i[0][9]
-        self.total += 1
-        self.strand[int(strandNo)-1] += 1
-        billetNo = self.current_heatNo + ccmNo + strandNo + '{:0>2}'.format(self.strand[int(strandNo)-1])
-        i[0] = billetNo
-        i[1] = self.current_heatData
-        self.sender.billet_upload(self.current_heatData, billetNo, self.total, i[2], i[3], i[4], i[5], '')
-
-        self.logger.info(f"{self.current_heatNo}炉{strandNo}流钢坯热送:{billetNo}")
-        self.sender.host_send(ccmNo, billetNo, "棒一")
+    def hostsend(self, billet: Billet):
+        billet.batch_strand()
+        self.sender.billet_upload(billet.heat.to_dict(), billet.billet_no, billet.heat_seq, billet.sizing, billet.drawing_speed,
+                                  billet.start_cutting_time.strftime("%Y-%m-%d %H:%M:%S"), "", '')
+
+        self.logger.info(f"[TRACE]{billet.heat.heat_no}炉{billet.strand_no}流钢坯热送:{billet.billet_no}")
+        self.sender.host_send(billet.ccm_no, billet.billet_no, "棒一")
 
     def show_coolbed(self):
         print(f"北侧冷床目前情况:{len(self.bed_left[0])}根|{len(self.bed_left[1])}根|{len(self.bed_left[2])}根")
@@ -430,48 +206,28 @@ class Trace_pusher:
 
     def clean_status(self):
         self.logger.debug(f"[TRACE]小冷床状态清空")
-        with self.count_lock:
-            self.pusher_left_list = []
-            self.pusher_right_list = []
-            self.bed_left = [[], [], []]
-            self.bed_right = [[], [], []]
-
-    def init_coolbed(self):
-        left_count = input(f"输入北侧冷床钢坯根数序列(由外到内):")
-        right_count = input(f"输入南侧冷床钢坯根数序列(由外到内):")
-        if not (left_count.isdigit() and len(left_count) == 3 and right_count.isdigit() and len(right_count) == 3):
-            warnings.warn("冷床格式化错误,请输入正确格式")
-            return None
-        
-        left_count = [int(i) for i in left_count]
-        right_count = [int(i) for i in right_count]
-        temp = ['000000000000', {}, 0, 0, '', '']
-        with self.count_lock:
-            self.bed_left = [[temp.copy() for i in range(left_count[j])] for j in range(3)]
-            self.bed_right = [[temp.copy() for i in range(right_count[j])] for j in range(3)]
-
-
-    def get_billet(self, direc):
-        with self.count_lock:
-            if direc == "left":
-                return self.get_billet_action(self.bed_left)
-            if direc == "right":
-                return self.get_billet_action(self.bed_right)
-        
-    def get_billet_action(self, src: list):
-        for i in range(len(src)-1, -1, -1):
-            if len(src[i]) >= 4:
-                tmp = [j[0] for j in src.pop(i)]
-                src.append([])
-                if tmp[0].startswith('0'):
-                    return []
-                return tmp
+        self.pusher_left_list = []
+        self.pusher_right_list = []
+        self.bed_left = [[], [], []]
+        self.bed_right = [[], [], []]
+
+    def get_billet(self, direc) -> list[str]:
+        if direc == "left":
+            return self.get_billet_action(self.bed_left)
+        if direc == "right":
+            return self.get_billet_action(self.bed_right)
+    
+    def get_billet_action(self, src: list[list[Billet]]) -> list[str]:
+        with self.cool_bed_lock:
+            for i in range(len(src)-1, -1, -1):
+                if len(src[i]) >= 4:
+                    tmp = [j.billet_no for j in src.pop(i)]
+                    src.append([])
+                    return tmp
         
         return []
     
-    def billet_to_stack(self, stackNo, billets):
+    def billet_to_stack(self, stackNo, billets: list[str]):
         if billets:
-            self.logger.info(f"有钢坯放入{stackNo}堆垛")
+            self.logger.info(f"[TRACE]有钢坯放入{stackNo}堆垛")
             self.sender.stack_add('6', billets, "", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), "6#小冷床(南)", '步进冷床堆垛')
-
-# [坯号, 炉次信息, 定尺, 拉速, 开切时间, 停切时间]

+ 5 - 5
models/data_checker.py

@@ -24,16 +24,16 @@ class Checker:
             '车位2车牌看门狗': [120, '6#机车位2车牌识别', '车位2车牌识别', '立刻排查通信、识别软件是否启动'],
             '车位3车牌看门狗': [120, '6#机车位3车牌识别', '车位3车牌识别', '立刻排查通信、识别软件是否启动'],
             '车位4车牌看门狗': [120, '6#机车位4车牌识别', '车位4车牌识别', '立刻排查通信、识别软件是否启动'],
-            ('L1切割信号[1]', 'L2切割信号[1]', 'L3切割信号[1]', 'L4切割信号[1]'): [1200, '5#机1-4流切割数据', '液压剪信号', '立刻排查停机/通信异常'],
-            ('L5切割信号[1]', 'L6切割信号[1]', 'L7切割信号[1]', 'L8切割信号[1]'): [1200, '5#机5-8流切割数据', '液压剪信号', '立刻排查停机/通信异常']
+            ('L1切割信号[1]', 'L2切割信号[1]', 'L3切割信号[1]', 'L4切割信号[1]'): [1800, '5#机1-4流切割数据', '液压剪信号', '立刻排查停机/通信异常'],
+            ('L5切割信号[1]', 'L6切割信号[1]', 'L7切割信号[1]', 'L8切割信号[1]'): [1800, '5#机5-8流切割数据', '液压剪信号', '立刻排查停机/通信异常']
         }
 
         self.policy_6 = {
             '定尺看门狗1': [120, '6#机1-4流定尺数据', '定尺信号', '立刻排查通信'],
             '定尺看门狗2': [120, '6#机5-8流定尺数据', '定尺信号', '立刻排查通信'],
-            '推钢机激光': [1200, '6#机钢坯基础信息、组坯等所有数据', '推钢机信号', '排查停机、plc内信号响应'],
-            ('L1切割信号[1]', 'L2切割信号[1]', 'L3切割信号[1]', 'L4切割信号[1]'): [1200, '6#机1-4流切割数据', '液压剪信号', '立刻排查停机/通信异常'],
-            ('L5切割信号[1]', 'L6切割信号[1]', 'L7切割信号[1]', 'L8切割信号[1]'): [1200, '6#机5-8流切割数据', '液压剪信号', '立刻排查停机/通信异常']
+            '推钢机激光': [1800, '6#机钢坯基础信息、组坯等所有数据', '推钢机信号', '排查停机、plc内信号响应'],
+            ('L1切割信号[1]', 'L2切割信号[1]', 'L3切割信号[1]', 'L4切割信号[1]'): [1800, '6#机1-4流切割数据', '液压剪信号', '立刻排查停机/通信异常'],
+            ('L5切割信号[1]', 'L6切割信号[1]', 'L7切割信号[1]', 'L8切割信号[1]'): [1800, '6#机5-8流切割数据', '液压剪信号', '立刻排查停机/通信异常']
         }
 
         self.time_5 = {}

+ 6 - 3
models/data_sender.py

@@ -1,9 +1,9 @@
 import paho.mqtt.client as mqtt
-import json, time, requests, pymysql, threading
-
+import json, time, requests, pymysql, threading, datetime
+from utils.logger import Logger
 
 class Sender:
-    def __init__(self, logger):
+    def __init__(self, logger: Logger):
         self.logger = logger
 
         self.topic = {
@@ -245,6 +245,9 @@ class Sender:
         tmp['ladleCode'] = heat_data.get('ladleNo', '')
         tmp['moltenSteelWeight'] = float(heat_data.get('netWeight', 0))
         tmp['startPourTime'] = heat_data.get('sendTime', '')
+
+        if isinstance(tmp['startPourTime'], datetime.datetime):
+            tmp['startPourTime'] = tmp['startPourTime'].strftime("%Y-%m-%d %H:%M:%S")
         
         self._cache[heat_data.get('heatNo', 'error')] = tmp
         self.send('heat_add', tmp)

+ 67 - 0
models/mysql_data.py

@@ -0,0 +1,67 @@
+import time, threading
+from utils.logger import Logger
+from dbutils.pooled_db import PooledDB
+from utils.statepoint import Statepoint
+
+class MysqlData:
+    def __init__(self, mysql_pool: PooledDB, logger: Logger):
+        self.mysql_pool = mysql_pool
+        self.logger = logger
+        self.datas = {"5#到站信息": (), "6#到站信息": ()}
+        self.points = {}
+
+        self.thread_flag = True
+        self.thread = threading.Thread(target=self.update_forever)
+        self.thread.start()
+    
+    def get_value(self, ccm_no, default=()):
+        sql = "SELECT heat_no, into_cc_time from nuo_cc_load_info where cc_no = %s order by into_cc_time desc limit 1;"
+        try:
+            with self.mysql_pool.connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(sql, (ccm_no,))
+                    result = cursor.fetchall()
+            if len(result) == 0:
+                raise ValueError("Read no data from mysql.")
+            return result[0]
+        except Exception as e:
+            self.logger.error(f"[mysql]:{e}")
+            return default
+        
+    def update_forever(self, fru=500):
+        while self.thread_flag:
+            for i in self.datas.keys():
+                self.datas[i] = self.get_value(i[0])
+                if i in self.points.keys():
+                    for j in self.points[i]:
+                        j.inject(self.datas[i])
+            time.sleep(fru/1000)
+
+    def make_point(self, name, point_t=Statepoint):
+        if name not in self.datas.keys():
+            raise NameError(f"Name {name} is not defined.")
+        if name not in self.points.keys():
+            self.points[name] = []
+        
+        tmp_point = point_t(self.datas[name], False)
+        self.points[name].append(tmp_point)
+        return tmp_point
+
+class MysqlDataSizing(MysqlData):
+    def __init__(self, mysql_pool, logger):
+        super().__init__(mysql_pool, logger)
+        self.datas = {f"{i}流定尺": 0 for i in range(1, 9)}
+
+    def get_value(self, strand_no, default=0):
+        sql = "SELECT length FROM autocutwrite WHERE stream = %s ORDER BY cuttime DESC LIMIT 1;"
+        try:
+            with self.mysql_pool.connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(sql, (strand_no,))
+                    result = cursor.fetchall()
+            if len(result) == 0:
+                raise ValueError("Read no data from mysql.")
+            return result[0][0]
+        except Exception as e:
+            self.logger.error(f"[mysql]:{e}")
+            return default

+ 4 - 4
models/parking.py

@@ -29,6 +29,9 @@ class Parking:
         self.plate_set = {'厂内00664', '厂内00415', '厂内00687', '厂内00701', '厂内00700', '厂内00901', '厂内00699', '厂内00695', '厂内00694', '厂内00693', '厂内00692', '厂内00902', '厂内00690', '厂内00689', '陕E08582D', '陕E08515D', '陕E08000D', '陕E00298D'}
 
         for name in range(4):
+            self.value_a.append(data_s7.make_point(f"车{name+1}自动触发结果"))
+            self.value_m.append(data_s7.make_point(f"车{name+1}手动触发结果"))
+
             tmp = data_s7.make_point(f"车{name+1}自动触发信号")
             tmp.set_excite_action(lambda i=name: self.signal_on(i, 0))
             self.auto_trigger.append(tmp)
@@ -36,10 +39,7 @@ class Parking:
             tmp = data_s7.make_point(f"车{name+1}手动触发信号")
             tmp.set_excite_action(lambda i=name: self.signal_on(i, 1))
             self.manual_trigger.append(tmp)
-
-            self.value_a.append(data_s7.make_point(f"车{name+1}自动触发结果"))
-            self.value_m.append(data_s7.make_point(f"车{name+1}手动触发结果"))
-
+            
             tmp = data_s7.make_point(f"车{name+1}存在")
             tmp.set_excite_action(lambda i=name: self.car_in(i))
             tmp.set_reset_action(lambda i=name: self.car_out(i))

+ 53 - 0
models/s7_writer.py

@@ -0,0 +1,53 @@
+from utils.s7data import S7Client
+from utils.statepoint import Statepoint
+import snap7.util, threading, time
+
+class S7Writer:
+    def __init__(self, s7c: S7Client):
+        self.client = s7c
+        self.bool_dict = {}
+        self.write_dict = {}
+        self.sleep_time = 1000
+        self.run = threading.Event()
+        self.type_dict = {"int": (2, snap7.util.set_int),
+                          "dint": (4, snap7.util.set_dint),
+                          "real": (4, snap7.util.set_real)}
+
+    def add_task_bool(self, db: int, start: int, offset: int, point: Statepoint):
+        self.bool_dict[(db, start, offset)] = point
+
+    def add_task(self, dtype: str, db: int, start: int, point: Statepoint):
+        if dtype not in self.type_dict.keys():
+            raise ValueError(f"type {dtype} is not supported.")
+        self.write_dict[(dtype, db, start)] = point
+
+    def do_task_bool(self):
+        for addr, point in self.bool_dict.items():
+            tmp = self.client.db_read(addr[0], addr[1], 1)
+            snap7.util.set_bool(tmp, 0, addr[2], point.data)
+            self.client.db_write(addr[0], addr[1], tmp)
+
+    def do_task(self):
+        for addr, point in self.write_dict.items():
+            type_length, type_func = self.type_dict[addr[0]]
+            tmp = bytearray(type_length)
+            type_func(tmp, 0, point.data)
+            self.client.db_write(addr[1], addr[2], tmp)
+
+    def write_forever(self):
+        self.run.set()
+        while self.run.is_set():
+            try:
+                self.do_task_bool()
+                self.do_task()
+            except Exception as e:
+                print("PLC写入线程异常:", e)
+                self.run.clear()
+                return
+            time.sleep(self.sleep_time/1000)
+
+    def loop_start(self):
+        if hasattr(self, "thread"):
+            raise ValueError("It can only be started once.")
+        self.thread = threading.Thread(target=self.write_forever)
+        self.thread.start()

+ 18 - 0
utils/s7data.py

@@ -13,6 +13,24 @@ class TS7DataItem(ctypes.Structure):
     ]
 
 class S7Client(snap7.client.Client):
+    def connect(self, address, rack, slot, tcp_port = 102, retry: bool = True, retry_times: int = 10, max_stay: int = 300):
+        if bool(retry) == False:
+            return super().connect(address, rack, slot, tcp_port)
+        
+        stay = 1
+        while retry_times == 0 or retry_times > 1:
+            try:
+                return super().connect(address, rack, slot, tcp_port)
+            except:
+                if retry_times > 1:
+                    retry_times -= 1
+                time.sleep(stay)
+                stay *= 2
+                if stay > max_stay:
+                    stay = max_stay
+        
+        return super().connect(address, rack, slot, tcp_port)
+
     def multi_db_read_py(self, db_number: list, start: list, size: list):
         count = len(size)
         buffers = [ctypes.create_string_buffer(i) for i in size]

+ 7 - 1
utils/statepoint.py

@@ -81,7 +81,13 @@ class Statepoint:
 
     def set_excite_action(self, func = lambda: None):
         if callable(func):
-            self.do_excite = func
+            if self.permitted_update:
+                self.allow_update(0)
+                self.set_state(False)
+                self.do_excite = func
+                self.allow_update()
+            else:
+                self.do_excite = func
         else:
             raise TypeError('The parameter func can only be a function')