data_sender.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. import paho.mqtt.client as mqtt
  2. import json, time, requests
  3. class Sender:
  4. def __init__(self, logger):
  5. self.logger = logger
  6. self.topic = {
  7. 'billet_add': 'trace/performance/billet/add',
  8. 'heat_add': 'trace/performance/converter/add'
  9. }
  10. self.url = {
  11. 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
  12. 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals'
  13. }
  14. self.heat_tangible_temp = {
  15. 'optype' : 0,
  16. 'casterCode' : '',
  17. 'emptyLadleWeight' : 0.0,
  18. 'fullLadleWeight' : 0.0,
  19. 'grade' : '',
  20. 'spec' : '',
  21. 'heatsCode' : '',
  22. 'firstCutTime' : '',
  23. 'lastCutTime' : '',
  24. 'ladleCode' : '',
  25. 'moltenSteelWeight' : 0.0,
  26. 'startPourTime' : '',
  27. 'stopPourTime' : '',
  28. 'blankOutput' : 0.0,
  29. 'billetSum' : 0
  30. }
  31. self.billet_tangible_temp = {
  32. 'optype' : 0,
  33. 'heatNo' : '',
  34. 'billetNo' : '',
  35. 'grade' : '',
  36. 'ladleNo' : '',
  37. 'ccmNo' : '',
  38. 'strandNo' : '',
  39. 'heatnoIndex' : 0,
  40. 'length' : 0,
  41. 'strandnoIndex' : 0,
  42. 'castingSpeed' : 0.0,
  43. 'actualLength' : 0,
  44. 'spec' : '',
  45. 'width' : 170,
  46. 'thickness' : 170,
  47. 'weight' : 0.0,
  48. 'cutStartTime' : '',
  49. 'cutStopTime' : ''
  50. }
  51. self.mqtt_cli = None
  52. self.http_flag = True
  53. self._cache = {}
  54. self._billet = {}
  55. def set_mqtt_client(self, cli):
  56. self.mqtt_cli = cli
  57. def send(self, purpose, payload, qos=2):
  58. if not isinstance(payload, dict):
  59. self.logger.error('[SENDER]发送数据非dict类型')
  60. raise TypeError(f"Need a dict type but {type(payload)} given.")
  61. return None
  62. self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} 发送数据")
  63. self.logger.debug(f"[SENDER]{purpose}:{payload}")
  64. if self.mqtt_cli:
  65. if not self.mqtt_cli.is_connected:
  66. self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
  67. if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]:
  68. self.logger.error('[SENDER]MQTT:数据包发送失败')
  69. else:
  70. self.logger.info('[SENDER]MQTT:数据包发送成功')
  71. if self.http_flag:
  72. try:
  73. self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text)
  74. except:
  75. self.logger.error('[SENDER]HTTP:请求超时')
  76. def begin_pour(self, heat_data, bigbagweight):
  77. tmp = self.heat_tangible_temp.copy()
  78. tmp['optype'] = 1
  79. tmp['casterCode'] = heat_data.get('ccmNo', '')
  80. tmp['emptyLadleWeight'] = bigbagweight - heat_data.get('netWeight', 0)
  81. tmp['fullLadleWeight'] = bigbagweight
  82. tmp['grade'] = heat_data.get('grade', '')
  83. tmp['spec'] = heat_data.get('spec', '')
  84. tmp['heatsCode'] = heat_data.get('heatNo', '')
  85. tmp['ladleCode'] = heat_data.get('ladleNo', '')
  86. tmp['moltenSteelWeight'] = heat_data.get('netWeight', 0)
  87. tmp['startPourTime'] = heat_data.get('sendTime', '')
  88. #此处应存储进数据库,暂时使用缓存处理
  89. self._cache[heat_data.get('heatNo', 'error')] = tmp
  90. self.send('heat_add', tmp)
  91. def end_pour(self, heat_data):
  92. tmp = self._cache[heat_data['heatNo']]
  93. tmp['optype'] = 2
  94. tmp['grade'] = heat_data.get('grade', '')
  95. tmp['spec'] = heat_data.get('spec', '')
  96. tmp['stopPourTime'] = heat_data.get('sendTime', '')
  97. #此处应存储进数据库,暂时使用缓存处理
  98. self._cache[heat_data.get('heatNo', 'error')] = tmp
  99. self.send('heat_add', tmp)
  100. def heat_first(self, heat_data):
  101. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  102. tmp = self._cache[heat_data['heatNo']]
  103. tmp['optype'] = 2
  104. tmp['firstCutTime'] = cuttime
  105. #此处应存储进数据库,暂时使用缓存处理
  106. self._cache[heat_data.get('heatNo', 'error')] = tmp
  107. self.send('heat_add', tmp)
  108. def heat_last(self, heat_data):
  109. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  110. tmp = self._cache[heat_data['heatNo']]
  111. tmp['optype'] = 2
  112. tmp['lastCutTime'] = cuttime
  113. #此处应存储进数据库,暂时使用缓存处理
  114. self._cache[heat_data.get('heatNo', 'error')] = tmp
  115. self.send('heat_add', tmp)
  116. def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed):
  117. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  118. tmp = self.billet_tangible_temp.copy()
  119. tmp['optype'] = 1
  120. tmp['heatNo'] = heat_data.get('heatNo', '')
  121. tmp['billetNo'] = billetNo
  122. tmp['grade'] = heat_data.get('grade', '')
  123. tmp['spec'] = heat_data.get('spec', '')
  124. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  125. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  126. tmp['strandNo'] = billetNo[-3]
  127. tmp['heatnoIndex'] = heatnoIndex
  128. tmp['length'] = sizing
  129. tmp['strandnoIndex'] = int(billetNo[-2:])
  130. tmp['castingSpeed'] = speed
  131. tmp['weight'] = sizing / 1000 * 0.2265
  132. tmp['cutStartTime'] = cuttime
  133. #此处应存储进数据库,暂时使用缓存处理
  134. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  135. self.send('billet_add', tmp)
  136. def end_cut(self, heat_data, heatnoIndex):
  137. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  138. trycount = 4
  139. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  140. while not tmp:
  141. trycount -= 1
  142. time.sleep(0.5)
  143. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  144. if not tmp:
  145. self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
  146. return None
  147. tmp['optype'] = 2
  148. tmp['cutStopTime'] = cuttime
  149. #此处应存储进数据库,暂时使用缓存处理
  150. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  151. self.send('billet_add', tmp)
  152. if __name__ == '__main__':
  153. mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
  154. mqttcli.username_pw_set('admin', '123456')
  155. mqttcli.connect('192.168.0.119', 1883)
  156. mqttcli.loop_start()
  157. test = Sender()
  158. test.set_mqtt_client(mqttcli)
  159. testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"24617099","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"6","sendTime":"2024-11-28 12:57:09"}
  160. test.begin_pour(testheat, 210.0)
  161. time.sleep(1)
  162. test.heat_first(testheat)
  163. time.sleep(1)
  164. test.begin_cut(testheat, "246170996301", 5, 11000, 2.85)
  165. time.sleep(1)
  166. test.end_cut(testheat, 5)
  167. time.sleep(1)
  168. test.end_pour({"netWeight":0.0,"ladleNo":"","heatNo":"24617099","grade":"微氮铌钢","castState":0,"spec":"上若泰基","ccmNo":"6","sendTime":"2024-11-28 13:21:20"})
  169. time.sleep(1)
  170. test.heat_last(testheat)