import threading
import time

import snap7

from utils.s7data import S7data
from models.data_sender import Sender


class Parking:
    """Track car presence and licence plates for a set of parking slots.

    For every slot the constructor creates five points through
    ``data_s7.make_point`` (automatic/manual trigger result, automatic/manual
    trigger signal, presence) and attaches callbacks to them:

    * trigger signal excited -> :meth:`signal_on` stores the recognised plate
      and reports a change through ``sender.plate_update``;
    * presence excited       -> :meth:`car_in` reports ``sender.car_add`` once
      the presence signal has stayed on for the debounce period;
    * presence reset         -> :meth:`car_out` reports ``sender.car_go`` once
      the presence signal has stayed off for the debounce period.

    The callbacks sleep for several seconds, so they are presumably invoked on
    worker threads by the point objects -- NOTE(review): confirm against
    ``S7data``; the locking below assumes concurrent invocation.
    """

    # Connection parameters of the PLC that receives the temporary test pulse.
    PLC_ADDRESS = '192.168.1.215'
    PLC_RACK = 0
    PLC_SLOT = 1

    # Location of the bit toggled by the temporary test pulse in ``car_in``.
    TEST_DB_NUMBER = 306
    TEST_BYTE_OFFSET = 38
    TEST_BIT_INDEX = 2
    TEST_PULSE_SECONDS = 3

    # Delay between a trigger signal and reading the trigger result.
    RESULT_SETTLE_SECONDS = 3
    # How long the presence signal must stay unchanged before it is reported.
    DEBOUNCE_SECONDS = 10
    DEBOUNCE_POLL_SECONDS = 0.5
    # A stored plate is attached to ``car_add`` only if it is at most this old.
    PLATE_FRESH_SECONDS = 10

    def __init__(self, data_s7: S7data, sender: Sender):
        """Create the per-slot points, register callbacks and connect to the PLC.

        Args:
            data_s7: Point factory; must provide ``make_point(name)``.
            sender: Receives ``plate_update``, ``car_add`` and ``car_go`` calls.
        """
        self.data_s7 = data_s7
        self.sender = sender
        # One entry per slot; passed as the first argument of every sender call.
        self.ccmNo_list = ["5", "6", "6", "6"]
        slot_count = len(self.ccmNo_list)

        self.auto_trigger = []
        self.manual_trigger = []
        self.value_a = []
        self.value_m = []

        # Guards ``current_car`` and ``car_time``.
        self.lock = threading.Lock()
        self.current_car = [""] * slot_count
        # Wall-clock timestamp (``time.time()``) of the last trigger per slot.
        self.car_time = [0] * slot_count

        self.car_be = []
        # True once ``car_add`` was sent for a slot and ``car_go`` was not yet.
        self.car_be_send_state = [False] * slot_count
        # Serialises ``car_in`` / ``car_out`` per slot.
        self.car_be_lock = [threading.Lock() for _ in range(slot_count)]

        # Serialises read-modify-write cycles on the shared snap7 client.
        self.send_lock = threading.Lock()
        self.s7_sender = snap7.client.Client()
        self.s7_sender.connect(self.PLC_ADDRESS, self.PLC_RACK, self.PLC_SLOT)

        # Known plates; not referenced by any method of this class.
        self.plate_set = {
            '厂内00664', '厂内00415', '厂内00687', '厂内00701', '厂内00700',
            '厂内00901', '厂内00699', '厂内00695', '厂内00694', '厂内00693',
            '厂内00692', '厂内00902', '厂内00690', '厂内00689', '陕E08582D',
            '陕E08515D', '陕E08000D', '陕E00298D',
        }

        for name in range(slot_count):
            self.value_a.append(data_s7.make_point(f"车{name+1}自动触发结果"))
            self.value_m.append(data_s7.make_point(f"车{name+1}手动触发结果"))

            # ``i=name`` binds the slot index at definition time; a plain
            # closure would see the final loop value in every callback.
            tmp = data_s7.make_point(f"车{name+1}自动触发信号")
            tmp.set_excite_action(lambda i=name: self.signal_on(i, 0))
            self.auto_trigger.append(tmp)

            tmp = data_s7.make_point(f"车{name+1}手动触发信号")
            tmp.set_excite_action(lambda i=name: self.signal_on(i, 1))
            self.manual_trigger.append(tmp)

            tmp = data_s7.make_point(f"车{name+1}存在")
            tmp.set_excite_action(lambda i=name: self.car_in(i))
            tmp.set_reset_action(lambda i=name: self.car_out(i))
            self.car_be.append(tmp)

    def signal_on(self, i: int, manual_flag) -> None:
        """Store the plate produced by a trigger and report it if it changed.

        Args:
            i: Zero-based slot index.
            manual_flag: Truthy to read the manual trigger result, falsy to
                read the automatic one.
        """
        source = self.value_m[i] if manual_flag else self.value_a[i]
        time.sleep(self.RESULT_SETTLE_SECONDS)

        with self.lock:
            # Read the point exactly once so the compared, stored and sent
            # plate are guaranteed to be the same value.
            plate = source.data
            changed = self.current_car[i] != plate
            self.current_car[i] = plate
            self.car_time[i] = time.time()

        # Send outside the lock, using the snapshot taken above rather than
        # re-reading ``current_car`` (another trigger may have replaced it).
        if changed:
            self.sender.plate_update(self.ccmNo_list[i], str(i + 1), plate)

    def car_in(self, i: int) -> None:
        """Report a car arrival once presence has been stable for the debounce time.

        Does nothing if an arrival was already reported for the slot. The
        plate stored by :meth:`signal_on` is attached only when it is at most
        ``PLATE_FRESH_SECONDS`` old; otherwise an empty plate is sent.

        Args:
            i: Zero-based slot index.
        """
        with self.car_be_lock[i]:
            if self.car_be_send_state[i]:
                return

            # Temporary test: pulse a PLC bit for TEST_PULSE_SECONDS.
            self._write_test_bit(True)
            time.sleep(self.TEST_PULSE_SECONDS)
            self._write_test_bit(False)

            if not self._state_held(i, True):
                return

            # ``signal_on`` updates both fields under ``self.lock``; take a
            # consistent snapshot of the pair.
            with self.lock:
                age = time.time() - self.car_time[i]
                plate = self.current_car[i] if age <= self.PLATE_FRESH_SECONDS else ''

            self.sender.car_add(self.ccmNo_list[i], str(i + 1), plate)
            self.car_be_send_state[i] = True

    def car_out(self, i: int) -> None:
        """Report a car departure once absence has been stable for the debounce time.

        Does nothing unless an arrival was previously reported for the slot.

        Args:
            i: Zero-based slot index.
        """
        with self.car_be_lock[i]:
            if not self.car_be_send_state[i]:
                return

            if not self._state_held(i, False):
                return

            self.sender.car_go(self.ccmNo_list[i], str(i + 1))
            self.car_be_send_state[i] = False

    def _state_held(self, i: int, expected: bool) -> bool:
        """Return True if slot ``i`` presence stayed ``expected`` for DEBOUNCE_SECONDS.

        Polls every ``DEBOUNCE_POLL_SECONDS`` and returns False as soon as the
        presence state differs from ``expected`` before the period elapsed.
        A monotonic clock is used so wall-clock adjustments cannot shorten or
        stretch the debounce window.
        """
        started = time.monotonic()
        elapsed = 0.0
        while bool(self.car_be[i].state) == expected and elapsed < self.DEBOUNCE_SECONDS:
            time.sleep(self.DEBOUNCE_POLL_SECONDS)
            elapsed = time.monotonic() - started
        return elapsed >= self.DEBOUNCE_SECONDS

    def _write_test_bit(self, value: bool) -> None:
        """Set the temporary test bit in the PLC via read-modify-write."""
        with self.send_lock:
            byte_value = self.s7_sender.db_read(
                self.TEST_DB_NUMBER, self.TEST_BYTE_OFFSET, 1)
            snap7.util.set_bool(byte_value, 0, self.TEST_BIT_INDEX, value)
            self.s7_sender.db_write(
                self.TEST_DB_NUMBER, self.TEST_BYTE_OFFSET, byte_value)