data_sender.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525
  1. import paho.mqtt.client as mqtt
  2. import json, time, requests, pymysql, threading, datetime
  3. from utils.logger import Logger
  4. class Sender:
  5. def __init__(self, logger: Logger):
  6. self.logger = logger
  7. self.topic = {
  8. 'billet_add': 'trace/performance/billet/add',
  9. 'heat_add': 'trace/performance/converter/add',
  10. 'billet_union': 'trace/billet/billetAssemblyNumber/add',
  11. 'host_send': 'syn/billetHotsendBase/save',
  12. 'car_add': 'syn/storageBill/add',
  13. 'car_save': 'syn/billetHotsendBase/shipp/save',
  14. 'car_go': 'syn/billetHotsendBase/shipp/depart',
  15. 'stack_add': 'syn/billet/addStacking',
  16. 'plate_update': 'syn/storageBill/update',
  17. 'stack_car_save': 'syn/billet/stackingAndLoadingVehicles/loading',
  18. 'jiaoban': 'syn/billet/changeShift',
  19. 'jiaoban_6': 'syn/billet/changeSixShift',
  20. 'plate_zhagang': 'trace/performance/car/arrive'
  21. }
  22. self.url = {
  23. 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
  24. 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals',
  25. 'billet_union': '',
  26. 'host_send': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
  27. 'car_add': 'http://192.168.0.119:8181/storageBill/add',
  28. 'car_save': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
  29. 'car_go': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/rodLineDepart',
  30. 'stack_add': 'http://192.168.0.119:8181/billet/stackingAndLoadingVehicles/addStacking',
  31. 'plate_update': '',
  32. 'stack_car_save': '',
  33. 'jiaoban': '',
  34. 'jiaoban_6': '',
  35. 'plate_zhagang': ''
  36. }
  37. self.heat_tangible_temp = {
  38. 'optype' : 0,
  39. 'casterCode' : '',
  40. 'emptyLadleWeight' : 0.0,
  41. 'fullLadleWeight' : 0.0,
  42. 'grade' : '',
  43. 'spec' : '',
  44. 'heatsCode' : '',
  45. 'firstCutTime' : '',
  46. 'lastCutTime' : '',
  47. 'ladleCode' : '',
  48. 'moltenSteelWeight' : 0.0,
  49. 'startPourTime' : '',
  50. 'stopPourTime' : '',
  51. 'blankOutput' : 0.0,
  52. 'billetSum' : 0
  53. }
  54. self.billet_tangible_temp = {
  55. 'optype' : 0,
  56. 'heatNo' : '',
  57. 'billetNo' : '',
  58. 'grade' : '',
  59. 'ladleNo' : '',
  60. 'ccmNo' : '',
  61. 'strandNo' : '',
  62. 'heatnoIndex' : 0,
  63. 'length' : 0,
  64. 'strandnoIndex' : 0,
  65. 'castingSpeed' : 0.0,
  66. 'actualLength' : 0,
  67. 'spec' : '',
  68. 'width' : 170,
  69. 'thickness' : 170,
  70. 'weight' : 0.0,
  71. 'cutStartTime' : '',
  72. 'cutStopTime' : '',
  73. 'assemblyNumber' : '',
  74. 'sign': "1"
  75. }
  76. self.billet_union_temp = {
  77. "ccmNo": 0, # 铸机号 int数值类型
  78. "heatNo": "", # 炉号 字符串类型
  79. "billetsNo": "", # 坯号集合 字符串类型 逗号隔开
  80. "assemblyNumber": "", # 组坯号 字符串类型
  81. "assemblyTime": "", # 组坯时间 Date日期类型
  82. "length": 0, # 定尺 int数值类型
  83. "billetsNum": 0, # 钢坯数量 int数值类型
  84. "billetWeight": 0 # 钢坯坯重 double高精度数值类型
  85. }
  86. self.host_send_temp = {
  87. "ccmNo": "",
  88. "billetNos": "",
  89. "billetHotsendTypeConfigId": "",
  90. "vehicleNumber": "",
  91. "liftingTime": "",
  92. "location": "",
  93. "destination":"",
  94. "positionNum":"",
  95. "plateOrStack":"",
  96. "layer":"",
  97. "address":""
  98. }
  99. self.car_add_temp = {
  100. "ccmNo": "",
  101. "licensePlate": "",
  102. "billetHotsendTypeConfigId": "",
  103. "positionNum": ""
  104. }
  105. self.car_save_temp = {
  106. "ccmNo": "",
  107. "billetNos": "",
  108. "billetHotsendTypeConfigId": "",
  109. "licensePlate": "",
  110. "vehicleNumber": "",
  111. "liftingTime": "",
  112. "location": "",
  113. "destination":"",
  114. "positionNum":"",
  115. "plateOrStack":"",
  116. "layer":"",
  117. "address":""
  118. }
  119. self.car_go_temp = {
  120. "ccmNo": "",
  121. "positionNum": "",
  122. "licensePlate": ""
  123. }
  124. self.stack_add_temp = {
  125. "ccmNo": "",
  126. "billetHotsendTypeConfigId": "",
  127. "billetNoList": []
  128. }
  129. self.stack_car_save_temp = {
  130. "ccmNo": "",
  131. "billetHotsendTypeConfigId": "",
  132. "address": "",
  133. "layer": "",
  134. "vehicleNumber": "",
  135. "liftingTime": "",
  136. "location": "",
  137. "positionNum":"",
  138. "plateOrStack":""
  139. }
  140. self.stack_dict = {
  141. "601堆垛": '6',
  142. "602堆垛": '7',
  143. "604堆垛": '8',
  144. "步进冷床堆垛": '9',
  145. "501堆垛": '10',
  146. }
  147. self.mqtt_cli = None
  148. self.http_flag = False
  149. self.mysql_flag = False
  150. self._cache = {}
  151. self._billet = {}
  152. self.mysql_lock = threading.Lock()
  153. def set_mqtt_client(self, cli):
  154. self.mqtt_cli = cli
  155. def set_mysql_client(self, cli: pymysql.connections.Connection):
  156. self.mysql_cli = cli
  157. def send(self, purpose, payload, qos=2):
  158. if not isinstance(payload, dict):
  159. self.logger.error('[SENDER]发送数据非dict类型')
  160. raise TypeError(f"Need a dict type but {type(payload)} given.")
  161. return None
  162. self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} {'MYSQL'if self.mysql_flag else ''} 发送数据")
  163. self.logger.debug(f"[SENDER]{purpose}:{payload}")
  164. if self.mqtt_cli and purpose in self.topic:
  165. if not self.mqtt_cli.is_connected:
  166. self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
  167. if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]:
  168. self.logger.error('[SENDER]MQTT:数据包发送失败')
  169. else:
  170. self.logger.info('[SENDER]MQTT:数据包发送成功')
  171. if self.http_flag and purpose in self.url:
  172. try:
  173. self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text)
  174. except:
  175. self.logger.error('[SENDER]HTTP:请求超时')
  176. if self.mysql_flag and hasattr(self, 'mysql_cli') and self.mysql_cli:
  177. if self.save_mysql(purpose, payload):
  178. self.logger.info('[SENDER]MYSQL:数据保存成功')
  179. else:
  180. self.logger.error('[SENDER]MYSQL:数据保存失败')
  181. def save_mysql(self, table, args: dict):
  182. if not hasattr(self, 'mysql_cli') or self.mysql_cli == None:
  183. return None
  184. keys = []
  185. solved_values = []
  186. for k, v in args.items():
  187. keys.append(k)
  188. if 'time' in k.lower():
  189. if v:
  190. solved_values.append(v)
  191. else:
  192. solved_values.append(None)
  193. elif isinstance(v, int) or isinstance(v, float) or isinstance(v, str):
  194. solved_values.append(v)
  195. else:
  196. solved_values.append(json.dumps(v))
  197. sql = f"INSERT INTO {table}({', '.join(keys)}) VALUES({', '.join(['%s' for i in range(len(keys))])})"
  198. with self.mysql_lock:
  199. try:
  200. self.mysql_cli.ping()
  201. with self.mysql_cli.cursor() as cursor:
  202. cursor.execute(sql, tuple(solved_values))
  203. self.mysql_cli.commit()
  204. return True
  205. except pymysql.Error as e:
  206. self.logger.error(f"[SENDER]MYSQL:{e}")
  207. return False
  208. def begin_pour(self, heat_data, bigbagweight):
  209. tmp = self.heat_tangible_temp.copy()
  210. tmp['optype'] = 1
  211. tmp['casterCode'] = heat_data.get('ccmNo', '')
  212. tmp['emptyLadleWeight'] = bigbagweight - float(heat_data.get('netWeight', 0))
  213. tmp['fullLadleWeight'] = bigbagweight
  214. tmp['grade'] = heat_data.get('grade', '')
  215. tmp['spec'] = heat_data.get('spec', '')
  216. tmp['heatsCode'] = heat_data.get('heatNo', '')
  217. tmp['ladleCode'] = heat_data.get('ladleNo', '')
  218. tmp['moltenSteelWeight'] = float(heat_data.get('netWeight', 0))
  219. tmp['startPourTime'] = heat_data.get('sendTime', '')
  220. if isinstance(tmp['startPourTime'], datetime.datetime):
  221. tmp['startPourTime'] = tmp['startPourTime'].strftime("%Y-%m-%d %H:%M:%S")
  222. self._cache[heat_data.get('heatNo', 'error')] = tmp
  223. self.send('heat_add', tmp)
  224. def end_pour(self, heat_data):
  225. if heat_data['heatNo'] in self._cache:
  226. tmp = self._cache[heat_data['heatNo']]
  227. tmp['optype'] = 2
  228. tmp['grade'] = heat_data.get('grade', '')
  229. tmp['spec'] = heat_data.get('spec', '')
  230. tmp['stopPourTime'] = heat_data.get('sendTime', '')
  231. self._cache[heat_data.get('heatNo', 'error')] = tmp
  232. self.send('heat_add', tmp)
  233. else:
  234. self.logger.error(f"[SENDER]停浇:找不到对应的开浇数据,炉号:{heat_data['heatNo']}")
  235. def heat_first(self, heat_data, cuttime = None):
  236. if not cuttime:
  237. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  238. tmp = self._cache[heat_data['heatNo']]
  239. tmp['optype'] = 2
  240. tmp['firstCutTime'] = cuttime
  241. self._cache[heat_data.get('heatNo', 'error')] = tmp
  242. self.send('heat_add', tmp)
  243. def heat_last(self, heat_data, cuttime = None):
  244. if not cuttime:
  245. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  246. tmp = self._cache[heat_data['heatNo']]
  247. tmp['optype'] = 2
  248. tmp['lastCutTime'] = cuttime
  249. self._cache[heat_data.get('heatNo', 'error')] = tmp
  250. self.send('heat_add', tmp)
  251. def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed, error=False):
  252. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  253. tmp = self.billet_tangible_temp.copy()
  254. tmp['optype'] = 1
  255. tmp['heatNo'] = heat_data.get('heatNo', '')
  256. tmp['billetNo'] = billetNo
  257. tmp['grade'] = heat_data.get('grade', '')
  258. tmp['spec'] = heat_data.get('spec', '')
  259. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  260. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  261. tmp['strandNo'] = billetNo[-3]
  262. tmp['heatnoIndex'] = heatnoIndex
  263. tmp['length'] = sizing
  264. tmp['strandnoIndex'] = int(billetNo[-2:])
  265. tmp['castingSpeed'] = speed
  266. tmp['weight'] = sizing / 1000 * 0.2265
  267. tmp['cutStartTime'] = cuttime
  268. tmp['sign'] = "2" if error else "1"
  269. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  270. self.send('billet_add', tmp)
  271. def end_cut(self, heat_data, heatnoIndex):
  272. cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  273. trycount = 4
  274. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  275. while not tmp:
  276. trycount -= 1
  277. time.sleep(0.5)
  278. tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
  279. if not tmp:
  280. self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
  281. return None
  282. tmp['optype'] = 2
  283. tmp['cutStopTime'] = cuttime
  284. self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
  285. self.send('billet_add', tmp)
  286. def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed, starttime, stoptime, unionNo, error=False):
  287. tmp = self.billet_tangible_temp.copy()
  288. tmp['optype'] = 1
  289. tmp['heatNo'] = heat_data.get('heatNo', '')
  290. tmp['billetNo'] = billetNo
  291. tmp['grade'] = heat_data.get('grade', '')
  292. tmp['spec'] = heat_data.get('spec', '')
  293. tmp['ladleNo'] = heat_data.get('ladleNo', '')
  294. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  295. tmp['strandNo'] = billetNo[-3]
  296. tmp['heatnoIndex'] = heatnoIndex
  297. tmp['length'] = sizing
  298. tmp['strandnoIndex'] = int(billetNo[-2:])
  299. tmp['castingSpeed'] = speed
  300. tmp['weight'] = sizing / 1000 * 0.2265
  301. tmp['cutStartTime'] = starttime
  302. tmp['cutStopTime'] = stoptime
  303. tmp['assemblyNumber'] = unionNo
  304. tmp['sign'] = "2" if error else "1"
  305. self.send('billet_add', tmp)
  306. def billet_union(self, heat_data, unionNo, billetNos, sizing):
  307. tmp = self.billet_union_temp.copy()
  308. tmp['ccmNo'] = heat_data.get('ccmNo', '')
  309. tmp['heatNo'] = heat_data.get('heatNo', '')
  310. tmp['billetsNo'] = ','.join(billetNos)
  311. tmp['assemblyNumber'] = unionNo
  312. tmp['assemblyTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  313. tmp['length'] = sizing
  314. tmp['billetsNum'] = len(billetNos)
  315. tmp['billetWeight'] = sizing / 1000 * 0.2265 * tmp['billetsNum']
  316. self.send('billet_union', tmp)
  317. def host_send(self, ccmNo, billetNos, dst_str, craneNo = "", fromaddr = ""):
  318. tmp = self.host_send_temp.copy()
  319. tmp['ccmNo'] = ccmNo
  320. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  321. tmp['billetHotsendTypeConfigId'] = "1" if dst_str == "棒一" else "15"
  322. tmp['liftingTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  323. tmp['vehicleNumber'] = craneNo
  324. tmp['location'] = fromaddr
  325. tmp['destination'] = dst_str
  326. self.send('host_send', tmp)
  327. def car_add(self, ccmNo, carNo, plate: str):
  328. tmp = self.car_add_temp.copy()
  329. tmp['ccmNo'] = ccmNo
  330. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  331. tmp['licensePlate'] = plate
  332. tmp['positionNum'] = carNo
  333. self.send('car_add', tmp)
  334. def car_save(self, ccmNo, billetNos, plate, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
  335. tmp = self.car_save_temp.copy()
  336. tmp['ccmNo'] = ccmNo
  337. tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
  338. tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
  339. tmp['licensePlate'] = plate
  340. tmp['vehicleNumber'] = craneNo
  341. tmp['liftingTime'] = Ltime
  342. tmp['location'] = fromaddr
  343. tmp['destination'] = toaddr
  344. tmp['positionNum'] = toaddr[-1] if len(toaddr) else ""
  345. tmp['plateOrStack'] = plate
  346. tmp['layer'] = str(layer)
  347. tmp['address'] = str(address)
  348. self.send('car_save', tmp)
  349. def car_go(self, ccmNo, carNo, plate = ''):
  350. tmp = self.car_go_temp.copy()
  351. tmp['ccmNo'] = ccmNo
  352. tmp['positionNum'] = carNo
  353. tmp['licensePlate'] = plate
  354. self.send('car_go', tmp)
  355. def stack_add(self, ccmNo, billetNos, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
  356. tmp = self.stack_add_temp.copy()
  357. tmp['ccmNo'] = ccmNo
  358. tmp['billetHotsendTypeConfigId'] = self.stack_dict.get(toaddr, '')
  359. tmp['billetNoList'] = []
  360. tmp['billetNoList'].append({
  361. 'billetNos' : ','.join(billetNos) if isinstance(billetNos, list) else billetNos,
  362. 'vehicleNumber' : craneNo,
  363. 'liftingTime' : Ltime,
  364. 'location' : fromaddr,
  365. 'destination' : toaddr,
  366. 'positionNum' : "",
  367. 'plateOrStack' : toaddr,
  368. 'layer' : str(layer),
  369. 'address' : str(address)
  370. })
  371. self.send('stack_add', tmp)
  372. def plate_update(self, ccmNo, carNo, plate):
  373. tmp = self.car_go_temp.copy()
  374. tmp['ccmNo'] = ccmNo
  375. tmp['positionNum'] = carNo
  376. tmp['licensePlate'] = plate
  377. self.send('plate_update', tmp)
  378. def stack_car_save(self, ccmNo, craneNo, Ltime, fromaddr, toaddr, layer, address):
  379. tmp = self.stack_car_save_temp.copy()
  380. tmp['ccmNo'] = ccmNo
  381. tmp['billetHotsendTypeConfigId'] = self.stack_dict[fromaddr]
  382. tmp['vehicleNumber'] = craneNo
  383. tmp['liftingTime'] = Ltime
  384. tmp['location'] = fromaddr
  385. tmp['positionNum'] = toaddr[-1] if len(toaddr) else ""
  386. tmp['plateOrStack'] = fromaddr
  387. tmp['layer'] = str(layer)
  388. tmp['address'] = str(address)
  389. self.send('stack_car_save', tmp)
  390. def jiaoban(self, ccmNo, ban):
  391. tmp = {}
  392. tmp['changeShiftTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  393. tmp['ban'] = ban
  394. self.send('jiaoban', tmp)
  395. def jiaoban_6(self, ccmNo, ban):
  396. tmp = {}
  397. tmp['changeShiftTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
  398. tmp['ban'] = ban
  399. self.send('jiaoban_6', tmp)
  400. def plate_zhagang(self, carNo, plate):
  401. tmp = {}
  402. tmp['type'] = 'car_arrive'
  403. tmp['data'] = {
  404. 'car': plate,
  405. 'position': carNo
  406. }
  407. self.send('plate_zhagang', tmp)
  408. def mqtt_publish(self, topic, payload, qos=2):
  409. if not isinstance(payload, dict):
  410. self.logger.error('[SENDER]发送数据非dict类型')
  411. raise TypeError(f"Need a dict type but {type(payload)} given.")
  412. return False
  413. if self.mqtt_cli:
  414. if not self.mqtt_cli.is_connected():
  415. return False
  416. return self.mqtt_cli.publish(topic, json.dumps(payload), qos)
  417. else:
  418. return False
  419. if __name__ == '__main__':
  420. import logging
  421. logger = logging.getLogger(__name__)
  422. logger.setLevel(level = logging.DEBUG)
  423. formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  424. console = logging.StreamHandler()
  425. console.setLevel(logging.DEBUG)
  426. logger.addHandler(console)
  427. mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
  428. mqttcli.username_pw_set('admin', '123456')
  429. mqttcli.connect('192.168.0.119', 1883)
  430. mqttcli.loop_start()
  431. test = Sender(logger)
  432. test.set_mqtt_client(mqttcli)
  433. test.http_flag = False
  434. testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"25501045","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"5","sendTime":"2025-01-20 09:44:17"}
  435. #test.host_send("5", "255010455107", "棒一")
  436. #test.host_send("6", ["256009976601","256009976701","256009976801","256009976602"], "高线", "A2")
  437. test.car_add("6", "4", "陕E00901")
  438. test.car_save("6", "256012036107,256012036207,256012036407,256012036108,", "陕E00901", "A3", "2025-01-23 16:22:17", "6#小冷床(右)", "车位4")
  439. #test.car_go("6", "陕E00901")