123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- import snap7, csv, threading, warnings, time, ctypes
- from utils.statepoint import *
- class TS7DataItem(ctypes.Structure):
- _fields_ = [
- ('Area', ctypes.c_int),
- ('WordLen', ctypes.c_int),
- ('Result', ctypes.c_int),
- ('DBNumber', ctypes.c_int),
- ('Start', ctypes.c_int),
- ('Amount', ctypes.c_int),
- ('pdata', ctypes.c_void_p)
- ]
- class S7Client(snap7.client.Client):
- def multi_db_read_py(self, db_number: list, start: list, size: list):
- count = len(size)
- buffers = [ctypes.create_string_buffer(i) for i in size]
- params = []
- for i in range(count):
- params.append(TS7DataItem(snap7.type.Areas.DB, snap7.type.WordLen.Byte, 0, db_number[i], start[i], size[i], ctypes.cast(buffers[i], ctypes.c_void_p)))
-
- array_type = TS7DataItem * count
- param_array = array_type(*params)
- result = self.read_multi_vars(param_array)
- if result[0]:
- raise RuntimeError("多组读取失败")
-
- res_rtn = [bytearray(i) for i in buffers]
- return res_rtn
- class S7data:
- def __init__(self, csvfile):
- self.logger = None
- self.S7Client = None
- self.lock = threading.Lock()
- self.thread_run = False
- self.threads = []
- self.nodes = {}
- self.node_data = {}
- self.groups = {}
- self.target_from_name = {}
- with open(csvfile) as f:
- for i in csv.DictReader(f):
- if i['name'] in self.nodes:
- raise Exception(f"S7配置文件节点名称重复:{i['name']}")
- else:
- self.nodes[i['name']] = i
- self.node_data[i['name']] = bytearray(int(i['size']))
- if i['group'] not in self.groups:
- self.groups[i['group']] = []
- self.groups[i['group']].append(i['name'])
- def set_logger(self, logger):
- self.logger = logger
- def set_S7Client(self, s7c: S7Client):
- self.S7Client = s7c
- def get_S7Client(self):
- return self.S7Client
- def send(self, name):
- if self.nodes[name]['type'] == 'int':
- data = snap7.util.get_int(self.node_data[name], 0)
- elif self.nodes[name]['type'] == 'dint':
- data = snap7.util.get_dint(self.node_data[name], 0)
- elif self.nodes[name]['type'] == 'bool':
- data = snap7.util.get_bool(self.node_data[name], 0, int(self.nodes[name]['offset']))
- elif self.nodes[name]['type'] == 'boollist':
- data = [(self.node_data[name][0] >> i) & 1 for i in range(8)]
- elif self.nodes[name]['type'] == 'real':
- data = snap7.util.get_real(self.node_data[name], 0)
- elif self.nodes[name]['type'] == 'string':
- data = self.node_data[name][2:2+int.from_bytes(self.node_data[name][1:2])].decode('gbk')
- elif self.nodes[name]['type'] == 'wstring':
- data = self.node_data[name][4:].decode(encoding='utf-16be')
- elif self.nodes[name]['type'] == 'int_list':
- data = []
- for i in range(0, int(self.nodes[name]['size']), 2):
- data.append(snap7.util.get_int(self.node_data[name], i))
- else:
- warnings.warn('暂不支持的类型:' + self.nodes[name]['type'])
- if self.logger:
- self.logger.error('暂不支持的类型:' + self.nodes[name]['type'])
- return None
- if name in self.target_from_name:
- for i in self.target_from_name[name]:
- i.inject(data)
- if self.nodes[name]['type'] == 'boollist' and name + '*' in self.target_from_name:
- for i in range(8):
- for j in self.target_from_name[name+'*'][i]:
- j.inject(data[i])
- def update(self, name):
- nodeinfo = self.nodes[name]
- try:
- while True:
- if not self.thread_run:
- return None
- self.lock.acquire()
- if not self.S7Client.get_connected():
- warnings.warn('S7Client连接中断')
- if self.logger:
- self.logger.error('S7Client连接中断')
- self.thread_run = False
- self.lock.release()
- return None
- tmp = self.S7Client.db_read(int(nodeinfo['db']), int(nodeinfo['start']), int(nodeinfo['size']))
- self.lock.release()
- if self.node_data[name] != tmp:
- self.node_data[name] = tmp
- self.send(name)
- time.sleep(float(nodeinfo['frequency']) / 1000)
- except RuntimeError as reason:
- warnings.warn(reason)
- if self.logger:
- self.logger.error(reason)
- self.thread_run = False
- self.lock.release()
- def start_auto_update(self):
- if self.thread_run:
- return None
- self.threads = []
- if self.S7Client == None:
- warnings.warn('未初始化S7Client')
- if self.logger:
- self.logger.error('未初始化S7Client')
- return None
- if not self.S7Client.get_connected():
- warnings.warn('S7Client未连接')
- if self.logger:
- self.logger.error('S7Client未连接')
- return None
- for key, value in self.nodes.items():
- if value['read_allow'].upper() != 'FALSE':
- self.threads.append(threading.Thread(target=self.update, args=(value['name'],)))
- self.thread_run = True
- for i in self.threads:
- i.start()
- def update_group(self, group_name):
- nodesname = self.groups[group_name]
- db_number = []
- start = []
- size = []
- for name in nodesname:
- nodeinfo = self.nodes[name]
- db_number.append(int(nodeinfo['db']))
- start.append(int(nodeinfo['start']))
- size.append(int(nodeinfo['size']))
- while True:
- if not self.thread_run:
- return None
-
- tmp = False
- read_valid = True
- with self.lock:
- if not self.S7Client.get_connected():
- warnings.warn('S7Client连接中断')
- if self.logger:
- self.logger.error('S7Client连接中断')
- self.thread_run = False
- return None
- try:
- tmp = self.S7Client.multi_db_read_py(db_number, start, size)
- except RuntimeError as reason:
- warnings.warn(reason)
- read_valid = False
- if self.logger:
- self.logger.error(reason)
- self.thread_run = False
- if read_valid and tmp:
- for i in range(len(tmp)):
- if self.node_data[nodesname[i]] != tmp[i]:
- self.node_data[nodesname[i]] = tmp[i]
- self.send(nodesname[i])
- def auto_update_group(self):
- if self.thread_run:
- return None
- self.threads = []
- if self.S7Client == None:
- warnings.warn('未初始化S7Client')
- if self.logger:
- self.logger.error('未初始化S7Client')
- return None
- if not self.S7Client.get_connected():
- warnings.warn('S7Client未连接')
- if self.logger:
- self.logger.error('S7Client未连接')
- return None
-
- for group in self.groups.keys():
- self.threads.append(threading.Thread(target=self.update_group, args=(group,)))
- self.thread_run = True
- for i in self.threads:
- i.start()
- def end_auto_update(self):
- self.thread_run = False
- for i in self.threads:
- i.join()
- def make_point(self, name):
- index = -1
- solvedname = name
- if len(name) > 3 and name[-3] == '[' and name[-1] == ']' and name[-2].isdigit() and 0 <= int(name[-2]) < 8:
- index = int(name[-2])
- name = name[:-3]
- solvedname = name + '*'
- if name not in self.nodes:
- raise ValueError("创建了未配置的点")
- if solvedname not in self.target_from_name:
- if index == -1:
- self.target_from_name[solvedname] = []
- else:
- self.target_from_name[solvedname] = [[],[],[],[],[],[],[],[]]
-
- res = Statepoint()
- if index == -1:
- self.target_from_name[solvedname].append(res)
- else:
- self.target_from_name[solvedname][index].append(res)
- self.send(name)
- return res
|