| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- from utils.s7data import S7Client
- from utils.statepoint import Statepoint
- import snap7.util, threading, time
- class S7Writer:
- def __init__(self, s7c: S7Client):
- self.client = s7c
- self.bool_dict = {}
- self.write_dict = {}
- self.sleep_time = 1000
- self.run = threading.Event()
- self.type_dict = {"int": (2, snap7.util.set_int),
- "dint": (4, snap7.util.set_dint),
- "real": (4, snap7.util.set_real)}
- def add_task_bool(self, db: int, start: int, offset: int, point: Statepoint):
- self.bool_dict[(db, start, offset)] = point
- def add_task(self, dtype: str, db: int, start: int, point: Statepoint):
- if dtype not in self.type_dict.keys():
- raise ValueError(f"type {dtype} is not supported.")
- self.write_dict[(dtype, db, start)] = point
- def do_task_bool(self):
- for addr, point in self.bool_dict.items():
- tmp = self.client.db_read(addr[0], addr[1], 1)
- snap7.util.set_bool(tmp, 0, addr[2], point.data)
- self.client.db_write(addr[0], addr[1], tmp)
- def do_task(self):
- for addr, point in self.write_dict.items():
- type_length, type_func = self.type_dict[addr[0]]
- tmp = bytearray(type_length)
- type_func(tmp, 0, point.data)
- self.client.db_write(addr[1], addr[2], tmp)
- def write_forever(self):
- self.run.set()
- while self.run.is_set():
- try:
- self.do_task_bool()
- self.do_task()
- except Exception as e:
- print("PLC写入线程异常:", e)
- self.run.clear()
- return
- time.sleep(self.sleep_time/1000)
- def loop_start(self):
- if hasattr(self, "thread"):
- raise ValueError("It can only be started once.")
- self.thread = threading.Thread(target=self.write_forever)
- self.thread.start()
|