s7data.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. import snap7, csv, threading, warnings, time, ctypes
  2. from utils.statepoint import *
  3. class TS7DataItem(ctypes.Structure):
  4. _fields_ = [
  5. ('Area', ctypes.c_int),
  6. ('WordLen', ctypes.c_int),
  7. ('Result', ctypes.c_int),
  8. ('DBNumber', ctypes.c_int),
  9. ('Start', ctypes.c_int),
  10. ('Amount', ctypes.c_int),
  11. ('pdata', ctypes.c_void_p)
  12. ]
  13. class S7Client(snap7.client.Client):
  14. def multi_db_read_py(self, db_number: list, start: list, size: list):
  15. count = len(size)
  16. buffers = [ctypes.create_string_buffer(i) for i in size]
  17. params = []
  18. for i in range(count):
  19. params.append(TS7DataItem(snap7.type.Areas.DB, snap7.type.WordLen.Byte, 0, db_number[i], start[i], size[i], ctypes.cast(buffers[i], ctypes.c_void_p)))
  20. array_type = TS7DataItem * count
  21. param_array = array_type(*params)
  22. result = self.read_multi_vars(param_array)
  23. if result[0]:
  24. raise RuntimeError("多组读取失败")
  25. res_rtn = [bytearray(i) for i in buffers]
  26. return res_rtn
  27. class S7data:
  28. def __init__(self, csvfile):
  29. self.logger = None
  30. self.S7Client = None
  31. self.lock = threading.Lock()
  32. self.thread_run = False
  33. self.threads = []
  34. self.nodes = {}
  35. self.node_data = {}
  36. self.groups = {}
  37. self.target_from_name = {}
  38. with open(csvfile) as f:
  39. for i in csv.DictReader(f):
  40. if i['name'] in self.nodes:
  41. raise Exception(f"S7配置文件节点名称重复:{i['name']}")
  42. else:
  43. self.nodes[i['name']] = i
  44. self.node_data[i['name']] = bytearray(int(i['size']))
  45. if i['group'] not in self.groups:
  46. self.groups[i['group']] = []
  47. self.groups[i['group']].append(i['name'])
  48. def set_logger(self, logger):
  49. self.logger = logger
  50. def set_S7Client(self, s7c: S7Client):
  51. self.S7Client = s7c
  52. def get_S7Client(self):
  53. return self.S7Client
  54. def get_value(self, name):
  55. if len(name) > 3 and name[-3] == '[' and name[-1] == ']' and name[-2].isdigit() and 0 <= int(name[-2]) < 8:
  56. index = int(name[-2])
  57. name = name[:-3]
  58. data = (self.node_data[name][0] >> index) & 1
  59. elif self.nodes[name]['type'] == 'int':
  60. data = snap7.util.get_int(self.node_data[name], 0)
  61. elif self.nodes[name]['type'] == 'dint':
  62. data = snap7.util.get_dint(self.node_data[name], 0)
  63. elif self.nodes[name]['type'] == 'bool':
  64. data = snap7.util.get_bool(self.node_data[name], 0, int(self.nodes[name]['offset']))
  65. elif self.nodes[name]['type'] == 'boollist':
  66. data = [(self.node_data[name][0] >> i) & 1 for i in range(8)]
  67. elif self.nodes[name]['type'] == 'real':
  68. data = snap7.util.get_real(self.node_data[name], 0)
  69. elif self.nodes[name]['type'] == 'string':
  70. data = self.node_data[name][2:2+int.from_bytes(self.node_data[name][1:2])].decode('gbk', errors='replace')
  71. elif self.nodes[name]['type'] == 'wstring':
  72. data = self.node_data[name][4:].decode(encoding='utf-16be', errors='replace')
  73. else:
  74. warnings.warn('暂不支持的类型:' + self.nodes[name]['type'])
  75. if self.logger:
  76. self.logger.error('暂不支持的类型:' + self.nodes[name]['type'])
  77. return None
  78. return data
  79. def send(self, name):
  80. if self.nodes[name]['type'] == 'int':
  81. data = snap7.util.get_int(self.node_data[name], 0)
  82. elif self.nodes[name]['type'] == 'dint':
  83. data = snap7.util.get_dint(self.node_data[name], 0)
  84. elif self.nodes[name]['type'] == 'bool':
  85. data = snap7.util.get_bool(self.node_data[name], 0, int(self.nodes[name]['offset']))
  86. elif self.nodes[name]['type'] == 'boollist':
  87. data = [(self.node_data[name][0] >> i) & 1 for i in range(8)]
  88. elif self.nodes[name]['type'] == 'real':
  89. data = snap7.util.get_real(self.node_data[name], 0)
  90. elif self.nodes[name]['type'] == 'string':
  91. data = self.node_data[name][2:2+int.from_bytes(self.node_data[name][1:2])].decode('gbk', errors='replace')
  92. elif self.nodes[name]['type'] == 'wstring':
  93. data = self.node_data[name][4:].decode(encoding='utf-16be', errors='replace')
  94. elif self.nodes[name]['type'] == 'int_list':
  95. data = []
  96. for i in range(0, int(self.nodes[name]['size']), 2):
  97. data.append(snap7.util.get_int(self.node_data[name], i))
  98. else:
  99. warnings.warn('暂不支持的类型:' + self.nodes[name]['type'])
  100. if self.logger:
  101. self.logger.error('暂不支持的类型:' + self.nodes[name]['type'])
  102. return None
  103. if name in self.target_from_name:
  104. for i in self.target_from_name[name]:
  105. i.inject(data)
  106. if self.nodes[name]['type'] == 'boollist' and name + '*' in self.target_from_name:
  107. for i in range(8):
  108. for j in self.target_from_name[name+'*'][i]:
  109. j.inject(data[i])
  110. def update(self, name):
  111. nodeinfo = self.nodes[name]
  112. try:
  113. while True:
  114. if not self.thread_run:
  115. return None
  116. self.lock.acquire()
  117. if not self.S7Client.get_connected():
  118. warnings.warn('S7Client连接中断')
  119. if self.logger:
  120. self.logger.error('S7Client连接中断')
  121. self.thread_run = False
  122. self.lock.release()
  123. return None
  124. tmp = self.S7Client.db_read(int(nodeinfo['db']), int(nodeinfo['start']), int(nodeinfo['size']))
  125. self.lock.release()
  126. if self.node_data[name] != tmp:
  127. self.node_data[name] = tmp
  128. self.send(name)
  129. time.sleep(float(nodeinfo['frequency']) / 1000)
  130. except RuntimeError as reason:
  131. warnings.warn(reason)
  132. if self.logger:
  133. self.logger.error(reason)
  134. self.thread_run = False
  135. self.lock.release()
  136. def start_auto_update(self):
  137. if self.thread_run:
  138. return None
  139. self.threads = []
  140. if self.S7Client == None:
  141. warnings.warn('未初始化S7Client')
  142. if self.logger:
  143. self.logger.error('未初始化S7Client')
  144. return None
  145. if not self.S7Client.get_connected():
  146. warnings.warn('S7Client未连接')
  147. if self.logger:
  148. self.logger.error('S7Client未连接')
  149. return None
  150. for key, value in self.nodes.items():
  151. if value['read_allow'].upper() != 'FALSE':
  152. self.threads.append(threading.Thread(target=self.update, args=(value['name'],)))
  153. self.thread_run = True
  154. for i in self.threads:
  155. i.start()
  156. def update_group(self, group_name):
  157. nodesname = self.groups[group_name]
  158. db_number = []
  159. start = []
  160. size = []
  161. for name in nodesname:
  162. nodeinfo = self.nodes[name]
  163. db_number.append(int(nodeinfo['db']))
  164. start.append(int(nodeinfo['start']))
  165. size.append(int(nodeinfo['size']))
  166. while True:
  167. if not self.thread_run:
  168. return None
  169. tmp = False
  170. read_valid = True
  171. with self.lock:
  172. if not self.S7Client.get_connected():
  173. warnings.warn('S7Client连接中断')
  174. if self.logger:
  175. self.logger.error('S7Client连接中断')
  176. self.thread_run = False
  177. return None
  178. try:
  179. tmp = self.S7Client.multi_db_read_py(db_number, start, size)
  180. except RuntimeError as reason:
  181. warnings.warn(reason)
  182. read_valid = False
  183. if self.logger:
  184. self.logger.error(reason)
  185. self.thread_run = False
  186. if read_valid and tmp:
  187. for i in range(len(tmp)):
  188. if self.node_data[nodesname[i]] != tmp[i]:
  189. self.node_data[nodesname[i]] = tmp[i]
  190. self.send(nodesname[i])
  191. def auto_update_group(self):
  192. if self.thread_run:
  193. return None
  194. self.threads = []
  195. if self.S7Client == None:
  196. warnings.warn('未初始化S7Client')
  197. if self.logger:
  198. self.logger.error('未初始化S7Client')
  199. return None
  200. if not self.S7Client.get_connected():
  201. warnings.warn('S7Client未连接')
  202. if self.logger:
  203. self.logger.error('S7Client未连接')
  204. return None
  205. for group in self.groups.keys():
  206. self.threads.append(threading.Thread(target=self.update_group, args=(group,)))
  207. self.thread_run = True
  208. for i in self.threads:
  209. i.start()
  210. def end_auto_update(self):
  211. self.thread_run = False
  212. for i in self.threads:
  213. i.join()
  214. def make_point(self, name, point_type = Statepoint):
  215. index = -1
  216. solvedname = name
  217. if len(name) > 3 and name[-3] == '[' and name[-1] == ']' and name[-2].isdigit() and 0 <= int(name[-2]) < 8:
  218. index = int(name[-2])
  219. name = name[:-3]
  220. solvedname = name + '*'
  221. if name not in self.nodes:
  222. raise ValueError("创建了未配置的点")
  223. if solvedname not in self.target_from_name:
  224. if index == -1:
  225. self.target_from_name[solvedname] = []
  226. else:
  227. self.target_from_name[solvedname] = [[],[],[],[],[],[],[],[]]
  228. res = point_type()
  229. if index == -1:
  230. self.target_from_name[solvedname].append(res)
  231. else:
  232. self.target_from_name[solvedname][index].append(res)
  233. self.send(name)
  234. return res