import paho.mqtt.client as mqtt import json, warnings from utils.statepoint import * class MqttClient(mqtt.Client): def __init__(self, client_id, username=None, password=None, version=mqtt.CallbackAPIVersion.VERSION2): super().__init__(version, client_id) if username and password: self.username_pw_set(username, password) class Mqttdata: def __init__(self): self.logger = None self.thread = None self.node_data = {'5#开浇信号': {}, '5#停浇信号': {}, '6#开浇信号': {}, '6#停浇信号': {}} self.target_from_name = {} def set_logger(self, logger): self.logger = logger def set_mqtt_client(self, cli): self.cli = cli self.cli.on_connect = self.on_connect self.cli.on_message = self.on_message self.cli.subscribe('data/service/cast/info', qos=2) def start_auto_update(self): if self.thread == None: self.thread = threading.Thread(target=self.cli.loop_forever) self.thread.start() def send(self, name): if name in self.target_from_name: for i in self.target_from_name[name]: i.inject(self.node_data[name]) timer = threading.Timer(5, lambda i=i: i.set_state(False)) timer.start() def on_subscribe(self, client, userdata, mid, reason_code_list, properties): if reason_code_list[0].is_failure: warnings.warn(f"Broker rejected you subscription: {reason_code_list[0]}") if self.logger: self.logger.error(f"Broker rejected you subscription: {reason_code_list[0]}") else: if self.logger: self.logger.info(f"Broker granted the following QoS: {reason_code_list[0].value}") def on_message(self, client, userdata, message): # logger.debug(message.payload.decode()) data = json.loads(message.payload.decode()) if 'ccmNo' not in data or 'castState' not in data: warnings.warn('[MES]MQTT报文格式错误') if self.logger: self.logger.error('[MES]MQTT报文格式错误') return None if int(data['ccmNo']) == 6: if data['castState'] and data != self.node_data['6#开浇信号']: self.node_data['6#开浇信号'] = data self.send('6#开浇信号') elif not data['castState'] and data != self.node_data['6#停浇信号']: self.node_data['6#停浇信号'] = data self.send('6#停浇信号') elif int(data['ccmNo']) == 5: if data['castState'] and data != self.node_data['5#开浇信号']: self.node_data['5#开浇信号'] = data self.send('5#开浇信号') elif not data['castState'] and data != self.node_data['5#停浇信号']: self.node_data['5#停浇信号'] = data self.send('5#停浇信号') else: warnings.warn('[MES]MQTT收到未知铸机号') if self.logger: self.logger.error('[MES]MQTT收到未知铸机号') def on_connect(self, client, userdata, flags, reason_code, properties): if reason_code.is_failure: warnings.warn(f"Failed to connect: {reason_code}. loop_forever() will retry connection") if self.logger: self.logger.error(f"Failed to connect: {reason_code}. loop_forever() will retry connection") else: if self.logger: self.logger.info("MQTT connection succeeded") def make_point(self, name): if name not in self.node_data: raise ValueError("创建了未配置的点") if name not in self.target_from_name: self.target_from_name[name] = [] res = Statepoint() self.target_from_name[name].append(res) return res