from utils.s7data import *
from models.data_sender import *
from utils.statepoint import *
# Standard-library imports stay after the wildcard imports on purpose: if one
# of the project modules exports a name such as ``time`` or ``copy``, the real
# stdlib module must be the binding that wins.
import copy
import threading
import time


class Forward:
    """Periodically forward PLC point values to an MQTT topic.

    The constructor resolves a set of named points through
    ``data_s7.make_point``. A background thread started by
    ``start_auto_forward`` then reads the ``.data`` attribute of each point
    and publishes one message per point through ``sender.mqtt_publish``.
    """

    # Number of strands whose billet-head position and barrier are forwarded.
    STRAND_COUNT = 8
    # Pause between two forwarding cycles, in seconds.
    FORWARD_INTERVAL_S = 0.5

    def __init__(self, data_s7: S7data, sender: Sender, crane_flag: bool, ccmNo: str):
        """Resolve all points and prepare the message template.

        Args:
            data_s7: Source of points; only ``make_point(name)`` is used here.
            sender: Publisher; only ``mqtt_publish(topic, payload, qos)`` is used.
            crane_flag: When true, crane positions are forwarded as well and
                the roller-table trigger is set up.
            ccmNo: Value written to the ``machine`` field of strand, barrier
                and pusher messages.
        """
        self.data_s7 = data_s7
        self.sender = sender
        self.crane_flag = crane_flag
        self.ccmNo = ccmNo

        self.billet_position_list = []
        self.barrier_state_list = []
        # Points are requested strand by strand (position first, then barrier)
        # so the order of ``make_point`` calls matches the original code.
        for strand_no in range(1, self.STRAND_COUNT + 1):
            self.billet_position_list.append(data_s7.make_point(f"L{strand_no}坯头位置"))
            self.barrier_state_list.append(data_s7.make_point(f"L{strand_no}挡板"))
        self.pusher = data_s7.make_point("推钢机激光")

        if self.crane_flag:
            self.crane_A1 = data_s7.make_point("天车A1位置")
            self.crane_A2 = data_s7.make_point("天车A2位置")
            self.crane_A3 = data_s7.make_point("天车A3位置")
            # NOTE(review): the source arrived with its newlines collapsed, so
            # whether this trigger belongs inside the ``crane_flag`` branch is
            # inferred from ``start_auto_forward``, which only touches
            # ``roller_one_sig`` when ``crane_flag`` is set. Confirm.
            self.roller_one_sig = Through_state_continues3(
                data_s7.make_point("爬坡监测点[1]"),
                data_s7.make_point("爬坡监测点[2]"),
                data_s7.make_point("爬坡监测点[3]"),
            )
            self.roller_one_sig.set_excite_action(self.roller_one_hostsend)

        self.topic = 'trace/performance/billet/monitor'
        self.qos = 0
        self.template = {
            # string: billet strand, overhead crane, steel pusher, roller
            # table, vehicle, stack
            "type": "",
            "data": {
                "machine": 0,  # number: for a strand, which caster
                "channel": 0,  # number: for a strand, which strand
                "direction": "",  # string: running direction up, down, left, right
                "distance": 0,  # number: travel distance (should be millimetres)
                "has_billet": False,  # boolean: whether a billet is gripped
                # number: vehicle position, top to bottom / left to right,
                # 1 | 2 | 3 | 4; crane position, left to right as drawn,
                # A1 | A2 | A3 | B1 | B2
                "position": 0,
                "car_num": "",  # string: licence plate
                "car_running": "",  # string: "in" = vehicle arrived, "out" = vehicle left
                # string: stack 501 601 602 604 coolbed (walking-beam cooling bed)
                "billet_stacking": "",
                "layers": 0,  # number: stack layers
                "amount": 0,  # number: clamp loads in the stack
            },
        }

        self.task_thread = None
        self.run_flag = False

    def _new_message(self, msg_type):
        """Return a private deep copy of the template with ``type`` set."""
        message = copy.deepcopy(self.template)
        message['type'] = msg_type
        return message

    def billet_position(self, strandNo, position):
        """Publish a ``channel`` message with the billet position of a strand."""
        tmp = self._new_message('channel')
        tmp['data']['machine'] = self.ccmNo
        tmp['data']['channel'] = strandNo
        tmp['data']['direction'] = 'down'
        tmp['data']['distance'] = position
        self.sender.mqtt_publish(self.topic, tmp, self.qos)

    def barrier_state(self, strandNo, state):
        """Publish a ``channel_barrier`` message for a strand.

        A truthy ``state`` is sent as direction ``down``, a falsy one as ``up``.
        """
        tmp = self._new_message('channel_barrier')
        tmp['data']['machine'] = self.ccmNo
        tmp['data']['channel'] = strandNo
        tmp['data']['direction'] = 'down' if state else 'up'
        self.sender.mqtt_publish(self.topic, tmp, self.qos)

    def pusher_position(self, position):
        """Publish a ``steel_pusher`` message carrying the pusher position."""
        tmp = self._new_message('steel_pusher')
        tmp['data']['machine'] = self.ccmNo
        tmp['data']['distance'] = position
        self.sender.mqtt_publish(self.topic, tmp, self.qos)

    def overhead_crane_position(self, craneNo, position):
        """Publish a ``train_working`` message with a crane's position.

        ``has_billet`` is always sent as ``False`` and the direction as
        ``left``; the ``machine`` field keeps its template default.
        """
        tmp = self._new_message('train_working')
        tmp['data']['position'] = craneNo
        tmp['data']['direction'] = 'left'
        tmp['data']['distance'] = position
        tmp['data']['has_billet'] = False
        self.sender.mqtt_publish(self.topic, tmp, self.qos)

    def roller_one_hostsend(self):
        """Publish a ``roller_one`` message with ``has_billet`` set.

        Registered in ``__init__`` as the action of ``roller_one_sig``.
        """
        tmp = self._new_message('roller_one')
        tmp['data']['has_billet'] = True
        result = self.sender.mqtt_publish(self.topic, tmp, self.qos)
        # The first element of the publish result is treated as an error
        # indicator: truthy means the send failed.
        # NOTE(review): presumably an MQTT return code where 0 is success.
        if result[0]:
            print("发送失败")
        else:
            print('发送成功')

    def auto_forward(self):
        """Worker loop: publish every point once per cycle while ``run_flag`` is set."""
        while self.run_flag:
            strands = zip(self.billet_position_list, self.barrier_state_list)
            for strand_no, (billet_point, barrier_point) in enumerate(strands, start=1):
                self.billet_position(strand_no, billet_point.data)
                self.barrier_state(strand_no, barrier_point.data)
            self.pusher_position(self.pusher.data)
            if self.crane_flag:
                self.overhead_crane_position('A1', self.crane_A1.data)
                self.overhead_crane_position('A2', self.crane_A2.data)
                self.overhead_crane_position('A3', self.crane_A3.data)
            time.sleep(self.FORWARD_INTERVAL_S)

    def stop_auto_forward(self):
        """Ask the worker loop to finish and wait for its thread to exit.

        Safe to call when no worker has been started. The loop notices the
        cleared flag only after its current cycle and sleep, so this call
        blocks until then.
        """
        self.run_flag = False
        if self.task_thread:
            self.task_thread.join()
        self.task_thread = None

    def start_auto_forward(self):
        """Start the worker thread, replacing a previously started one."""
        # Stop any earlier worker first so two loops never publish in parallel.
        self.stop_auto_forward()
        self.run_flag = True
        if self.crane_flag:
            self.roller_one_sig.allow_update()
        self.task_thread = threading.Thread(target=self.auto_forward)
        self.task_thread.start()