mqttdata.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import paho.mqtt.client as mqtt
  2. import json, warnings
  3. from utils.statepoint import *
  4. class MqttClient(mqtt.Client):
  5. def __init__(self, client_id, username=None, password=None, version=mqtt.CallbackAPIVersion.VERSION2):
  6. super().__init__(version, client_id)
  7. if username and password:
  8. self.username_pw_set(username, password)
  9. class Mqttdata:
  10. def __init__(self):
  11. self.logger = None
  12. self.thread = None
  13. self.node_data = {'5#开浇信号': {}, '5#停浇信号': {}, '6#开浇信号': {}, '6#停浇信号': {}, '5#手动换炉': {}, '6#手动换炉': {}}
  14. self.target_from_name = {}
  15. def set_logger(self, logger):
  16. self.logger = logger
  17. def set_mqtt_client(self, cli):
  18. self.cli = cli
  19. self.cli.on_connect = self.on_connect
  20. self.cli.on_message = self.on_message
  21. self.cli.subscribe('data/service/cast/info', qos=2)
  22. self.cli.subscribe('syn/pushbillethotsend/nexthosend', qos=2)
  23. def start_auto_update(self):
  24. if self.thread == None:
  25. self.thread = threading.Thread(target=self.cli.loop_forever)
  26. self.thread.start()
  27. def send(self, name):
  28. if name in self.target_from_name:
  29. for i in self.target_from_name[name]:
  30. i.inject(self.node_data[name])
  31. timer = threading.Timer(5, lambda i=i: i.set_state(False))
  32. timer.start()
  33. def on_subscribe(self, client, userdata, mid, reason_code_list, properties):
  34. if reason_code_list[0].is_failure:
  35. warnings.warn(f"Broker rejected you subscription: {reason_code_list[0]}")
  36. if self.logger:
  37. self.logger.error(f"Broker rejected you subscription: {reason_code_list[0]}")
  38. else:
  39. if self.logger:
  40. self.logger.info(f"Broker granted the following QoS: {reason_code_list[0].value}")
  41. def on_message(self, client, userdata, message):
  42. # logger.debug(message.payload.decode())
  43. topic = message.topic
  44. if topic == 'syn/pushbillethotsend/nexthosend':
  45. data = json.loads(message.payload.decode())
  46. if 'ccmNo' not in data:
  47. warnings.warn('[MES]MQTT报文格式错误')
  48. if self.logger:
  49. self.logger.error('[MES]MQTT报文格式错误')
  50. return None
  51. if int(data['ccmNo']) == 6:
  52. if data != self.node_data['6#手动换炉']:
  53. self.node_data['6#手动换炉'] = data
  54. self.send('6#手动换炉')
  55. elif int(data['ccmNo']) == 5:
  56. if data != self.node_data['5#手动换炉']:
  57. self.node_data['5#手动换炉'] = data
  58. self.send('5#手动换炉')
  59. else:
  60. warnings.warn('[MES]MQTT收到未知铸机号')
  61. if self.logger:
  62. self.logger.error('[MES]MQTT收到未知铸机号')
  63. elif topic == 'data/service/cast/info':
  64. data = json.loads(message.payload.decode())
  65. if 'ccmNo' not in data or 'castState' not in data:
  66. warnings.warn('[MES]MQTT报文格式错误')
  67. if self.logger:
  68. self.logger.error('[MES]MQTT报文格式错误')
  69. return None
  70. if int(data['ccmNo']) == 6:
  71. if data['castState'] and data != self.node_data['6#开浇信号']:
  72. self.node_data['6#开浇信号'] = data
  73. self.send('6#开浇信号')
  74. elif not data['castState'] and data != self.node_data['6#停浇信号']:
  75. self.node_data['6#停浇信号'] = data
  76. self.send('6#停浇信号')
  77. elif int(data['ccmNo']) == 5:
  78. if data['castState'] and data != self.node_data['5#开浇信号']:
  79. self.node_data['5#开浇信号'] = data
  80. self.send('5#开浇信号')
  81. elif not data['castState'] and data != self.node_data['5#停浇信号']:
  82. self.node_data['5#停浇信号'] = data
  83. self.send('5#停浇信号')
  84. else:
  85. warnings.warn('[MES]MQTT收到未知铸机号')
  86. if self.logger:
  87. self.logger.error('[MES]MQTT收到未知铸机号')
  88. def on_connect(self, client, userdata, flags, reason_code, properties):
  89. if reason_code.is_failure:
  90. warnings.warn(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  91. if self.logger:
  92. self.logger.error(f"Failed to connect: {reason_code}. loop_forever() will retry connection")
  93. else:
  94. client.subscribe('data/service/cast/info', qos=2)
  95. client.subscribe('syn/pushbillethotsend/nexthosend', qos=2)
  96. if self.logger:
  97. self.logger.info("MQTT connection succeeded")
  98. def make_point(self, name):
  99. if name not in self.node_data:
  100. raise ValueError("创建了未配置的点")
  101. if name not in self.target_from_name:
  102. self.target_from_name[name] = []
  103. res = Statepoint()
  104. self.target_from_name[name].append(res)
  105. return res