import threading
import time

import snap7.util

from utils.s7data import S7Client
from utils.statepoint import Statepoint


class S7Writer:
    """Periodically write the values of registered points to PLC data blocks.

    Tasks are registered with :meth:`add_task_bool` (single bits) and
    :meth:`add_task` (``int`` / ``dint`` / ``real`` values).  Each cycle of
    :meth:`write_forever` reads ``point.data`` from every registered point
    and writes it through the client's ``db_read`` / ``db_write`` methods.
    """

    def __init__(self, s7c: S7Client):
        """Create a writer that sends all writes through *s7c*."""
        self.client = s7c
        # (db, start, offset) -> point whose ``data`` is written as one bit.
        self.bool_dict = {}
        # (dtype, db, start) -> point whose ``data`` is written as a value.
        self.write_dict = {}
        # Pause between write cycles, in milliseconds (divided by 1000
        # before being handed to time.sleep).
        self.sleep_time = 1000
        # Set while the write loop should keep running; clearing it makes
        # the loop exit at its next check.
        self.run = threading.Event()
        # Supported type name -> (length in bytes, snap7 setter function).
        self.type_dict = {
            "int": (2, snap7.util.set_int),
            "dint": (4, snap7.util.set_dint),
            "real": (4, snap7.util.set_real),
        }

    def add_task_bool(self, db: int, start: int, offset: int, point: Statepoint) -> None:
        """Register *point* to be written as a single bit.

        Args:
            db: Data block number passed to ``db_read`` / ``db_write``.
            start: Start address of the byte that holds the bit.
            offset: Bit index passed to ``snap7.util.set_bool``.
            point: Object whose ``data`` attribute is read on every cycle.

        Registering the same ``(db, start, offset)`` again replaces the
        previously registered point.
        """
        self.bool_dict[(db, start, offset)] = point

    def add_task(self, dtype: str, db: int, start: int, point: Statepoint) -> None:
        """Register *point* to be written as a value of type *dtype*.

        Args:
            dtype: One of the keys of ``self.type_dict``
                (``"int"``, ``"dint"``, ``"real"``).
            db: Data block number passed to ``db_write``.
            start: Start address passed to ``db_write``.
            point: Object whose ``data`` attribute is read on every cycle.

        Raises:
            ValueError: If *dtype* is not a supported type name.
        """
        if dtype not in self.type_dict:
            raise ValueError(f"type {dtype} is not supported.")
        self.write_dict[(dtype, db, start)] = point

    def do_task_bool(self) -> None:
        """Write every registered bit once (read byte, set bit, write back)."""
        # Iterate over a snapshot: add_task_bool() may be called from another
        # thread while the write loop is running, and resizing a dict during
        # iteration raises RuntimeError, which would stop the loop for good.
        for (db, start, offset), point in list(self.bool_dict.items()):
            # Fetch the current byte so the other bits in it are preserved.
            buf = self.client.db_read(db, start, 1)
            snap7.util.set_bool(buf, 0, offset, point.data)
            self.client.db_write(db, start, buf)

    def do_task(self) -> None:
        """Write every registered int/dint/real value once."""
        # Snapshot for the same reason as in do_task_bool().
        for (dtype, db, start), point in list(self.write_dict.items()):
            length, setter = self.type_dict[dtype]
            buf = bytearray(length)
            setter(buf, 0, point.data)
            self.client.db_write(db, start, buf)

    def write_forever(self) -> None:
        """Run write cycles until ``self.run`` is cleared or a cycle fails.

        Sets ``self.run`` on entry.  Any exception raised during a cycle is
        printed, ``self.run`` is cleared and the method returns.
        """
        self.run.set()
        while self.run.is_set():
            try:
                self.do_task_bool()
                self.do_task()
            except Exception as e:
                # Message text means "PLC write thread exception".
                print("PLC写入线程异常:", e)
                self.run.clear()
                return
            time.sleep(self.sleep_time / 1000)

    def loop_start(self) -> None:
        """Start :meth:`write_forever` in a background thread.

        Raises:
            ValueError: If this writer has already been started.
        """
        # The ``thread`` attribute exists only after the first start.
        if hasattr(self, "thread"):
            raise ValueError("It can only be started once.")
        self.thread = threading.Thread(target=self.write_forever)
        self.thread.start()