s7_writer.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from utils.s7data import S7Client
  2. from utils.statepoint import Statepoint
  3. import snap7.util, threading, time
  4. class S7Writer:
  5. def __init__(self, s7c: S7Client):
  6. self.client = s7c
  7. self.bool_dict = {}
  8. self.write_dict = {}
  9. self.sleep_time = 1000
  10. self.run = threading.Event()
  11. self.type_dict = {"int": (2, snap7.util.set_int),
  12. "dint": (4, snap7.util.set_dint),
  13. "real": (4, snap7.util.set_real)}
  14. def add_task_bool(self, db: int, start: int, offset: int, point: Statepoint):
  15. self.bool_dict[(db, start, offset)] = point
  16. def add_task(self, dtype: str, db: int, start: int, point: Statepoint):
  17. if dtype not in self.type_dict.keys():
  18. raise ValueError(f"type {dtype} is not supported.")
  19. self.write_dict[(dtype, db, start)] = point
  20. def do_task_bool(self):
  21. for addr, point in self.bool_dict.items():
  22. tmp = self.client.db_read(addr[0], addr[1], 1)
  23. snap7.util.set_bool(tmp, 0, addr[2], point.data)
  24. self.client.db_write(addr[0], addr[1], tmp)
  25. def do_task(self):
  26. for addr, point in self.write_dict.items():
  27. type_length, type_func = self.type_dict[addr[0]]
  28. tmp = bytearray(type_length)
  29. type_func(tmp, 0, point.data)
  30. self.client.db_write(addr[1], addr[2], tmp)
  31. def write_forever(self):
  32. self.run.set()
  33. while self.run.is_set():
  34. try:
  35. self.do_task_bool()
  36. self.do_task()
  37. except Exception as e:
  38. print("PLC写入线程异常:", e)
  39. self.run.clear()
  40. return
  41. time.sleep(self.sleep_time/1000)
  42. def loop_start(self):
  43. if hasattr(self, "thread"):
  44. raise ValueError("It can only be started once.")
  45. self.thread = threading.Thread(target=self.write_forever)
  46. self.thread.start()