import snap7, csv, threading, warnings, time from utils.statepoint import * S7Client = snap7.client.Client class S7data: def __init__(self, csvfile): self.logger = None self.S7Client = None self.lock = threading.Lock() self.thread_run = False self.threads = [] self.nodes = {} self.node_data = {} self.target_from_name = {} with open(csvfile) as f: for i in csv.DictReader(f): if i['name'] in self.nodes: raise Exception(f"S7配置文件节点名称重复:{i['name']}") else: self.nodes[i['name']] = i self.node_data[i['name']] = bytearray(int(i['size'])) def set_logger(self, logger): self.logger = logger def set_S7Client(self, s7c): self.S7Client = s7c def get_S7Client(self): return self.S7Client def send(self, name): if self.nodes[name]['type'] == 'int': data = snap7.util.get_int(self.node_data[name], 0) elif self.nodes[name]['type'] == 'bool': data = snap7.util.get_bool(self.node_data[name], 0, int(self.nodes[name]['offset'])) elif self.nodes[name]['type'] == 'boollist': data = [(self.node_data[name][0] >> i) & 1 for i in range(8)] elif self.nodes[name]['type'] == 'real': data = snap7.util.get_real(self.node_data[name], 0) elif self.nodes[name]['type'] == 'string': data = name, snap7.util.get_string(self.node_data[name], 0) elif self.nodes[name]['type'] == 'wstring': data = self.node_data[name][4:].decode(encoding='utf-16be') else: warnings.warn('暂不支持的类型:' + self.nodes[name]['type']) if self.logger: self.logger.error('暂不支持的类型:' + self.nodes[name]['type']) return None if name in self.target_from_name: for i in self.target_from_name[name]: i.inject(data) if self.nodes[name]['type'] == 'boollist' and name + '*' in self.target_from_name: for i in range(8): for j in self.target_from_name[name+'*'][i]: j.inject(data[i]) def update(self, name): nodeinfo = self.nodes[name] try: while True: if not self.thread_run: return None self.lock.acquire() if not self.S7Client.get_connected(): warnings.warn('S7Client连接中断') if self.logger: self.logger.error('S7Client连接中断') self.thread_run = False self.lock.release() return None tmp = self.S7Client.db_read(int(nodeinfo['db']), int(nodeinfo['start']), int(nodeinfo['size'])) self.lock.release() if self.node_data[name] != tmp: self.node_data[name] = tmp self.send(name) time.sleep(float(nodeinfo['frequency']) / 1000) except RuntimeError as reason: warnings.warn(reason) if self.logger: self.logger.error(reason) self.thread_run = False self.lock.release() def start_auto_update(self): if self.thread_run: return None self.threads = [] if self.S7Client == None: warnings.warn('未初始化S7Client') if self.logger: self.logger.error('未初始化S7Client') return None if not self.S7Client.get_connected(): warnings.warn('S7Client未连接') if self.logger: self.logger.error('S7Client未连接') return None for key, value in self.nodes.items(): if value['read_allow'].upper() != 'FALSE': self.threads.append(threading.Thread(target=self.update, args=(value['name'],))) self.thread_run = True for i in self.threads: i.start() def end_auto_update(self): self.thread_run = False for i in self.threads: i.join() def make_point(self, name): index = -1 solvedname = name if len(name) > 3 and name[-3] == '[' and name[-1] == ']' and name[-2].isdigit() and 0 <= int(name[-2]) < 8: index = int(name[-2]) name = name[:-3] solvedname = name + '*' if name not in self.nodes: raise ValueError("创建了未配置的点") if solvedname not in self.target_from_name: if index == -1: self.target_from_name[solvedname] = [] else: self.target_from_name[solvedname] = [[],[],[],[],[],[],[],[]] res = Statepoint() if index == -1: self.target_from_name[solvedname].append(res) else: self.target_from_name[solvedname][index].append(res) return res