import time
import threading

from utils.s7data import S7data
from utils.logger import Logger
from models.billet_trace_pusher import Trace_pusher
from models.parking import Parking
from models.data_sender import Sender


class Crane:
    """Follow cranes A1-A3 by position and track the billets they carry.

    One worker thread is started per configured area. "Producer" areas
    (the two 6# cold-bed sides) are where a crane picks billets up;
    "consumer" areas (parking slot 3, the high-speed-wire roller table and
    the 601 stack) are where a crane puts them down.
    """

    def __init__(self, data_s7: S7data, tracer_5: Trace_pusher,
                 tracer_6: Trace_pusher, parking: Parking, sender: Sender,
                 logger: Logger):
        """Store collaborators, build the lookup tables and start the workers.

        Note that the worker threads are started here, so constructing a
        ``Crane`` immediately begins polling the crane positions.
        """
        self.data = data_s7
        self.tracer_5 = tracer_5
        self.tracer_6 = tracer_6
        self.parking = parking
        self.sender = sender
        self.logger = logger

        # Position data points, one per crane; ``.data`` is compared against
        # the area bounds below.
        self.cranes = {
            "A1": self.data.make_point("天车A1位置"),
            "A2": self.data.make_point("天车A2位置"),
            "A3": self.data.make_point("天车A3位置"),
        }
        # Comma-joined billet numbers currently held by each crane
        # ("" means the crane is empty).
        self.billetsNo = {"A1": "", "A2": "", "A3": ""}
        # Area name each crane's current load was picked up from.
        self.billets_from = {"A1": "", "A2": "", "A3": ""}
        # Area name -> [start, end] position range (inclusive).
        self.areas = {
            "6#小冷床(左)": [35000, 38500],
            "6#小冷床(右)": [22000, 24000],
            "车位3": [2000, 4000],
            "高线辊道": [60300, 62300],
            "601堆垛": [10000, 14000],
        }

        self.threads_poor = [
            threading.Thread(target=self.area_producer, args=("6#小冷床(左)",)),
            threading.Thread(target=self.area_producer, args=("6#小冷床(右)",)),
            threading.Thread(target=self.area_consumer, args=("车位3",)),
            threading.Thread(target=self.area_consumer, args=("高线辊道",)),
            threading.Thread(target=self.area_consumer, args=("601堆垛",)),
        ]
        for i in self.threads_poor:
            i.start()

    def scan_crane(self, areaNo):
        """Block until some crane is inside ``areaNo`` and return its key.

        All cranes are checked in dictionary order; if none is inside the
        area the scan is repeated after 0.5 s.
        """
        start, end = self.areas[areaNo]
        while True:
            for key, value in self.cranes.items():
                if start <= value.data <= end:
                    return key
            time.sleep(0.5)

    def crane_leave(self, craneNo, areaNo):
        """Wait up to about 10 s for ``craneNo`` to leave ``areaNo``.

        Returns:
            True as soon as the crane's position is outside the area,
            False if it is still inside once the wait has expired.
        """
        start, end = self.areas[areaNo]
        wait_time = 0
        while True:
            # Read the position once so both comparisons see the same sample.
            position = self.cranes[craneNo].data
            if position < start or position > end:
                return True
            if wait_time >= 10:
                return False
            time.sleep(0.5)
            wait_time += 0.5

    def area_producer(self, areaNo):
        """Worker loop for a pick-up area; never returns.

        A crane that leaves within the ``crane_leave`` window is treated as
        passing through. Otherwise an empty crane tries to pick up billets,
        and the loop then waits for the crane to leave before scanning again.
        """
        while True:
            craneNo = self.scan_crane(areaNo)
            self.logger.debug(f"天车{craneNo}进入区域{areaNo}")
            if self.crane_leave(craneNo, areaNo):
                self.logger.debug(f"天车{craneNo}离开区域{areaNo}")
                continue

            if self.billetsNo[craneNo] == "":
                self.logger.debug(f"天车{craneNo}正在等待或尝试夹起钢坯")
                if self.get_billet(craneNo, areaNo):
                    self.logger.debug(f"天车{craneNo}获取到钢坯")
                else:
                    self.logger.debug(f"天车{craneNo}未获取到钢坯直接离开")
            else:
                self.logger.debug(f"天车{craneNo}有未卸下的钢坯")

            # crane_leave() itself blocks for up to ~10 s per call, so this
            # loop is not a hot spin.
            while not self.crane_leave(craneNo, areaNo):
                pass
            self.logger.debug(f"天车{craneNo}离开区域{areaNo}")
            time.sleep(1)

    def area_consumer(self, areaNo):
        """Worker loop for a drop-off area; never returns.

        A crane that leaves within the ``crane_leave`` window is treated as
        passing through. Otherwise a loaded crane is unloaded into this
        area, and the loop then waits for the crane to leave before scanning
        again.
        """
        while True:
            craneNo = self.scan_crane(areaNo)
            self.logger.debug(f"天车{craneNo}进入区域{areaNo}")
            if self.crane_leave(craneNo, areaNo):
                self.logger.debug(f"天车{craneNo}离开区域{areaNo}")
                continue

            if self.billetsNo[craneNo] == "":
                self.logger.debug(f"天车{craneNo}未携带钢坯")
            else:
                self.logger.debug(f"天车{craneNo}正在等待或尝试卸下钢坯")
                self.unload_billet(craneNo, areaNo)

            # crane_leave() itself blocks for up to ~10 s per call, so this
            # loop is not a hot spin.
            while not self.crane_leave(craneNo, areaNo):
                pass
            self.logger.debug(f"天车{craneNo}离开区域{areaNo}")
            time.sleep(1)

    def get_billet(self, craneNo, areaNo):
        """Blocking pick-up: wait until the tracer hands over billets.

        Polls ``tracer_6`` for the side of the cold bed matching ``areaNo``
        until it returns a non-empty result, then records the billets and
        their origin area on the crane.

        Returns:
            The non-empty value returned by the tracer on success, or False
            if the crane left the area before any billet was obtained.
        """
        start, end = self.areas[areaNo]
        tmp = []
        # NOTE(review): if tracer_6.get_billet() returns immediately when
        # nothing is available, this loop spins without sleeping — confirm
        # whether the tracer call blocks.
        while not tmp:
            # Read the position once so both comparisons see the same sample.
            position = self.cranes[craneNo].data
            if position < start or position > end:
                return False
            if areaNo == "6#小冷床(左)":
                tmp = self.tracer_6.get_billet("left")
            elif areaNo == "6#小冷床(右)":
                tmp = self.tracer_6.get_billet("right")
        self.billetsNo[craneNo] = ','.join(tmp)
        self.billets_from[craneNo] = areaNo
        return tmp

    def unload_billet(self, craneNo, areaNo):
        """Put the crane's load down in ``areaNo`` and report it.

        The crane's billet numbers and origin area are captured first and
        then cleared, so the report carries the real origin rather than the
        already-reset empty string.
        """
        # Capture BOTH values before resetting the crane's state; reading
        # self.billets_from after the reset would always yield "".
        tmp = self.billetsNo[craneNo]
        origin = self.billets_from[craneNo]
        self.billetsNo[craneNo] = ""
        self.billets_from[craneNo] = ""

        if areaNo == "车位3":
            self.logger.info("有一夹子钢坯放置进了车位3")
            self.sender.car_save(
                self.parking.ccmNo_list[2],
                tmp,
                self.parking.current_car[2],
                craneNo,
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
                origin,
                areaNo,
            )
        elif areaNo == "高线辊道":
            self.logger.info("有一夹子钢坯放置进了高线辊道")
            self.sender.host_send("6", tmp, "高线", craneNo, origin)
        elif areaNo == "601堆垛":
            # NOTE(review): only a log entry is produced for this area in
            # the visible code — confirm that no upload is expected here.
            self.logger.info("有一夹子钢坯放置进了601堆垛")