123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- import paho.mqtt.client as mqtt
- import json, time, requests
- class Sender:
- def __init__(self, logger):
- self.logger = logger
- self.topic = {
- 'billet_add': 'trace/performance/billet/add',
- 'heat_add': 'trace/performance/converter/add',
- 'billet_union': 'trace/billet/billetAssemblyNumber/add',
- 'host_send': 'syn/billetHotsendBase/save',
- 'car_add': 'syn/storageBill/add',
- 'car_save': 'syn/billetHotsendBase/shipp/save',
- 'car_go': 'syn/billetHotsendBase/shipp/depart',
- 'stack_add': 'syn/billet/addStacking',
- 'plate_update': 'syn/storageBill/update'
- }
- self.url = {
- 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
- 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals',
- 'billet_union': '',
- 'host_send': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
- 'car_add': 'http://192.168.0.119:8181/storageBill/add',
- 'car_save': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
- 'car_go': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/rodLineDepart',
- 'stack_add': 'http://192.168.0.119:8181/billet/stackingAndLoadingVehicles/addStacking',
- 'plate_update': ''
- }
- self.heat_tangible_temp = {
- 'optype' : 0,
- 'casterCode' : '',
- 'emptyLadleWeight' : 0.0,
- 'fullLadleWeight' : 0.0,
- 'grade' : '',
- 'spec' : '',
- 'heatsCode' : '',
- 'firstCutTime' : '',
- 'lastCutTime' : '',
- 'ladleCode' : '',
- 'moltenSteelWeight' : 0.0,
- 'startPourTime' : '',
- 'stopPourTime' : '',
- 'blankOutput' : 0.0,
- 'billetSum' : 0
- }
- self.billet_tangible_temp = {
- 'optype' : 0,
- 'heatNo' : '',
- 'billetNo' : '',
- 'grade' : '',
- 'ladleNo' : '',
- 'ccmNo' : '',
- 'strandNo' : '',
- 'heatnoIndex' : 0,
- 'length' : 0,
- 'strandnoIndex' : 0,
- 'castingSpeed' : 0.0,
- 'actualLength' : 0,
- 'spec' : '',
- 'width' : 170,
- 'thickness' : 170,
- 'weight' : 0.0,
- 'cutStartTime' : '',
- 'cutStopTime' : '',
- 'assemblyNumber' : ''
- }
- self.billet_union_temp = {
- "ccmNo": 0, # 铸机号 int数值类型
- "heatNo": "", # 炉号 字符串类型
- "billetsNo": "", # 坯号集合 字符串类型 逗号隔开
- "assemblyNumber": "", # 组坯号 字符串类型
- "assemblyTime": "", # 组坯时间 Date日期类型
- "length": 0, # 定尺 int数值类型
- "billetsNum": 0, # 钢坯数量 int数值类型
- "billetWeight": 0 # 钢坯坯重 double高精度数值类型
- }
- self.host_send_temp = {
- "ccmNo": "",
- "billetNos": "",
- "billetHotsendTypeConfigId": "",
- "vehicleNumber": "",
- "liftingTime": "",
- "location": "",
- "destination":"",
- "positionNum":"",
- "plateOrStack":"",
- "layer":"",
- "address":""
- }
- self.car_add_temp = {
- "ccmNo": "",
- "licensePlate": "",
- "billetHotsendTypeConfigId": "",
- "positionNum": ""
- }
- self.car_save_temp = {
- "ccmNo": "",
- "billetNos": "",
- "billetHotsendTypeConfigId": "",
- "licensePlate": "",
- "vehicleNumber": "",
- "liftingTime": "",
- "location": "",
- "destination":"",
- "positionNum":"",
- "plateOrStack":"",
- "layer":"",
- "address":""
- }
- self.car_go_temp = {
- "ccmNo": "",
- "positionNum": "",
- "licensePlate": ""
- }
- self.stack_add_temp = {
- "ccmNo": "",
- "billetHotsendTypeConfigId": "",
- "billetNoList": []
- }
- self.stack_dict = {
- "601堆垛": '6',
- "602堆垛": '7',
- "604堆垛": '8',
- "步进冷床": '9',
- "501堆垛": '10',
- }
- self.mqtt_cli = None
- self.http_flag = True
- self._cache = {}
- self._billet = {}
- def set_mqtt_client(self, cli):
- self.mqtt_cli = cli
- def send(self, purpose, payload, qos=2):
- if not isinstance(payload, dict):
- self.logger.error('[SENDER]发送数据非dict类型')
- raise TypeError(f"Need a dict type but {type(payload)} given.")
- return None
-
- self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} 发送数据")
- self.logger.debug(f"[SENDER]{purpose}:{payload}")
-
- if self.mqtt_cli and purpose in self.topic:
- if not self.mqtt_cli.is_connected:
- self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
- if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]:
- self.logger.error('[SENDER]MQTT:数据包发送失败')
- else:
- self.logger.info('[SENDER]MQTT:数据包发送成功')
- if self.http_flag and purpose in self.url:
- try:
- self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text)
- except:
- self.logger.error('[SENDER]HTTP:请求超时')
- def begin_pour(self, heat_data, bigbagweight):
- tmp = self.heat_tangible_temp.copy()
- tmp['optype'] = 1
- tmp['casterCode'] = heat_data.get('ccmNo', '')
- tmp['emptyLadleWeight'] = bigbagweight - heat_data.get('netWeight', 0)
- tmp['fullLadleWeight'] = bigbagweight
- tmp['grade'] = heat_data.get('grade', '')
- tmp['spec'] = heat_data.get('spec', '')
- tmp['heatsCode'] = heat_data.get('heatNo', '')
- tmp['ladleCode'] = heat_data.get('ladleNo', '')
- tmp['moltenSteelWeight'] = heat_data.get('netWeight', 0)
- tmp['startPourTime'] = heat_data.get('sendTime', '')
-
- #此处应存储进数据库,暂时使用缓存处理
- self._cache[heat_data.get('heatNo', 'error')] = tmp
- self.send('heat_add', tmp)
- def end_pour(self, heat_data):
- if heat_data['heatNo'] in self._cache:
- tmp = self._cache[heat_data['heatNo']]
- tmp['optype'] = 2
- tmp['grade'] = heat_data.get('grade', '')
- tmp['spec'] = heat_data.get('spec', '')
- tmp['stopPourTime'] = heat_data.get('sendTime', '')
- #此处应存储进数据库,暂时使用缓存处理
- self._cache[heat_data.get('heatNo', 'error')] = tmp
- self.send('heat_add', tmp)
- else:
- self.logger.error(f"[SENDER]停浇:找不到对应的开浇数据,炉号:{heat_data['heatNo']}")
- def heat_first(self, heat_data, cuttime = None):
- if not cuttime:
- cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- tmp = self._cache[heat_data['heatNo']]
- tmp['optype'] = 2
- tmp['firstCutTime'] = cuttime
- #此处应存储进数据库,暂时使用缓存处理
- self._cache[heat_data.get('heatNo', 'error')] = tmp
- self.send('heat_add', tmp)
- def heat_last(self, heat_data, cuttime = None):
- if not cuttime:
- cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- tmp = self._cache[heat_data['heatNo']]
- tmp['optype'] = 2
- tmp['lastCutTime'] = cuttime
- #此处应存储进数据库,暂时使用缓存处理
- self._cache[heat_data.get('heatNo', 'error')] = tmp
- self.send('heat_add', tmp)
- def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed):
- cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- tmp = self.billet_tangible_temp.copy()
- tmp['optype'] = 1
- tmp['heatNo'] = heat_data.get('heatNo', '')
- tmp['billetNo'] = billetNo
- tmp['grade'] = heat_data.get('grade', '')
- tmp['spec'] = heat_data.get('spec', '')
- tmp['ladleNo'] = heat_data.get('ladleNo', '')
- tmp['ccmNo'] = heat_data.get('ccmNo', '')
- tmp['strandNo'] = billetNo[-3]
- tmp['heatnoIndex'] = heatnoIndex
- tmp['length'] = sizing
- tmp['strandnoIndex'] = int(billetNo[-2:])
- tmp['castingSpeed'] = speed
- tmp['weight'] = sizing / 1000 * 0.2265
- tmp['cutStartTime'] = cuttime
- #此处应存储进数据库,暂时使用缓存处理
- self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
- self.send('billet_add', tmp)
- def end_cut(self, heat_data, heatnoIndex):
- cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- trycount = 4
- tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
- while not tmp:
- trycount -= 1
- time.sleep(0.5)
- tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {})
- if not tmp:
- self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
- return None
- tmp['optype'] = 2
- tmp['cutStopTime'] = cuttime
- #此处应存储进数据库,暂时使用缓存处理
- self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp
- self.send('billet_add', tmp)
- def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed, starttime, stoptime, unionNo):
- tmp = self.billet_tangible_temp.copy()
- tmp['optype'] = 1
- tmp['heatNo'] = heat_data.get('heatNo', '')
- tmp['billetNo'] = billetNo
- tmp['grade'] = heat_data.get('grade', '')
- tmp['spec'] = heat_data.get('spec', '')
- tmp['ladleNo'] = heat_data.get('ladleNo', '')
- tmp['ccmNo'] = heat_data.get('ccmNo', '')
- tmp['strandNo'] = billetNo[-3]
- tmp['heatnoIndex'] = heatnoIndex
- tmp['length'] = sizing
- tmp['strandnoIndex'] = int(billetNo[-2:])
- tmp['castingSpeed'] = speed
- tmp['weight'] = sizing / 1000 * 0.2265
- tmp['cutStartTime'] = starttime
- tmp['cutStopTime'] = stoptime
- tmp['assemblyNumber'] = unionNo
- #此处应存储进数据库
- self.send('billet_add', tmp)
- def billet_union(self, heat_data, unionNo, billetNos, sizing):
- tmp = self.billet_union_temp.copy()
-
- tmp['ccmNo'] = heat_data.get('ccmNo', '')
- tmp['heatNo'] = heat_data.get('heatNo', '')
- tmp['billetsNo'] = ','.join(billetNos)
- tmp['assemblyNumber'] = unionNo
- tmp['assemblyTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- tmp['length'] = sizing
- tmp['billetsNum'] = len(billetNos)
- tmp['billetWeight'] = sizing / 1000 * 0.2265 * tmp['billetsNum']
- #此处应存储进数据库
- self.send('billet_union', tmp)
- def host_send(self, ccmNo, billetNos, dst_str, craneNo = "", fromaddr = ""):
- tmp = self.host_send_temp.copy()
-
- tmp['ccmNo'] = ccmNo
- tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
- tmp['billetHotsendTypeConfigId'] = "1" if dst_str == "棒一" else "15"
- tmp['liftingTime'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
- tmp['vehicleNumber'] = craneNo
- tmp['location'] = fromaddr
- tmp['destination'] = dst_str
- #此处应存储进数据库
- self.send('host_send', tmp)
- def car_add(self, ccmNo, carNo, plate: str):
- tmp = self.car_add_temp.copy()
-
- tmp['ccmNo'] = ccmNo
- tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
- tmp['licensePlate'] = plate
- tmp['positionNum'] = carNo
- #此处应存储进数据库
- self.send('car_add', tmp)
- def car_save(self, ccmNo, billetNos, plate, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
- tmp = self.car_save_temp.copy()
-
- tmp['ccmNo'] = ccmNo
- tmp['billetNos'] = ','.join(billetNos) if isinstance(billetNos, list) else billetNos
- tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
- tmp['licensePlate'] = plate
- tmp['vehicleNumber'] = craneNo
- tmp['liftingTime'] = Ltime
- tmp['location'] = fromaddr
- tmp['destination'] = toaddr
- tmp['positionNum'] = toaddr[-1] if len(toaddr) else ""
- tmp['plateOrStack'] = plate
- tmp['layer'] = str(layer)
- tmp['address'] = str(address)
- #此处应存储进数据库
- self.send('car_save', tmp)
- def car_go(self, ccmNo, carNo, plate = ''):
- tmp = self.car_go_temp.copy()
-
- tmp['ccmNo'] = ccmNo
- tmp['positionNum'] = carNo
- tmp['licensePlate'] = plate
- #此处应存储进数据库
- self.send('car_go', tmp)
- def stack_add(self, ccmNo, billetNos, craneNo, Ltime, fromaddr, toaddr, layer=0, address=0):
- tmp = self.stack_add_temp.copy()
-
- tmp['ccmNo'] = ccmNo
- tmp['billetHotsendTypeConfigId'] = self.stack_dict.get(toaddr, '')
- tmp['billetNoList'] = []
- tmp['billetNoList'].append({
- 'billetNos' : ','.join(billetNos) if isinstance(billetNos, list) else billetNos,
- 'vehicleNumber' : craneNo,
- 'liftingTime' : Ltime,
- 'location' : fromaddr,
- 'destination' : toaddr,
- 'positionNum' : "",
- 'plateOrStack' : toaddr,
- 'layer' : str(layer),
- 'address' : str(address)
- })
- #此处应存储进数据库
- self.send('stack_add', tmp)
- def plate_update(self, ccmNo, carNo, plate):
- tmp = self.car_go_temp.copy()
-
- tmp['ccmNo'] = ccmNo
- tmp['positionNum'] = carNo
- tmp['licensePlate'] = plate
- #此处应存储进数据库
- self.send('plate_update', tmp)
- if __name__ == '__main__':
- import logging
- logger = logging.getLogger(__name__)
- logger.setLevel(level = logging.DEBUG)
- formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- console = logging.StreamHandler()
- console.setLevel(logging.DEBUG)
- logger.addHandler(console)
- mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
- mqttcli.username_pw_set('admin', '123456')
- mqttcli.connect('192.168.0.119', 1883)
- mqttcli.loop_start()
- test = Sender(logger)
- test.set_mqtt_client(mqttcli)
- test.http_flag = False
- testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"25501045","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"5","sendTime":"2025-01-20 09:44:17"}
- #test.host_send("5", "255010455107", "棒一")
- #test.host_send("6", ["256009976601","256009976701","256009976801","256009976602"], "高线", "A2")
- test.car_add("6", "4", "陕E00901")
- test.car_save("6", "256012036107,256012036207,256012036407,256012036108,", "陕E00901", "A3", "2025-01-23 16:22:17", "6#小冷床(右)", "车位4")
- #test.car_go("6", "陕E00901")
|