data_sender.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. import paho.mqtt.client as mqtt
  2. import json, time, requests
  3. class Sender:
  4. def __init__(self, logger):
  5. self.logger = logger
  6. self.topic = {
  7. 'billet_add': 'trace/performance/billet/add',
  8. 'heat_add': 'trace/performance/converter/add',
  9. 'billet_union': 'trace/billet/billetAssemblyNumber/add',
  10. 'host_send': 'syn/billetHotsendBase/save',
  11. 'car_add': 'syn/storageBill/add',
  12. 'car_save': 'syn/billetHotsendBase/shipp/save',
  13. 'car_go': 'syn/billetHotsendBase/shipp/save/depart'
  14. }
  15. self.url = {
  16. 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
  17. 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals',
  18. 'host_send': '/billetHotsendBase/billetHotsendBase/add',
  19. 'car_add': '/storageBill/add',
  20. 'car_save': '/billetHotsendBase/billetHotsendBase/add',
  21. 'car_go': '/billetHotsendBase/billetHotsendBase/rodLineDepart'
  22. }
  23. self.heat_tangible_temp = {
  24. 'optype' : 0,
  25. 'casterCode' : '',
  26. 'emptyLadleWeight' : 0.0,
  27. 'fullLadleWeight' : 0.0,
  28. 'grade' : '',
  29. 'spec' : '',
  30. 'heatsCode' : '',
  31. 'firstCutTime' : '',
  32. 'lastCutTime' : '',
  33. 'ladleCode' : '',
  34. 'moltenSteelWeight' : 0.0,
  35. 'startPourTime' : '',
  36. 'stopPourTime' : '',
  37. 'blankOutput' : 0.0,
  38. 'billetSum' : 0
  39. }
  40. self.billet_tangible_temp = {
  41. 'optype' : 0,
  42. 'heatNo' : '',
  43. 'billetNo' : '',
  44. 'grade' : '',
  45. 'ladleNo' : '',
  46. 'ccmNo' : '',
  47. 'strandNo' : '',
  48. 'heatnoIndex' : 0,
  49. 'length' : 0,
  50. 'strandnoIndex' : 0,
  51. 'castingSpeed' : 0.0,
  52. 'actualLength' : 0,
  53. 'spec' : '',
  54. 'width' : 170,
  55. 'thickness' : 170,
  56. 'weight' : 0.0,
  57. 'cutStartTime' : '',
  58. 'cutStopTime' : '',
  59. 'assemblyNumber' : ''
  60. }
  61. self.billet_union_temp = {
  62. "ccmNo": 0, # 铸机号 int数值类型
  63. "heatNo": "", # 炉号 字符串类型
  64. "billetsNo": "", # 坯号集合 字符串类型 逗号隔开
  65. "assemblyNumber": "", # 组坯号 字符串类型
  66. "assemblyTime": "", # 组坯时间 Date日期类型
  67. "length": 0, # 定尺 int数值类型
  68. "billetsNum": 0, # 钢坯数量 int数值类型
  69. "billetWeight": 0 # 钢坯坯重 double高精度数值类型
  70. }
  71. self.host_send_temp = {
  72. "ccmNo": "",
  73. "billetNos": "",
  74. "billetHotsendTypeConfigId": "",
  75. "vehicleNumber": "",
  76. "liftingTime": "",
  77. "location": ""
  78. }
  79. self.car_add_temp = {
  80. "ccmNo": "",
  81. "licensePlate": "",
  82. "billetHotsendTypeConfigId": "",
  83. "positionNum": ""
  84. }
  85. self.car_save_temp = {
  86. "ccmNo": "",
  87. "billetNos": "",
  88. "billetHotsendTypeConfigId": "",
  89. "licensePlate": "",
  90. "vehicleNumber": "",
  91. "liftingTime": "",
  92. "location": ""
  93. }
  94. self.car_go_temp = {
  95. "ccmNo": "",
  96. "licensePlate": ""
  97. }
  98. self.mqtt_cli = None
  99. self.http_flag = True
  100. self._cache = {}
  101. self._billet = {}
  102. def set_mqtt_client(self, cli):
  103. self.mqtt_cli = cli
  104. def send(self, purpose, payload, qos=2):
  105. if not isinstance(payload, dict):
  106. self.logger.error('[SENDER]发送数据非dict类型')
  107. raise TypeError(f"Need a dict type but {type(payload)} given.")
  108. return None
  109. self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} 发送数据")
  110. self.logger.debug(f"[SENDER]{purpose}:{payload}")
  111. if self.mqtt_cli and purpose in self.topic:
  112. if not self.mqtt_cli.is_connected:
  113. self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
  114. if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]:
  115. self.logger.error('[SENDER]MQTT:数据包发送失败')
  116. else:
  117. self.logger.info('[SENDER]MQTT:数据包发送成功')
  118. if self.http_flag and purpose in self.url:
  119. try:
  120. self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text)
  121. except:
  122. self.logger.error('[SENDER]HTTP:请求超时')
  123. def begin_pour(self, heat_data, bigbagweight):
  124. tmp = self.heat_tangible_temp.copy()
  125. tmp['optype'] = 1
  126. tmp['casterCode'] = heat_data.get('ccmNo', '')
  127. tmp['emptyLadleWeight'] = bigbagweight - heat_data.get('netWeight', 0)
  128. tmp['fullLadleWeight'] = bigbagweight
  129. tmp['grade'] = heat_data.get('grade', '')
  130. tmp['spec'] = heat_data.get('spec', '')
  131. tmp['heatsCode'] = heat_data.get('heatNo', '')
  132. tmp['ladleCode'] = heat_data.get('ladleNo', '')
  133. tmp['moltenSteelWeight'] = heat_data.get('netWeight', 0)
  134. tmp['startPourTime'] = heat_data.get('sendTime', '')
  135. #此处应存储进数据库,暂时使用缓存处理
  136. self._cache[heat_data.get('heatNo', 'error')] = tmp
  137. self.send('heat_add', tmp)
  138. def end_pour(self, heat_data):
  139. if heat_data['heatNo'] in self._cache:
  140. tmp = self._cache[heat_data['heatNo']]
  141. tmp['optype'] = 2
  142. tmp['grade'] = heat_data.get('grade', '')
  143. tmp['spec'] = heat_data.get('spec', '')
  144. tmp['stopPourTime'] = heat_data.get('sendTime', '')
  145. #此处应存储进数据库,暂时使用缓存处理
  146. self._cache[heat_data.get('heatNo', 'error')] = tmp
  147. self.send('heat_add', tmp)
  148. else:
  149. self.logger.error(f"[SENDER]停浇:找不到对应的开浇数据,炉号:{heat_data['heatNo']}")
  150. def heat_first(self, heat_data, cuttime = None):
  151. if not cuttime:
  152. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  153. tmp = self._cache[heat_data['heatNo']]
  154. tmp['optype'] = 2
  155. tmp['firstCutTime'] = cuttime
  156. #此处应存储进数据库,暂时使用缓存处理
  157. self._cache[heat_data.get('heatNo', 'error')] = tmp
  158. self.send('heat_add', tmp)
  159. def heat_last(self, heat_data, cuttime = None):
  160. if not cuttime:
  161. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  162. tmp = self._cache[heat_data['heatNo']]
  163. tmp['optype'] = 2
  164. tmp['lastCutTime'] = cuttime
  165. #此处应存储进数据库,暂时使用缓存处理
  166. self._cache[heat_data.get('heatNo', 'error')] = tmp
  167. self.send('heat_add', tmp)
  168. def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed):
  169. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  170. tmp = self.billet_tangible_temp.copy()
  171. tmp['optype'] = 1
  172. tmp['heatNo'] = heat_data.get('heatNo', '')
  173. tmp['billetNo'] = billetNo
  174. tmp['grade'] = heat_data.get('grade', '')
  175. tmp['spec'] = heat_data.get('spec', '')
  176. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  177. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  178. tmp['strandNo'] = billetNo[-3]
  179. tmp['heatnoIndex'] = heatnoIndex
  180. tmp['length'] = sizing
  181. tmp['strandnoIndex'] = int(billetNo[-2:])
  182. tmp['castingSpeed'] = speed
  183. tmp['weight'] = sizing / 1000 * 0.2265
  184. tmp['cutStartTime'] = cuttime
  185. #此处应存储进数据库,暂时使用缓存处理
  186. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  187. self.send('billet_add', tmp)
  188. def end_cut(self, heat_data, heatnoIndex):
  189. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  190. trycount = 4
  191. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  192. while not tmp:
  193. trycount -= 1
  194. time.sleep(0.5)
  195. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  196. if not tmp:
  197. self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
  198. return None
  199. tmp['optype'] = 2
  200. tmp['cutStopTime'] = cuttime
  201. #此处应存储进数据库,暂时使用缓存处理
  202. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  203. self.send('billet_add', tmp)
  204. def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed, starttime, stoptime, unionNo):
  205. tmp = self.billet_tangible_temp.copy()
  206. tmp['optype'] = 1
  207. tmp['heatNo'] = heat_data.get('heatNo', '')
  208. tmp['billetNo'] = billetNo
  209. tmp['grade'] = heat_data.get('grade', '')
  210. tmp['spec'] = heat_data.get('spec', '')
  211. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  212. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  213. tmp['strandNo'] = billetNo[-3]
  214. tmp['heatnoIndex'] = heatnoIndex
  215. tmp['length'] = sizing
  216. tmp['strandnoIndex'] = int(billetNo[-2:])
  217. tmp['castingSpeed'] = speed
  218. tmp['weight'] = sizing / 1000 * 0.2265
  219. tmp['cutStartTime'] = starttime
  220. tmp['cutStopTime'] = stoptime
  221. tmp['assemblyNumber'] = unionNo
  222. #此处应存储进数据库
  223. self.send('billet_add', tmp)
  224. def billet_union(self, heat_data, unionNo, billetNos, sizing):
  225. tmp = self.billet_union_temp.copy()
  226. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  227. tmp['heatNo'] = heat_data.get('heatNo', '')
  228. tmp['billetsNo'] = ','.join(billetNos)
  229. tmp['assemblyNumber'] = unionNo
  230. tmp['assemblyTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  231. tmp['length'] = sizing
  232. tmp['billetsNum'] = len(billetNos)
  233. tmp['billetWeight'] = sizing / 1000 * 0.2265 * tmp['billetsNum']
  234. #此处应存储进数据库
  235. self.send('billet_union', tmp)
  236. def host_send(self, ccmNo, billetNos, dst_str, craneNo = ""):
  237. tmp = self.host_send_temp.copy()
  238. tmp['ccmNo'] = ccmNo
  239. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  240. tmp['billetHotsendTypeConfigId'] = "1" if dst_str == "棒一" else "15"
  241. tmp['liftingTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  242. tmp['vehicleNumber'] = craneNo
  243. tmp['location'] = "6#热送辊道" if dst_str == "高线" else ""
  244. #此处应存储进数据库
  245. self.send('host_send', tmp)
  246. def car_add(self, ccmNo, carNo, plate: str):
  247. tmp = self.car_add_temp.copy()
  248. tmp['ccmNo'] = ccmNo
  249. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  250. tmp['licensePlate'] = plate
  251. tmp['positionNum'] = carNo
  252. #此处应存储进数据库
  253. self.send('car_add', tmp)
  254. def car_save(self, ccmNo, billetNos, plate, craneNo, Ltime):
  255. tmp = self.car_save_temp.copy()
  256. tmp['ccmNo'] = ccmNo
  257. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  258. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  259. tmp['licensePlate'] = plate
  260. tmp['vehicleNumber'] = craneNo
  261. tmp['liftingTime'] = Ltime
  262. tmp['location'] = '装车'
  263. #此处应存储进数据库
  264. self.send('car_save', tmp)
  265. def car_go(self, ccmNo, plate):
  266. tmp = self.car_go_temp.copy()
  267. tmp['ccmNo'] = ccmNo
  268. tmp['licensePlate'] = plate
  269. #此处应存储进数据库
  270. self.send('car_go', tmp)
  271. if __name__ == '__main__':
  272. import logging
  273. logger = logging.getLogger(__name__)
  274. logger.setLevel(level = logging.DEBUG)
  275. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  276. console = logging.StreamHandler()
  277. console.setLevel(logging.DEBUG)
  278. logger.addHandler(console)
  279. mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
  280. mqttcli.username_pw_set('admin', '123456')
  281. mqttcli.connect('192.168.0.119', 1883)
  282. mqttcli.loop_start()
  283. test = Sender(logger)
  284. test.set_mqtt_client(mqttcli)
  285. test.http_flag = False
  286. testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"25501045","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"5","sendTime":"2025-01-20 09:44:17"}
  287. test.host_send("5", "255010455107", "棒一")
  288. test.host_send("6", ["256009976601","256009976701","256009976801","256009976602"], "高线", "A2")