# mqttdata.py (3.8 KB)
import json
import threading
import warnings

import paho.mqtt.client as mqtt

from utils.statepoint import *
  4. class MqttClient(mqtt.Client):
  5. def __init__(self, client_id, username=None, password=None, version=mqtt.CallbackAPIVersion.VERSION2):
  6. super().__init__(version, client_id)
  7. if username and password:
  8. self.username_pw_set(username, password)
  9. class Mqttdata:
  10. def __init__(self):
  11. self.logger = None
  12. self.thread = None
  13. self.node_data = {'5#开浇信号': {}, '5#停浇信号': {}, '6#开浇信号': {}, '6#停浇信号': {}}
  14. self.target_from_name = {}
  15. def set_logger(self, logger):
  16. self.logger = logger
  17. def set_mqtt_client(self, cli):
  18. self.cli = cli
  19. self.cli.on_connect = self.on_connect
  20. self.cli.on_message = self.on_message
  21. self.cli.subscribe('data/service/cast/info', qos=2)
  22. def start_auto_update(self):
  23. if self.thread == None:
  24. self.thread = threading.Thread(target=self.cli.loop_forever)
  25. self.thread.start()
  26. def send(self, name):
  27. if name in self.target_from_name:
  28. for i in self.target_from_name[name]:
  29. i.inject(self.node_data[name])
  30. timer = threading.Timer(5, lambda i=i: i.set_state(False))
  31. timer.start()
  32. def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
  33. if reason_code_list[0].is_failure:
  34. warnings.warn(f"Broker rejected you subscription: {reason_code_list[0]}")
  35. if self.logger:
  36. self.logger.error(f"Broker rejected you subscription: {reason_code_list[0]}")
  37. else:
  38. if self.logger:
  39. self.logger.info(f"Broker granted the following QoS: {reason_code_list[0].value}")
  40. def on_message(self, client, userdata, message):
  41. # logger.debug(message.payload.decode())
  42. data = json.loads(message.payload.decode())
  43. if 'ccmNo' not in data or 'castState' not in data:
  44. warnings.warn('[MES]MQTT报文格式错误')
  45. if self.logger:
  46. self.logger.error('[MES]MQTT报文格式错误')
  47. return None
  48. if int(data['ccmNo']) == 6:
  49. if data['castState'] and data != self.node_data['6#开浇信号']:
  50. self.node_data['6#开浇信号'] = data
  51. self.send('6#开浇信号')
  52. elif not data['castState'] and data != self.node_data['6#停浇信号']:
  53. self.node_data['6#停浇信号'] = data
  54. self.send('6#停浇信号')
  55. elif int(data['ccmNo']) == 5:
  56. if data['castState'] and data != self.node_data['5#开浇信号']:
  57. self.node_data['5#开浇信号'] = data
  58. self.send('5#开浇信号')
  59. elif not data['castState'] and data != self.node_data['5#停浇信号']:
  60. self.node_data['5#停浇信号'] = data
  61. self.send('5#停浇信号')
  62. else:
  63. warnings.warn('[MES]MQTT收到未知铸机号')
  64. if self.logger:
  65. self.logger.error('[MES]MQTT收到未知铸机号')
  66. def on_connect(self, client, userdata, flags, reason_code, properties):
  67. if reason_code.is_failure:
  68. warnings.warn(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  69. if self.logger:
  70. self.logger.error(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  71. else:
  72. if self.logger:
  73. self.logger.info("MQTT connection succeeded")
  74. def make_point(self, name):
  75. if name not in self.node_data:
  76. raise ValueError("创建了未配置的点")
  77. if name not in self.target_from_name:
  78. self.target_from_name[name] = []
  79. res = Statepoint()
  80. self.target_from_name[name].append(res)
  81. return res