# mqttdata.py
import json
import threading
import warnings

import paho.mqtt.client as mqtt

from utils.statepoint import *
  4. class MqttClient(mqtt.Client):
  5. def __init__(self, client_id, username=None, password=None, version=mqtt.CallbackAPIVersion.VERSION2):
  6. super().__init__(version, client_id)
  7. if username and password:
  8. self.username_pw_set(username, password)
  9. class Mqttdata:
  10. def __init__(self):
  11. self.logger = None
  12. self.thread = None
  13. self.node_data = {'5#开浇信号': {}, '5#停浇信号': {}, '6#开浇信号': {}, '6#停浇信号': {}}
  14. self.target_from_name = {}
  15. def set_logger(self, logger):
  16. self.logger = logger
  17. def set_mqtt_client(self, cli):
  18. self.cli = cli
  19. self.cli.on_connect = self.on_connect
  20. self.cli.on_message = self.on_message
  21. self.cli.subscribe('data/service/cast/info', qos=2)
  22. def start_auto_update(self):
  23. if self.thread == None:
  24. self.thread = threading.Thread(target=self.cli.loop_forever)
  25. self.thread.start()
  26. def send(self, name):
  27. if name in self.target_from_name:
  28. for i in self.target_from_name[name]:
  29. i.inject(self.node_data[name])
  30. timer = threading.Timer(5, lambda i=i: i.set_state(False))
  31. timer.start()
  32. def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
  33. if reason_code_list[0].is_failure:
  34. warnings.warn(f"Broker rejected you subscription: {reason_code_list[0]}")
  35. if self.logger:
  36. self.logger.error(f"Broker rejected you subscription: {reason_code_list[0]}")
  37. else:
  38. if self.logger:
  39. self.logger.info(f"Broker granted the following QoS: {reason_code_list[0].value}")
  40. def on_message(self, client, userdata, message):
  41. # logger.debug(message.payload.decode())
  42. data = json.loads(message.payload.decode())
  43. if 'ccmNo' not in data or 'castState' not in data:
  44. warnings.warn('[MES]MQTT报文格式错误')
  45. if self.logger:
  46. self.logger.error('[MES]MQTT报文格式错误')
  47. return None
  48. if int(data['ccmNo']) == 6:
  49. if data['castState'] and data != self.node_data['6#开浇信号']:
  50. self.node_data['6#开浇信号'] = data
  51. self.send('6#开浇信号')
  52. elif not data['castState'] and data != self.node_data['6#停浇信号']:
  53. self.node_data['6#停浇信号'] = data
  54. self.send('6#停浇信号')
  55. elif int(data['ccmNo']) == 5:
  56. if data['castState'] and data != self.node_data['5#开浇信号']:
  57. self.node_data['5#开浇信号'] = data
  58. self.send('5#开浇信号')
  59. elif not data['castState'] and data != self.node_data['5#停浇信号']:
  60. self.node_data['5#停浇信号'] = data
  61. self.send('5#停浇信号')
  62. else:
  63. warnings.warn('[MES]MQTT收到未知铸机号')
  64. if self.logger:
  65. self.logger.error('[MES]MQTT收到未知铸机号')
  66. def on_connect(self, client, userdata, flags, reason_code, properties):
  67. if reason_code.is_failure:
  68. warnings.warn(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  69. if self.logger:
  70. self.logger.error(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  71. else:
  72. client.subscribe('data/service/cast/info', qos=2)
  73. if self.logger:
  74. self.logger.info("MQTT connection succeeded")
  75. def make_point(self, name):
  76. if name not in self.node_data:
  77. raise ValueError("创建了未配置的点")
  78. if name not in self.target_from_name:
  79. self.target_from_name[name] = []
  80. res = Statepoint()
  81. self.target_from_name[name].append(res)
  82. return res