parking.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import threading, time, snap7
  2. from utils.s7data import S7data
  3. from models.data_sender import Sender
  4. class Parking:
  5. def __init__(self, data_s7: S7data, sender: Sender):
  6. self.data_s7 = data_s7
  7. self.sender = sender
  8. self.ccmNo_list = ["5", "6", "6", "6"]
  9. self.auto_trigger = []
  10. self.manual_trigger = []
  11. self.value_a = []
  12. self.value_m = []
  13. self.lock = threading.Lock()
  14. self.current_car = ["", "", "", ""]
  15. self.car_time = [0, 0, 0, 0]
  16. self.car_be = []
  17. self.car_be_send_state = [False] * 4
  18. self.car_be_lock = [threading.Lock() for i in range(4)]
  19. self.send_lock = threading.Lock()
  20. self.s7_sender = snap7.client.Client()
  21. self.s7_sender.connect('192.168.1.215', 0, 1)
  22. self.plate_set = {'厂内00664', '厂内00415', '厂内00687', '厂内00701', '厂内00700', '厂内00901', '厂内00699', '厂内00695', '厂内00694', '厂内00693', '厂内00692', '厂内00902', '厂内00690', '厂内00689', '陕E08582D', '陕E08515D', '陕E08000D', '陕E00298D'}
  23. for name in range(4):
  24. self.value_a.append(data_s7.make_point(f"车{name+1}自动触发结果"))
  25. self.value_m.append(data_s7.make_point(f"车{name+1}手动触发结果"))
  26. tmp = data_s7.make_point(f"车{name+1}自动触发信号")
  27. tmp.set_excite_action(lambda i=name: self.signal_on(i, 0))
  28. self.auto_trigger.append(tmp)
  29. tmp = data_s7.make_point(f"车{name+1}手动触发信号")
  30. tmp.set_excite_action(lambda i=name: self.signal_on(i, 1))
  31. self.manual_trigger.append(tmp)
  32. tmp = data_s7.make_point(f"车{name+1}存在")
  33. tmp.set_excite_action(lambda i=name: self.car_in(i))
  34. tmp.set_reset_action(lambda i=name: self.car_out(i))
  35. self.car_be.append(tmp)
  36. def signal_on(self, i, manual_flag):
  37. if manual_flag:
  38. valuedata = self.value_m[i]
  39. else:
  40. valuedata = self.value_a[i]
  41. time.sleep(3)
  42. send_flag = False
  43. with self.lock:
  44. if self.current_car[i] != valuedata.data:
  45. send_flag = True
  46. self.current_car[i] = valuedata.data
  47. self.car_time[i] = time.time()
  48. # self.sender.car_add(self.ccmNo_list[i], str(i+1), self.current_car[i])
  49. if send_flag:
  50. self.sender.plate_update(self.ccmNo_list[i], str(i+1), self.current_car[i])
  51. def car_in(self, i):
  52. with self.car_be_lock[i]:
  53. if self.car_be_send_state[i]:
  54. return None
  55. # 临时测试
  56. with self.send_lock:
  57. byte_value = self.s7_sender.db_read(306, 38, 1)
  58. snap7.util.set_bool(byte_value, 0, 2, True)
  59. self.s7_sender.db_write(306, 38, byte_value)
  60. time.sleep(3)
  61. with self.send_lock:
  62. byte_value = self.s7_sender.db_read(306, 38, 1)
  63. snap7.util.set_bool(byte_value, 0, 2, False)
  64. self.s7_sender.db_write(306, 38, byte_value)
  65. start_time = time.time()
  66. now = time.time()
  67. while self.car_be[i].state and now - start_time < 10:
  68. time.sleep(0.5)
  69. now = time.time()
  70. if now - start_time < 10:
  71. return None
  72. else:
  73. plate = ''
  74. if time.time() - self.car_time[i] <= 10:
  75. plate = self.current_car[i]
  76. self.sender.car_add(self.ccmNo_list[i], str(i+1), plate)
  77. self.car_be_send_state[i] = True
  78. def car_out(self, i):
  79. with self.car_be_lock[i]:
  80. if not self.car_be_send_state[i]:
  81. return None
  82. start_time = time.time()
  83. now = time.time()
  84. while not self.car_be[i].state and now - start_time < 10:
  85. time.sleep(0.5)
  86. now = time.time()
  87. if now - start_time < 10:
  88. return None
  89. else:
  90. self.sender.car_go(self.ccmNo_list[i], str(i+1))
  91. self.car_be_send_state[i] = False