data_sender.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. import paho.mqtt.client as mqtt
  2. import json, time, requests
  3. class Sender:
  4. def __init__(self, logger):
  5. self.logger = logger
  6. self.topic = {
  7. 'billet_add': 'trace/performance/billet/add',
  8. 'heat_add': 'trace/performance/converter/add',
  9. 'billet_union': 'trace/billet/billetAssemblyNumber/add'
  10. }
  11. self.url = {
  12. 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
  13. 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals'
  14. }
  15. self.heat_tangible_temp = {
  16. 'optype' : 0,
  17. 'casterCode' : '',
  18. 'emptyLadleWeight' : 0.0,
  19. 'fullLadleWeight' : 0.0,
  20. 'grade' : '',
  21. 'spec' : '',
  22. 'heatsCode' : '',
  23. 'firstCutTime' : '',
  24. 'lastCutTime' : '',
  25. 'ladleCode' : '',
  26. 'moltenSteelWeight' : 0.0,
  27. 'startPourTime' : '',
  28. 'stopPourTime' : '',
  29. 'blankOutput' : 0.0,
  30. 'billetSum' : 0
  31. }
  32. self.billet_tangible_temp = {
  33. 'optype' : 0,
  34. 'heatNo' : '',
  35. 'billetNo' : '',
  36. 'grade' : '',
  37. 'ladleNo' : '',
  38. 'ccmNo' : '',
  39. 'strandNo' : '',
  40. 'heatnoIndex' : 0,
  41. 'length' : 0,
  42. 'strandnoIndex' : 0,
  43. 'castingSpeed' : 0.0,
  44. 'actualLength' : 0,
  45. 'spec' : '',
  46. 'width' : 170,
  47. 'thickness' : 170,
  48. 'weight' : 0.0,
  49. 'cutStartTime' : '',
  50. 'cutStopTime' : '',
  51. 'assemblyNumber' : ''
  52. }
  53. self.billet_union_temp = {
  54. "ccmNo": 0, # 铸机号 int数值类型
  55. "heatNo": "", # 炉号 字符串类型
  56. "billetsNo": "", # 坯号集合 字符串类型 逗号隔开
  57. "assemblyNumber": "", # 组坯号 字符串类型
  58. "assemblyTime": "", # 组坯时间 Date日期类型
  59. "length": 0, # 定尺 int数值类型
  60. "billetsNum": 0, # 钢坯数量 int数值类型
  61. "billetWeight": 0 # 钢坯坯重 double高精度数值类型
  62. }
  63. self.mqtt_cli = None
  64. self.http_flag = True
  65. self._cache = {}
  66. self._billet = {}
  67. def set_mqtt_client(self, cli):
  68. self.mqtt_cli = cli
  69. def send(self, purpose, payload, qos=2):
  70. if not isinstance(payload, dict):
  71. self.logger.error('[SENDER]发送数据非dict类型')
  72. raise TypeError(f"Need a dict type but {type(payload)} given.")
  73. return None
  74. self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} 发送数据")
  75. self.logger.debug(f"[SENDER]{purpose}:{payload}")
  76. if self.mqtt_cli and purpose in self.topic:
  77. if not self.mqtt_cli.is_connected:
  78. self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
  79. if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]:
  80. self.logger.error('[SENDER]MQTT:数据包发送失败')
  81. else:
  82. self.logger.info('[SENDER]MQTT:数据包发送成功')
  83. if self.http_flag and purpose in self.url:
  84. try:
  85. self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text)
  86. except:
  87. self.logger.error('[SENDER]HTTP:请求超时')
  88. def begin_pour(self, heat_data, bigbagweight):
  89. tmp = self.heat_tangible_temp.copy()
  90. tmp['optype'] = 1
  91. tmp['casterCode'] = heat_data.get('ccmNo', '')
  92. tmp['emptyLadleWeight'] = bigbagweight - heat_data.get('netWeight', 0)
  93. tmp['fullLadleWeight'] = bigbagweight
  94. tmp['grade'] = heat_data.get('grade', '')
  95. tmp['spec'] = heat_data.get('spec', '')
  96. tmp['heatsCode'] = heat_data.get('heatNo', '')
  97. tmp['ladleCode'] = heat_data.get('ladleNo', '')
  98. tmp['moltenSteelWeight'] = heat_data.get('netWeight', 0)
  99. tmp['startPourTime'] = heat_data.get('sendTime', '')
  100. #此处应存储进数据库,暂时使用缓存处理
  101. self._cache[heat_data.get('heatNo', 'error')] = tmp
  102. self.send('heat_add', tmp)
  103. def end_pour(self, heat_data):
  104. if heat_data['heatNo'] in self._cache:
  105. tmp = self._cache[heat_data['heatNo']]
  106. tmp['optype'] = 2
  107. tmp['grade'] = heat_data.get('grade', '')
  108. tmp['spec'] = heat_data.get('spec', '')
  109. tmp['stopPourTime'] = heat_data.get('sendTime', '')
  110. #此处应存储进数据库,暂时使用缓存处理
  111. self._cache[heat_data.get('heatNo', 'error')] = tmp
  112. self.send('heat_add', tmp)
  113. else:
  114. self.logger.error(f"[SENDER]停浇:找不到对应的开浇数据,炉号:{heat_data['heatNo']}")
  115. def heat_first(self, heat_data, cuttime = None):
  116. if not cuttime:
  117. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  118. tmp = self._cache[heat_data['heatNo']]
  119. tmp['optype'] = 2
  120. tmp['firstCutTime'] = cuttime
  121. #此处应存储进数据库,暂时使用缓存处理
  122. self._cache[heat_data.get('heatNo', 'error')] = tmp
  123. self.send('heat_add', tmp)
  124. def heat_last(self, heat_data, cuttime = None):
  125. if not cuttime:
  126. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  127. tmp = self._cache[heat_data['heatNo']]
  128. tmp['optype'] = 2
  129. tmp['lastCutTime'] = cuttime
  130. #此处应存储进数据库,暂时使用缓存处理
  131. self._cache[heat_data.get('heatNo', 'error')] = tmp
  132. self.send('heat_add', tmp)
  133. def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed):
  134. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  135. tmp = self.billet_tangible_temp.copy()
  136. tmp['optype'] = 1
  137. tmp['heatNo'] = heat_data.get('heatNo', '')
  138. tmp['billetNo'] = billetNo
  139. tmp['grade'] = heat_data.get('grade', '')
  140. tmp['spec'] = heat_data.get('spec', '')
  141. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  142. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  143. tmp['strandNo'] = billetNo[-3]
  144. tmp['heatnoIndex'] = heatnoIndex
  145. tmp['length'] = sizing
  146. tmp['strandnoIndex'] = int(billetNo[-2:])
  147. tmp['castingSpeed'] = speed
  148. tmp['weight'] = sizing / 1000 * 0.2265
  149. tmp['cutStartTime'] = cuttime
  150. #此处应存储进数据库,暂时使用缓存处理
  151. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  152. self.send('billet_add', tmp)
  153. def end_cut(self, heat_data, heatnoIndex):
  154. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  155. trycount = 4
  156. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  157. while not tmp:
  158. trycount -= 1
  159. time.sleep(0.5)
  160. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  161. if not tmp:
  162. self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
  163. return None
  164. tmp['optype'] = 2
  165. tmp['cutStopTime'] = cuttime
  166. #此处应存储进数据库,暂时使用缓存处理
  167. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  168. self.send('billet_add', tmp)
  169. def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed, starttime, stoptime, unionNo):
  170. tmp = self.billet_tangible_temp.copy()
  171. tmp['optype'] = 1
  172. tmp['heatNo'] = heat_data.get('heatNo', '')
  173. tmp['billetNo'] = billetNo
  174. tmp['grade'] = heat_data.get('grade', '')
  175. tmp['spec'] = heat_data.get('spec', '')
  176. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  177. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  178. tmp['strandNo'] = billetNo[-3]
  179. tmp['heatnoIndex'] = heatnoIndex
  180. tmp['length'] = sizing
  181. tmp['strandnoIndex'] = int(billetNo[-2:])
  182. tmp['castingSpeed'] = speed
  183. tmp['weight'] = sizing / 1000 * 0.2265
  184. tmp['cutStartTime'] = starttime
  185. tmp['cutStopTime'] = stoptime
  186. tmp['assemblyNumber'] = unionNo
  187. #此处应存储进数据库
  188. self.send('billet_add', tmp)
  189. def billet_union(self, heat_data, unionNo, billetNos, sizing):
  190. tmp = self.billet_union_temp.copy()
  191. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  192. tmp['heatNo'] = heat_data.get('heatNo', '')
  193. tmp['billetsNo'] = ','.join(billetNos)
  194. tmp['assemblyNumber'] = unionNo
  195. tmp['assemblyTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  196. tmp['length'] = sizing
  197. tmp['billetsNum'] = len(billetNos)
  198. tmp['billetWeight'] = sizing / 1000 * 0.2265 * tmp['billetsNum']
  199. #此处应存储进数据库
  200. self.send('billet_union', tmp)
  201. if __name__ == '__main__':
  202. mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
  203. mqttcli.username_pw_set('admin', '123456')
  204. mqttcli.connect('192.168.0.119', 1883)
  205. mqttcli.loop_start()
  206. test = Sender()
  207. test.set_mqtt_client(mqttcli)
  208. testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"24617099","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"6","sendTime":"2024-11-28 12:57:09"}
  209. test.begin_pour(testheat, 210.0)
  210. time.sleep(1)
  211. test.heat_first(testheat)
  212. time.sleep(1)
  213. test.begin_cut(testheat, "246170996301", 5, 11000, 2.85)
  214. time.sleep(1)
  215. test.end_cut(testheat, 5)
  216. time.sleep(1)
  217. test.end_pour({"netWeight":0.0,"ladleNo":"","heatNo":"24617099","grade":"微氮铌钢","castState":0,"spec":"上若泰基","ccmNo":"6","sendTime":"2024-11-28 13:21:20"})
  218. time.sleep(1)
  219. test.heat_last(testheat)