s7data.py 11 KB


  1. import snap7, csv, threading, warnings, time, ctypes
  2. from utils.statepoint import *
  3. class TS7DataItem(ctypes.Structure):
  4. _fields_ = [
  5. ('Area', ctypes.c_int),
  6. ('WordLen', ctypes.c_int),
  7. ('Result', ctypes.c_int),
  8. ('DBNumber', ctypes.c_int),
  9. ('Start', ctypes.c_int),
  10. ('Amount', ctypes.c_int),
  11. ('pdata', ctypes.c_void_p)
  12. ]
  13. class S7Client(snap7.client.Client):
  14. def connect(self, address, rack, slot, tcp_port = 102, retry: bool = True, retry_times: int = 10, max_stay: int = 300):
  15. if bool(retry) == False:
  16. return super().connect(address, rack, slot, tcp_port)
  17. stay = 1
  18. while retry_times == 0 or retry_times > 1:
  19. try:
  20. return super().connect(address, rack, slot, tcp_port)
  21. except:
  22. if retry_times > 1:
  23. retry_times -= 1
  24. time.sleep(stay)
  25. stay *= 2
  26. if stay > max_stay:
  27. stay = max_stay
  28. return super().connect(address, rack, slot, tcp_port)
  29. def multi_db_read_py(self, db_number: list, start: list, size: list):
  30. count = len(size)
  31. buffers = [ctypes.create_string_buffer(i) for i in size]
  32. params = []
  33. for i in range(count):
  34. params.append(TS7DataItem(snap7.type.Areas.DB, snap7.type.WordLen.Byte, 0, db_number[i], start[i], size[i], ctypes.cast(buffers[i], ctypes.c_void_p)))
  35. array_type = TS7DataItem * count
  36. param_array = array_type(*params)
  37. result = self.read_multi_vars(param_array)
  38. if result[0]:
  39. raise RuntimeError("多组读取失败")
  40. res_rtn = [bytearray(i) for i in buffers]
  41. return res_rtn
  42. class S7data:
  43. def __init__(self, csvfile):
  44. self.logger = None
  45. self.S7Client = None
  46. self.lock = threading.Lock()
  47. self.thread_run = False
  48. self.threads = []
  49. self.nodes = {}
  50. self.node_data = {}
  51. self.groups = {}
  52. self.target_from_name = {}
  53. with open(csvfile) as f:
  54. for i in csv.DictReader(f):
  55. if i['name'] in self.nodes:
  56. raise Exception(f"S7配置文件节点名称重复:{i['name']}")
  57. else:
  58. self.nodes[i['name']] = i
  59. self.node_data[i['name']] = bytearray(int(i['size']))
  60. if i['group'] not in self.groups:
  61. self.groups[i['group']] = []
  62. self.groups[i['group']].append(i['name'])
  63. def set_logger(self, logger):
  64. self.logger = logger
  65. def set_S7Client(self, s7c: S7Client):
  66. self.S7Client = s7c
  67. def get_S7Client(self):
  68. return self.S7Client
  69. def get_value(self, name):
  70. if len(name) > 3 and name[-3] == '[' and name[-1] == ']' and name[-2].isdigit() and 0 <= int(name[-2]) < 8:
  71. index = int(name[-2])
  72. name = name[:-3]
  73. data = (self.node_data[name][0] >> index) & 1
  74. elif self.nodes[name]['type'] == 'int':
  75. data = snap7.util.get_int(self.node_data[name], 0)
  76. elif self.nodes[name]['type'] == 'dint':
  77. data = snap7.util.get_dint(self.node_data[name], 0)
  78. elif self.nodes[name]['type'] == 'bool':
  79. data = snap7.util.get_bool(self.node_data[name], 0, int(self.nodes[name]['offset']))
  80. elif self.nodes[name]['type'] == 'boollist':
  81. data = [(self.node_data[name][0] >> i) & 1 for i in range(8)]
  82. elif self.nodes[name]['type'] == 'real':
  83. data = snap7.util.get_real(self.node_data[name], 0)
  84. elif self.nodes[name]['type'] == 'string':
  85. data = self.node_data[name][2:2+int.from_bytes(self.node_data[name][1:2])].decode('gbk', errors='replace')
  86. elif self.nodes[name]['type'] == 'wstring':
  87. data = self.node_data[name][4:].decode(encoding='utf-16be', errors='replace')
  88. else:
  89. warnings.warn('暂不支持的类型:' + self.nodes[name]['type'])
  90. if self.logger:
  91. self.logger.error('暂不支持的类型:' + self.nodes[name]['type'])
  92. return None
  93. return data
  94. def send(self, name):
  95. if self.nodes[name]['type'] == 'int':
  96. data = snap7.util.get_int(self.node_data[name], 0)
  97. elif self.nodes[name]['type'] == 'dint':
  98. data = snap7.util.get_dint(self.node_data[name], 0)
  99. elif self.nodes[name]['type'] == 'bool':
  100. data = snap7.util.get_bool(self.node_data[name], 0, int(self.nodes[name]['offset']))
  101. elif self.nodes[name]['type'] == 'boollist':
  102. data = [(self.node_data[name][0] >> i) & 1 for i in range(8)]
  103. elif self.nodes[name]['type'] == 'real':
  104. data = snap7.util.get_real(self.node_data[name], 0)
  105. elif self.nodes[name]['type'] == 'string':
  106. data = self.node_data[name][2:2+int.from_bytes(self.node_data[name][1:2])].decode('gbk', errors='replace')
  107. elif self.nodes[name]['type'] == 'wstring':
  108. data = self.node_data[name][4:].decode(encoding='utf-16be', errors='replace')
  109. elif self.nodes[name]['type'] == 'int_list':
  110. data = []
  111. for i in range(0, int(self.nodes[name]['size']), 2):
  112. data.append(snap7.util.get_int(self.node_data[name], i))
  113. else:
  114. warnings.warn('暂不支持的类型:' + self.nodes[name]['type'])
  115. if self.logger:
  116. self.logger.error('暂不支持的类型:' + self.nodes[name]['type'])
  117. return None
  118. if name in self.target_from_name:
  119. for i in self.target_from_name[name]:
  120. i.inject(data)
  121. if self.nodes[name]['type'] == 'boollist' and name + '*' in self.target_from_name:
  122. for i in range(8):
  123. for j in self.target_from_name[name+'*'][i]:
  124. j.inject(data[i])
  125. def update(self, name):
  126. nodeinfo = self.nodes[name]
  127. try:
  128. while True:
  129. if not self.thread_run:
  130. return None
  131. self.lock.acquire()
  132. if not self.S7Client.get_connected():
  133. warnings.warn('S7Client连接中断')
  134. if self.logger:
  135. self.logger.error('S7Client连接中断')
  136. self.thread_run = False
  137. self.lock.release()
  138. return None
  139. tmp = self.S7Client.db_read(int(nodeinfo['db']), int(nodeinfo['start']), int(nodeinfo['size']))
  140. self.lock.release()
  141. if self.node_data[name] != tmp:
  142. self.node_data[name] = tmp
  143. self.send(name)
  144. time.sleep(float(nodeinfo['frequency']) / 1000)
  145. except RuntimeError as reason:
  146. warnings.warn(reason)
  147. if self.logger:
  148. self.logger.error(reason)
  149. self.thread_run = False
  150. self.lock.release()
  151. def start_auto_update(self):
  152. if self.thread_run:
  153. return None
  154. self.threads = []
  155. if self.S7Client == None:
  156. warnings.warn('未初始化S7Client')
  157. if self.logger:
  158. self.logger.error('未初始化S7Client')
  159. return None
  160. if not self.S7Client.get_connected():
  161. warnings.warn('S7Client未连接')
  162. if self.logger:
  163. self.logger.error('S7Client未连接')
  164. return None
  165. for key, value in self.nodes.items():
  166. if value['read_allow'].upper() != 'FALSE':
  167. self.threads.append(threading.Thread(target=self.update, args=(value['name'],)))
  168. self.thread_run = True
  169. for i in self.threads:
  170. i.start()
  171. def update_group(self, group_name):
  172. nodesname = self.groups[group_name]
  173. db_number = []
  174. start = []
  175. size = []
  176. for name in nodesname:
  177. nodeinfo = self.nodes[name]
  178. db_number.append(int(nodeinfo['db']))
  179. start.append(int(nodeinfo['start']))
  180. size.append(int(nodeinfo['size']))
  181. while True:
  182. if not self.thread_run:
  183. return None
  184. tmp = False
  185. read_valid = True
  186. with self.lock:
  187. if not self.S7Client.get_connected():
  188. warnings.warn('S7Client连接中断')
  189. if self.logger:
  190. self.logger.error('S7Client连接中断')
  191. self.thread_run = False
  192. return None
  193. try:
  194. tmp = self.S7Client.multi_db_read_py(db_number, start, size)
  195. except RuntimeError as reason:
  196. warnings.warn(reason)
  197. read_valid = False
  198. if self.logger:
  199. self.logger.error(reason)
  200. self.thread_run = False
  201. if read_valid and tmp:
  202. for i in range(len(tmp)):
  203. if self.node_data[nodesname[i]] != tmp[i]:
  204. self.node_data[nodesname[i]] = tmp[i]
  205. self.send(nodesname[i])
  206. def auto_update_group(self):
  207. if self.thread_run:
  208. return None
  209. self.threads = []
  210. if self.S7Client == None:
  211. warnings.warn('未初始化S7Client')
  212. if self.logger:
  213. self.logger.error('未初始化S7Client')
  214. return None
  215. if not self.S7Client.get_connected():
  216. warnings.warn('S7Client未连接')
  217. if self.logger:
  218. self.logger.error('S7Client未连接')
  219. return None
  220. for group in self.groups.keys():
  221. self.threads.append(threading.Thread(target=self.update_group, args=(group,)))
  222. self.thread_run = True
  223. for i in self.threads:
  224. i.start()
  225. def end_auto_update(self):
  226. self.thread_run = False
  227. for i in self.threads:
  228. i.join()
  229. def make_point(self, name, point_type = Statepoint):
  230. index = -1
  231. solvedname = name
  232. if len(name) > 3 and name[-3] == '[' and name[-1] == ']' and name[-2].isdigit() and 0 <= int(name[-2]) < 8:
  233. index = int(name[-2])
  234. name = name[:-3]
  235. solvedname = name + '*'
  236. if name not in self.nodes:
  237. raise ValueError("创建了未配置的点")
  238. if solvedname not in self.target_from_name:
  239. if index == -1:
  240. self.target_from_name[solvedname] = []
  241. else:
  242. self.target_from_name[solvedname] = [[],[],[],[],[],[],[],[]]
  243. res = point_type()
  244. if index == -1:
  245. self.target_from_name[solvedname].append(res)
  246. else:
  247. self.target_from_name[solvedname][index].append(res)
  248. self.send(name)
  249. return res