12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- import threading, time
- from utils.s7data import S7data
- from models.data_sender import Sender
- class Parking:
- def __init__(self, data_s7: S7data, sender: Sender):
- self.sender = sender
- self.ccmNo_list = ["5", "6", "6", "6"]
- self.auto_trigger = []
- self.manual_trigger = []
- self.value_a = []
- self.value_m = []
- self.lock = threading.Lock()
- self.current_car = ["", "", "", ""]
- self.car_time = [0, 0, 0, 0]
- self.car_be = []
- self.car_be_send_state = [False] * 4
- self.car_be_lock = [threading.Lock() for i in range(4)]
- self.plate_set = {'厂内00664', '厂内00415', '厂内00687', '厂内00701', '厂内00700', '厂内00901', '厂内00699', '厂内00695', '厂内00694', '厂内00693', '厂内00692', '厂内00902', '厂内00690', '厂内00689', '陕E08582D', '陕E08515D', '陕E08000D', '陕E00298D'}
- for name in range(4):
- tmp = data_s7.make_point(f"车{name+1}自动触发信号")
- tmp.set_excite_action(lambda i=name: self.signal_on(i, 0))
- self.auto_trigger.append(tmp)
- tmp = data_s7.make_point(f"车{name+1}手动触发信号")
- tmp.set_excite_action(lambda i=name: self.signal_on(i, 1))
- self.manual_trigger.append(tmp)
- self.value_a.append(data_s7.make_point(f"车{name+1}自动触发结果"))
- self.value_m.append(data_s7.make_point(f"车{name+1}手动触发结果"))
- tmp = data_s7.make_point(f"车{name+1}存在")
- tmp.set_excite_action(lambda i=name: self.car_in(i))
- tmp.set_reset_action(lambda i=name: self.car_out(i))
- self.car_be.append(tmp)
- def signal_on(self, i, manual_flag):
- if manual_flag:
- valuedata = self.value_m[i]
- else:
- valuedata = self.value_a[i]
- time.sleep(3)
- send_flag = False
- with self.lock:
- if self.current_car[i] != valuedata.data:
- send_flag = True
- self.current_car[i] = valuedata.data
- self.car_time[i] = time.time()
- # self.sender.car_add(self.ccmNo_list[i], str(i+1), self.current_car[i])
- if send_flag:
- self.sender.plate_update(self.ccmNo_list[i], str(i+1), self.current_car[i])
- def car_in(self, i):
- with self.car_be_lock[i]:
- if self.car_be_send_state[i]:
- return None
- start_time = time.time()
- now = time.time()
- while self.car_be[i].state and now - start_time < 10:
- time.sleep(0.5)
- now = time.time()
-
- if now - start_time < 10:
- return None
- else:
- plate = ''
- if time.time() - self.car_time[i] <= 10:
- plate = self.current_car[i]
- self.sender.car_add(self.ccmNo_list[i], str(i+1), plate)
- self.car_be_send_state[i] = True
- def car_out(self, i):
- with self.car_be_lock[i]:
- if not self.car_be_send_state[i]:
- return None
- start_time = time.time()
- now = time.time()
- while not self.car_be[i].state and now - start_time < 10:
- time.sleep(0.5)
- now = time.time()
-
- if now - start_time < 10:
- return None
- else:
- self.sender.car_go(self.ccmNo_list[i], str(i+1))
- self.car_be_send_state[i] = False
|