data_sender.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. import paho.mqtt.client as mqtt
  2. import json, time, requests, pymysql, threading
  3. class Sender:
  4. def __init__(self, logger):
  5. self.logger = logger
  6. self.topic = {
  7. 'billet_add': 'trace/performance/billet/add',
  8. 'heat_add': 'trace/performance/converter/add',
  9. 'billet_union': 'trace/billet/billetAssemblyNumber/add',
  10. 'host_send': 'syn/billetHotsendBase/save',
  11. 'car_add': 'syn/storageBill/add',
  12. 'car_save': 'syn/billetHotsendBase/shipp/save',
  13. 'car_go': 'syn/billetHotsendBase/shipp/depart',
  14. 'stack_add': 'syn/billet/addStacking',
  15. 'plate_update': 'syn/storageBill/update',
  16. 'stack_car_save': 'syn/billet/stackingAndLoadingVehicles/loading',
  17. 'jiaoban': 'syn/billet/changeShift',
  18. 'jiaoban_6': 'syn/billet/changeSixShift',
  19. 'plate_zhagang': 'trace/performance/car/arrive'
  20. }
  21. self.url = {
  22. 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
  23. 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals',
  24. 'billet_union': '',
  25. 'host_send': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
  26. 'car_add': 'http://192.168.0.119:8181/storageBill/add',
  27. 'car_save': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
  28. 'car_go': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/rodLineDepart',
  29. 'stack_add': 'http://192.168.0.119:8181/billet/stackingAndLoadingVehicles/addStacking',
  30. 'plate_update': '',
  31. 'stack_car_save': '',
  32. 'jiaoban': '',
  33. 'jiaoban_6': '',
  34. 'plate_zhagang': ''
  35. }
  36. self.heat_tangible_temp = {
  37. 'optype' : 0,
  38. 'casterCode' : '',
  39. 'emptyLadleWeight' : 0.0,
  40. 'fullLadleWeight' : 0.0,
  41. 'grade' : '',
  42. 'spec' : '',
  43. 'heatsCode' : '',
  44. 'firstCutTime' : '',
  45. 'lastCutTime' : '',
  46. 'ladleCode' : '',
  47. 'moltenSteelWeight' : 0.0,
  48. 'startPourTime' : '',
  49. 'stopPourTime' : '',
  50. 'blankOutput' : 0.0,
  51. 'billetSum' : 0
  52. }
  53. self.billet_tangible_temp = {
  54. 'optype' : 0,
  55. 'heatNo' : '',
  56. 'billetNo' : '',
  57. 'grade' : '',
  58. 'ladleNo' : '',
  59. 'ccmNo' : '',
  60. 'strandNo' : '',
  61. 'heatnoIndex' : 0,
  62. 'length' : 0,
  63. 'strandnoIndex' : 0,
  64. 'castingSpeed' : 0.0,
  65. 'actualLength' : 0,
  66. 'spec' : '',
  67. 'width' : 170,
  68. 'thickness' : 170,
  69. 'weight' : 0.0,
  70. 'cutStartTime' : '',
  71. 'cutStopTime' : '',
  72. 'assemblyNumber' : '',
  73. 'sign': "1"
  74. }
  75. self.billet_union_temp = {
  76. "ccmNo": 0, # 铸机号 int数值类型
  77. "heatNo": "", # 炉号 字符串类型
  78. "billetsNo": "", # 坯号集合 字符串类型 逗号隔开
  79. "assemblyNumber": "", # 组坯号 字符串类型
  80. "assemblyTime": "", # 组坯时间 Date日期类型
  81. "length": 0, # 定尺 int数值类型
  82. "billetsNum": 0, # 钢坯数量 int数值类型
  83. "billetWeight": 0 # 钢坯坯重 double高精度数值类型
  84. }
  85. self.host_send_temp = {
  86. "ccmNo": "",
  87. "billetNos": "",
  88. "billetHotsendTypeConfigId": "",
  89. "vehicleNumber": "",
  90. "liftingTime": "",
  91. "location": "",
  92. "destination":"",
  93. "positionNum":"",
  94. "plateOrStack":"",
  95. "layer":"",
  96. "address":""
  97. }
  98. self.car_add_temp = {
  99. "ccmNo": "",
  100. "licensePlate": "",
  101. "billetHotsendTypeConfigId": "",
  102. "positionNum": ""
  103. }
  104. self.car_save_temp = {
  105. "ccmNo": "",
  106. "billetNos": "",
  107. "billetHotsendTypeConfigId": "",
  108. "licensePlate": "",
  109. "vehicleNumber": "",
  110. "liftingTime": "",
  111. "location": "",
  112. "destination":"",
  113. "positionNum":"",
  114. "plateOrStack":"",
  115. "layer":"",
  116. "address":""
  117. }
  118. self.car_go_temp = {
  119. "ccmNo": "",
  120. "positionNum": "",
  121. "licensePlate": ""
  122. }
  123. self.stack_add_temp = {
  124. "ccmNo": "",
  125. "billetHotsendTypeConfigId": "",
  126. "billetNoList": []
  127. }
  128. self.stack_car_save_temp = {
  129. "ccmNo": "",
  130. "billetHotsendTypeConfigId": "",
  131. "address": "",
  132. "layer": "",
  133. "vehicleNumber": "",
  134. "liftingTime": "",
  135. "location": "",
  136. "positionNum":"",
  137. "plateOrStack":""
  138. }
  139. self.stack_dict = {
  140. "601堆垛": '6',
  141. "602堆垛": '7',
  142. "604堆垛": '8',
  143. "步进冷床堆垛": '9',
  144. "501堆垛": '10',
  145. }
  146. self.mqtt_cli = None
  147. self.http_flag = False
  148. self.mysql_flag = False
  149. self._cache = {}
  150. self._billet = {}
  151. self.mysql_lock = threading.Lock()
  152. def set_mqtt_client(self, cli):
  153. self.mqtt_cli = cli
  154. def set_mysql_client(self, cli: pymysql.connections.Connection):
  155. self.mysql_cli = cli
  156. def send(self, purpose, payload, qos=2):
  157. if not isinstance(payload, dict):
  158. self.logger.error('[SENDER]发送数据非dict类型')
  159. raise TypeError(f"Need a dict type but {type(payload)} given.")
  160. return None
  161. self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} {'MYSQL'if self.mysql_flag else ''} 发送数据")
  162. self.logger.debug(f"[SENDER]{purpose}:{payload}")
  163. if self.mqtt_cli and purpose in self.topic:
  164. if not self.mqtt_cli.is_connected:
  165. self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
  166. if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]:
  167. self.logger.error('[SENDER]MQTT:数据包发送失败')
  168. else:
  169. self.logger.info('[SENDER]MQTT:数据包发送成功')
  170. if self.http_flag and purpose in self.url:
  171. try:
  172. self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text)
  173. except:
  174. self.logger.error('[SENDER]HTTP:请求超时')
  175. if self.mysql_flag and hasattr(self, 'mysql_cli') and self.mysql_cli:
  176. if self.save_mysql(purpose, payload):
  177. self.logger.info('[SENDER]MYSQL:数据保存成功')
  178. else:
  179. self.logger.error('[SENDER]MYSQL:数据保存失败')
  180. def save_mysql(self, table, args: dict):
  181. if not hasattr(self, 'mysql_cli') or self.mysql_cli == None:
  182. return None
  183. keys = []
  184. solved_values = []
  185. for k, v in args.items():
  186. keys.append(k)
  187. if 'time' in k.lower():
  188. if v:
  189. solved_values.append(v)
  190. else:
  191. solved_values.append(None)
  192. elif isinstance(v, int) or isinstance(v, float) or isinstance(v, str):
  193. solved_values.append(v)
  194. else:
  195. solved_values.append(json.dumps(v))
  196. sql = f"INSERT INTO {table}({', '.join(keys)}) VALUES({', '.join(['%s' for i in range(len(keys))])})"
  197. with self.mysql_lock:
  198. try:
  199. self.mysql_cli.ping()
  200. with self.mysql_cli.cursor() as cursor:
  201. cursor.execute(sql, tuple(solved_values))
  202. self.mysql_cli.commit()
  203. return True
  204. except pymysql.Error as e:
  205. self.logger.error(f"[SENDER]MYSQL:{e}")
  206. return False
  207. def begin_pour(self, heat_data, bigbagweight):
  208. tmp = self.heat_tangible_temp.copy()
  209. tmp['optype'] = 1
  210. tmp['casterCode'] = heat_data.get('ccmNo', '')
  211. tmp['emptyLadleWeight'] = bigbagweight - float(heat_data.get('netWeight', 0))
  212. tmp['fullLadleWeight'] = bigbagweight
  213. tmp['grade'] = heat_data.get('grade', '')
  214. tmp['spec'] = heat_data.get('spec', '')
  215. tmp['heatsCode'] = heat_data.get('heatNo', '')
  216. tmp['ladleCode'] = heat_data.get('ladleNo', '')
  217. tmp['moltenSteelWeight'] = float(heat_data.get('netWeight', 0))
  218. tmp['startPourTime'] = heat_data.get('sendTime', '')
  219. self._cache[heat_data.get('heatNo', 'error')] = tmp
  220. self.send('heat_add', tmp)
  221. def end_pour(self, heat_data):
  222. if heat_data['heatNo'] in self._cache:
  223. tmp = self._cache[heat_data['heatNo']]
  224. tmp['optype'] = 2
  225. tmp['grade'] = heat_data.get('grade', '')
  226. tmp['spec'] = heat_data.get('spec', '')
  227. tmp['stopPourTime'] = heat_data.get('sendTime', '')
  228. self._cache[heat_data.get('heatNo', 'error')] = tmp
  229. self.send('heat_add', tmp)
  230. else:
  231. self.logger.error(f"[SENDER]停浇:找不到对应的开浇数据,炉号:{heat_data['heatNo']}")
  232. def heat_first(self, heat_data, cuttime = None):
  233. if not cuttime:
  234. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  235. tmp = self._cache[heat_data['heatNo']]
  236. tmp['optype'] = 2
  237. tmp['firstCutTime'] = cuttime
  238. self._cache[heat_data.get('heatNo', 'error')] = tmp
  239. self.send('heat_add', tmp)
  240. def heat_last(self, heat_data, cuttime = None):
  241. if not cuttime:
  242. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  243. tmp = self._cache[heat_data['heatNo']]
  244. tmp['optype'] = 2
  245. tmp['lastCutTime'] = cuttime
  246. self._cache[heat_data.get('heatNo', 'error')] = tmp
  247. self.send('heat_add', tmp)
  248. def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed, error=False):
  249. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  250. tmp = self.billet_tangible_temp.copy()
  251. tmp['optype'] = 1
  252. tmp['heatNo'] = heat_data.get('heatNo', '')
  253. tmp['billetNo'] = billetNo
  254. tmp['grade'] = heat_data.get('grade', '')
  255. tmp['spec'] = heat_data.get('spec', '')
  256. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  257. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  258. tmp['strandNo'] = billetNo[-3]
  259. tmp['heatnoIndex'] = heatnoIndex
  260. tmp['length'] = sizing
  261. tmp['strandnoIndex'] = int(billetNo[-2:])
  262. tmp['castingSpeed'] = speed
  263. tmp['weight'] = sizing / 1000 * 0.2265
  264. tmp['cutStartTime'] = cuttime
  265. tmp['sign'] = "2" if error else "1"
  266. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  267. self.send('billet_add', tmp)
  268. def end_cut(self, heat_data, heatnoIndex):
  269. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  270. trycount = 4
  271. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  272. while not tmp:
  273. trycount -= 1
  274. time.sleep(0.5)
  275. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  276. if not tmp:
  277. self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
  278. return None
  279. tmp['optype'] = 2
  280. tmp['cutStopTime'] = cuttime
  281. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  282. self.send('billet_add', tmp)
  283. def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed, starttime, stoptime, unionNo, error=False):
  284. tmp = self.billet_tangible_temp.copy()
  285. tmp['optype'] = 1
  286. tmp['heatNo'] = heat_data.get('heatNo', '')
  287. tmp['billetNo'] = billetNo
  288. tmp['grade'] = heat_data.get('grade', '')
  289. tmp['spec'] = heat_data.get('spec', '')
  290. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  291. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  292. tmp['strandNo'] = billetNo[-3]
  293. tmp['heatnoIndex'] = heatnoIndex
  294. tmp['length'] = sizing
  295. tmp['strandnoIndex'] = int(billetNo[-2:])
  296. tmp['castingSpeed'] = speed
  297. tmp['weight'] = sizing / 1000 * 0.2265
  298. tmp['cutStartTime'] = starttime
  299. tmp['cutStopTime'] = stoptime
  300. tmp['assemblyNumber'] = unionNo
  301. tmp['sign'] = "2" if error else "1"
  302. self.send('billet_add', tmp)
  303. def billet_union(self, heat_data, unionNo, billetNos, sizing):
  304. tmp = self.billet_union_temp.copy()
  305. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  306. tmp['heatNo'] = heat_data.get('heatNo', '')
  307. tmp['billetsNo'] = ','.join(billetNos)
  308. tmp['assemblyNumber'] = unionNo
  309. tmp['assemblyTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  310. tmp['length'] = sizing
  311. tmp['billetsNum'] = len(billetNos)
  312. tmp['billetWeight'] = sizing / 1000 * 0.2265 * tmp['billetsNum']
  313. self.send('billet_union', tmp)
  314. def host_send(self, ccmNo, billetNos, dst_str, craneNo = "", fromaddr = ""):
  315. tmp = self.host_send_temp.copy()
  316. tmp['ccmNo'] = ccmNo
  317. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  318. tmp['billetHotsendTypeConfigId'] = "1" if dst_str == "棒一" else "15"
  319. tmp['liftingTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  320. tmp['vehicleNumber'] = craneNo
  321. tmp['location'] = fromaddr
  322. tmp['destination'] = dst_str
  323. self.send('host_send', tmp)
  324. def car_add(self, ccmNo, carNo, plate: str):
  325. tmp = self.car_add_temp.copy()
  326. tmp['ccmNo'] = ccmNo
  327. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  328. tmp['licensePlate'] = plate
  329. tmp['positionNum'] = carNo
  330. self.send('car_add', tmp)
  331. def car_save(self, ccmNo, billetNos, plate, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
  332. tmp = self.car_save_temp.copy()
  333. tmp['ccmNo'] = ccmNo
  334. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  335. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  336. tmp['licensePlate'] = plate
  337. tmp['vehicleNumber'] = craneNo
  338. tmp['liftingTime'] = Ltime
  339. tmp['location'] = fromaddr
  340. tmp['destination'] = toaddr
  341. tmp['positionNum'] = toaddr[-1] if len(toaddr) else ""
  342. tmp['plateOrStack'] = plate
  343. tmp['layer'] = str(layer)
  344. tmp['address'] = str(address)
  345. self.send('car_save', tmp)
  346. def car_go(self, ccmNo, carNo, plate = ''):
  347. tmp = self.car_go_temp.copy()
  348. tmp['ccmNo'] = ccmNo
  349. tmp['positionNum'] = carNo
  350. tmp['licensePlate'] = plate
  351. self.send('car_go', tmp)
  352. def stack_add(self, ccmNo, billetNos, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
  353. tmp = self.stack_add_temp.copy()
  354. tmp['ccmNo'] = ccmNo
  355. tmp['billetHotsendTypeConfigId'] = self.stack_dict.get(toaddr, '')
  356. tmp['billetNoList'] = []
  357. tmp['billetNoList'].append({
  358. 'billetNos' : ','.join(billetNos) if isinstance(billetNos, list) else billetNos,
  359. 'vehicleNumber' : craneNo,
  360. 'liftingTime' : Ltime,
  361. 'location' : fromaddr,
  362. 'destination' : toaddr,
  363. 'positionNum' : "",
  364. 'plateOrStack' : toaddr,
  365. 'layer' : str(layer),
  366. 'address' : str(address)
  367. })
  368. self.send('stack_add', tmp)
  369. def plate_update(self, ccmNo, carNo, plate):
  370. tmp = self.car_go_temp.copy()
  371. tmp['ccmNo'] = ccmNo
  372. tmp['positionNum'] = carNo
  373. tmp['licensePlate'] = plate
  374. self.send('plate_update', tmp)
  375. def stack_car_save(self, ccmNo, craneNo, Ltime, fromaddr, toaddr, layer, address):
  376. tmp = self.stack_car_save_temp.copy()
  377. tmp['ccmNo'] = ccmNo
  378. tmp['billetHotsendTypeConfigId'] = self.stack_dict[fromaddr]
  379. tmp['vehicleNumber'] = craneNo
  380. tmp['liftingTime'] = Ltime
  381. tmp['location'] = fromaddr
  382. tmp['positionNum'] = toaddr[-1] if len(toaddr) else ""
  383. tmp['plateOrStack'] = fromaddr
  384. tmp['layer'] = str(layer)
  385. tmp['address'] = str(address)
  386. self.send('stack_car_save', tmp)
  387. def jiaoban(self, ccmNo, ban):
  388. tmp = {}
  389. tmp['changeShiftTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  390. tmp['ban'] = ban
  391. self.send('jiaoban', tmp)
  392. def jiaoban_6(self, ccmNo, ban):
  393. tmp = {}
  394. tmp['changeShiftTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  395. tmp['ban'] = ban
  396. self.send('jiaoban_6', tmp)
  397. def plate_zhagang(self, carNo, plate):
  398. tmp = {}
  399. tmp['type'] = 'car_arrive'
  400. tmp['data'] = {
  401. 'car': plate,
  402. 'position': carNo
  403. }
  404. self.send('plate_zhagang', tmp)
  405. if __name__ == '__main__':
  406. import logging
  407. logger = logging.getLogger(__name__)
  408. logger.setLevel(level = logging.DEBUG)
  409. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  410. console = logging.StreamHandler()
  411. console.setLevel(logging.DEBUG)
  412. logger.addHandler(console)
  413. mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
  414. mqttcli.username_pw_set('admin', '123456')
  415. mqttcli.connect('192.168.0.119', 1883)
  416. mqttcli.loop_start()
  417. test = Sender(logger)
  418. test.set_mqtt_client(mqttcli)
  419. test.http_flag = False
  420. testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"25501045","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"5","sendTime":"2025-01-20 09:44:17"}
  421. #test.host_send("5", "255010455107", "棒一")
  422. #test.host_send("6", ["256009976601","256009976701","256009976801","256009976602"], "高线", "A2")
  423. test.car_add("6", "4", "陕E00901")
  424. test.car_save("6", "256012036107,256012036207,256012036407,256012036108,", "陕E00901", "A3", "2025-01-23 16:22:17", "6#小冷床(右)", "车位4")
  425. #test.car_go("6", "陕E00901")