# data_sender.py
import copy
import json
import time

import paho.mqtt.client as mqtt
import requests
  3. class Sender:
  4. def __init__(self, logger):
  5. self.logger = logger
  6. self.topic = {
  7. 'billet_add': 'trace/performance/billet/add',
  8. 'heat_add': 'trace/performance/converter/add',
  9. 'billet_union': 'trace/billet/billetAssemblyNumber/add',
  10. 'host_send': 'syn/billetHotsendBase/save',
  11. 'car_add': 'syn/storageBill/add',
  12. 'car_save': 'syn/billetHotsendBase/shipp/save',
  13. 'car_go': 'syn/billetHotsendBase/shipp/depart',
  14. 'stack_add': 'syn/billet/addStacking',
  15. 'plate_update': 'syn/storageBill/update'
  16. }
  17. self.url = {
  18. 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
  19. 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals',
  20. 'billet_union': '',
  21. 'host_send': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
  22. 'car_add': 'http://192.168.0.119:8181/storageBill/add',
  23. 'car_save': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
  24. 'car_go': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/rodLineDepart',
  25. 'stack_add': 'http://192.168.0.119:8181/billet/stackingAndLoadingVehicles/addStacking',
  26. 'plate_update': ''
  27. }
  28. self.heat_tangible_temp = {
  29. 'optype' : 0,
  30. 'casterCode' : '',
  31. 'emptyLadleWeight' : 0.0,
  32. 'fullLadleWeight' : 0.0,
  33. 'grade' : '',
  34. 'spec' : '',
  35. 'heatsCode' : '',
  36. 'firstCutTime' : '',
  37. 'lastCutTime' : '',
  38. 'ladleCode' : '',
  39. 'moltenSteelWeight' : 0.0,
  40. 'startPourTime' : '',
  41. 'stopPourTime' : '',
  42. 'blankOutput' : 0.0,
  43. 'billetSum' : 0
  44. }
  45. self.billet_tangible_temp = {
  46. 'optype' : 0,
  47. 'heatNo' : '',
  48. 'billetNo' : '',
  49. 'grade' : '',
  50. 'ladleNo' : '',
  51. 'ccmNo' : '',
  52. 'strandNo' : '',
  53. 'heatnoIndex' : 0,
  54. 'length' : 0,
  55. 'strandnoIndex' : 0,
  56. 'castingSpeed' : 0.0,
  57. 'actualLength' : 0,
  58. 'spec' : '',
  59. 'width' : 170,
  60. 'thickness' : 170,
  61. 'weight' : 0.0,
  62. 'cutStartTime' : '',
  63. 'cutStopTime' : '',
  64. 'assemblyNumber' : ''
  65. }
  66. self.billet_union_temp = {
  67. "ccmNo": 0, # 铸机号 int数值类型
  68. "heatNo": "", # 炉号 字符串类型
  69. "billetsNo": "", # 坯号集合 字符串类型 逗号隔开
  70. "assemblyNumber": "", # 组坯号 字符串类型
  71. "assemblyTime": "", # 组坯时间 Date日期类型
  72. "length": 0, # 定尺 int数值类型
  73. "billetsNum": 0, # 钢坯数量 int数值类型
  74. "billetWeight": 0 # 钢坯坯重 double高精度数值类型
  75. }
  76. self.host_send_temp = {
  77. "ccmNo": "",
  78. "billetNos": "",
  79. "billetHotsendTypeConfigId": "",
  80. "vehicleNumber": "",
  81. "liftingTime": "",
  82. "location": "",
  83. "destination":"",
  84. "positionNum":"",
  85. "plateOrStack":"",
  86. "layer":"",
  87. "address":""
  88. }
  89. self.car_add_temp = {
  90. "ccmNo": "",
  91. "licensePlate": "",
  92. "billetHotsendTypeConfigId": "",
  93. "positionNum": ""
  94. }
  95. self.car_save_temp = {
  96. "ccmNo": "",
  97. "billetNos": "",
  98. "billetHotsendTypeConfigId": "",
  99. "licensePlate": "",
  100. "vehicleNumber": "",
  101. "liftingTime": "",
  102. "location": "",
  103. "destination":"",
  104. "positionNum":"",
  105. "plateOrStack":"",
  106. "layer":"",
  107. "address":""
  108. }
  109. self.car_go_temp = {
  110. "ccmNo": "",
  111. "positionNum": "",
  112. "licensePlate": ""
  113. }
  114. self.stack_add_temp = {
  115. "ccmNo": "",
  116. "billetHotsendTypeConfigId": "",
  117. "billetNoList": []
  118. }
  119. self.stack_dict = {
  120. "601堆垛": '6',
  121. "602堆垛": '7',
  122. "604堆垛": '8',
  123. "步进冷床": '9',
  124. "501堆垛": '10',
  125. }
  126. self.mqtt_cli = None
  127. self.http_flag = True
  128. self._cache = {}
  129. self._billet = {}
  130. def set_mqtt_client(self, cli):
  131. self.mqtt_cli = cli
  132. def send(self, purpose, payload, qos=2):
  133. if not isinstance(payload, dict):
  134. self.logger.error('[SENDER]发送数据非dict类型')
  135. raise TypeError(f"Need a dict type but {type(payload)} given.")
  136. return None
  137. self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} 发送数据")
  138. self.logger.debug(f"[SENDER]{purpose}:{payload}")
  139. if self.mqtt_cli and purpose in self.topic:
  140. if not self.mqtt_cli.is_connected:
  141. self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
  142. if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]:
  143. self.logger.error('[SENDER]MQTT:数据包发送失败')
  144. else:
  145. self.logger.info('[SENDER]MQTT:数据包发送成功')
  146. if self.http_flag and purpose in self.url:
  147. try:
  148. self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text)
  149. except:
  150. self.logger.error('[SENDER]HTTP:请求超时')
  151. def begin_pour(self, heat_data, bigbagweight):
  152. tmp = self.heat_tangible_temp.copy()
  153. tmp['optype'] = 1
  154. tmp['casterCode'] = heat_data.get('ccmNo', '')
  155. tmp['emptyLadleWeight'] = bigbagweight - heat_data.get('netWeight', 0)
  156. tmp['fullLadleWeight'] = bigbagweight
  157. tmp['grade'] = heat_data.get('grade', '')
  158. tmp['spec'] = heat_data.get('spec', '')
  159. tmp['heatsCode'] = heat_data.get('heatNo', '')
  160. tmp['ladleCode'] = heat_data.get('ladleNo', '')
  161. tmp['moltenSteelWeight'] = heat_data.get('netWeight', 0)
  162. tmp['startPourTime'] = heat_data.get('sendTime', '')
  163. #此处应存储进数据库,暂时使用缓存处理
  164. self._cache[heat_data.get('heatNo', 'error')] = tmp
  165. self.send('heat_add', tmp)
  166. def end_pour(self, heat_data):
  167. if heat_data['heatNo'] in self._cache:
  168. tmp = self._cache[heat_data['heatNo']]
  169. tmp['optype'] = 2
  170. tmp['grade'] = heat_data.get('grade', '')
  171. tmp['spec'] = heat_data.get('spec', '')
  172. tmp['stopPourTime'] = heat_data.get('sendTime', '')
  173. #此处应存储进数据库,暂时使用缓存处理
  174. self._cache[heat_data.get('heatNo', 'error')] = tmp
  175. self.send('heat_add', tmp)
  176. else:
  177. self.logger.error(f"[SENDER]停浇:找不到对应的开浇数据,炉号:{heat_data['heatNo']}")
  178. def heat_first(self, heat_data, cuttime = None):
  179. if not cuttime:
  180. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  181. tmp = self._cache[heat_data['heatNo']]
  182. tmp['optype'] = 2
  183. tmp['firstCutTime'] = cuttime
  184. #此处应存储进数据库,暂时使用缓存处理
  185. self._cache[heat_data.get('heatNo', 'error')] = tmp
  186. self.send('heat_add', tmp)
  187. def heat_last(self, heat_data, cuttime = None):
  188. if not cuttime:
  189. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  190. tmp = self._cache[heat_data['heatNo']]
  191. tmp['optype'] = 2
  192. tmp['lastCutTime'] = cuttime
  193. #此处应存储进数据库,暂时使用缓存处理
  194. self._cache[heat_data.get('heatNo', 'error')] = tmp
  195. self.send('heat_add', tmp)
  196. def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed):
  197. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  198. tmp = self.billet_tangible_temp.copy()
  199. tmp['optype'] = 1
  200. tmp['heatNo'] = heat_data.get('heatNo', '')
  201. tmp['billetNo'] = billetNo
  202. tmp['grade'] = heat_data.get('grade', '')
  203. tmp['spec'] = heat_data.get('spec', '')
  204. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  205. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  206. tmp['strandNo'] = billetNo[-3]
  207. tmp['heatnoIndex'] = heatnoIndex
  208. tmp['length'] = sizing
  209. tmp['strandnoIndex'] = int(billetNo[-2:])
  210. tmp['castingSpeed'] = speed
  211. tmp['weight'] = sizing / 1000 * 0.2265
  212. tmp['cutStartTime'] = cuttime
  213. #此处应存储进数据库,暂时使用缓存处理
  214. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  215. self.send('billet_add', tmp)
  216. def end_cut(self, heat_data, heatnoIndex):
  217. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  218. trycount = 4
  219. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  220. while not tmp:
  221. trycount -= 1
  222. time.sleep(0.5)
  223. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  224. if not tmp:
  225. self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
  226. return None
  227. tmp['optype'] = 2
  228. tmp['cutStopTime'] = cuttime
  229. #此处应存储进数据库,暂时使用缓存处理
  230. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  231. self.send('billet_add', tmp)
  232. def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed, starttime, stoptime, unionNo):
  233. tmp = self.billet_tangible_temp.copy()
  234. tmp['optype'] = 1
  235. tmp['heatNo'] = heat_data.get('heatNo', '')
  236. tmp['billetNo'] = billetNo
  237. tmp['grade'] = heat_data.get('grade', '')
  238. tmp['spec'] = heat_data.get('spec', '')
  239. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  240. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  241. tmp['strandNo'] = billetNo[-3]
  242. tmp['heatnoIndex'] = heatnoIndex
  243. tmp['length'] = sizing
  244. tmp['strandnoIndex'] = int(billetNo[-2:])
  245. tmp['castingSpeed'] = speed
  246. tmp['weight'] = sizing / 1000 * 0.2265
  247. tmp['cutStartTime'] = starttime
  248. tmp['cutStopTime'] = stoptime
  249. tmp['assemblyNumber'] = unionNo
  250. #此处应存储进数据库
  251. self.send('billet_add', tmp)
  252. def billet_union(self, heat_data, unionNo, billetNos, sizing):
  253. tmp = self.billet_union_temp.copy()
  254. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  255. tmp['heatNo'] = heat_data.get('heatNo', '')
  256. tmp['billetsNo'] = ','.join(billetNos)
  257. tmp['assemblyNumber'] = unionNo
  258. tmp['assemblyTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  259. tmp['length'] = sizing
  260. tmp['billetsNum'] = len(billetNos)
  261. tmp['billetWeight'] = sizing / 1000 * 0.2265 * tmp['billetsNum']
  262. #此处应存储进数据库
  263. self.send('billet_union', tmp)
  264. def host_send(self, ccmNo, billetNos, dst_str, craneNo = "", fromaddr = ""):
  265. tmp = self.host_send_temp.copy()
  266. tmp['ccmNo'] = ccmNo
  267. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  268. tmp['billetHotsendTypeConfigId'] = "1" if dst_str == "棒一" else "15"
  269. tmp['liftingTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  270. tmp['vehicleNumber'] = craneNo
  271. tmp['location'] = fromaddr
  272. tmp['destination'] = dst_str
  273. #此处应存储进数据库
  274. self.send('host_send', tmp)
  275. def car_add(self, ccmNo, carNo, plate: str):
  276. tmp = self.car_add_temp.copy()
  277. tmp['ccmNo'] = ccmNo
  278. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  279. tmp['licensePlate'] = plate
  280. tmp['positionNum'] = carNo
  281. #此处应存储进数据库
  282. self.send('car_add', tmp)
  283. def car_save(self, ccmNo, billetNos, plate, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
  284. tmp = self.car_save_temp.copy()
  285. tmp['ccmNo'] = ccmNo
  286. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  287. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  288. tmp['licensePlate'] = plate
  289. tmp['vehicleNumber'] = craneNo
  290. tmp['liftingTime'] = Ltime
  291. tmp['location'] = fromaddr
  292. tmp['destination'] = toaddr
  293. tmp['positionNum'] = toaddr[-1] if len(toaddr) else ""
  294. tmp['plateOrStack'] = plate
  295. tmp['layer'] = str(layer)
  296. tmp['address'] = str(address)
  297. #此处应存储进数据库
  298. self.send('car_save', tmp)
  299. def car_go(self, ccmNo, carNo, plate = ''):
  300. tmp = self.car_go_temp.copy()
  301. tmp['ccmNo'] = ccmNo
  302. tmp['positionNum'] = carNo
  303. tmp['licensePlate'] = plate
  304. #此处应存储进数据库
  305. self.send('car_go', tmp)
  306. def stack_add(self, ccmNo, billetNos, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
  307. tmp = self.stack_add_temp.copy()
  308. tmp['ccmNo'] = ccmNo
  309. tmp['billetHotsendTypeConfigId'] = self.stack_dict.get(toaddr, '')
  310. tmp['billetNoList'] = []
  311. tmp['billetNoList'].append({
  312. 'billetNos' : ','.join(billetNos) if isinstance(billetNos, list) else billetNos,
  313. 'vehicleNumber' : craneNo,
  314. 'liftingTime' : Ltime,
  315. 'location' : fromaddr,
  316. 'destination' : toaddr,
  317. 'positionNum' : "",
  318. 'plateOrStack' : toaddr,
  319. 'layer' : str(layer),
  320. 'address' : str(address)
  321. })
  322. #此处应存储进数据库
  323. self.send('stack_add', tmp)
  324. def plate_update(self, ccmNo, carNo, plate):
  325. tmp = self.car_go_temp.copy()
  326. tmp['ccmNo'] = ccmNo
  327. tmp['positionNum'] = carNo
  328. tmp['licensePlate'] = plate
  329. #此处应存储进数据库
  330. self.send('plate_update', tmp)
  331. if __name__ == '__main__':
  332. import logging
  333. logger = logging.getLogger(__name__)
  334. logger.setLevel(level = logging.DEBUG)
  335. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  336. console = logging.StreamHandler()
  337. console.setLevel(logging.DEBUG)
  338. logger.addHandler(console)
  339. mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
  340. mqttcli.username_pw_set('admin', '123456')
  341. mqttcli.connect('192.168.0.119', 1883)
  342. mqttcli.loop_start()
  343. test = Sender(logger)
  344. test.set_mqtt_client(mqttcli)
  345. test.http_flag = False
  346. testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"25501045","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"5","sendTime":"2025-01-20 09:44:17"}
  347. #test.host_send("5", "255010455107", "棒一")
  348. #test.host_send("6", ["256009976601","256009976701","256009976801","256009976602"], "高线", "A2")
  349. test.car_add("6", "4", "陕E00901")
  350. test.car_save("6", "256012036107,256012036207,256012036407,256012036108,", "陕E00901", "A3", "2025-01-23 16:22:17", "6#小冷床(右)", "车位4")
  351. #test.car_go("6", "陕E00901")