"""Build production-event payloads and deliver them over MQTT and/or HTTP.

The module exposes :class:`Sender`, which fills dict templates for heat
(pouring), billet (cutting / assembly) and logistics (hot-send, vehicle,
stacking) events and forwards them to the configured MQTT topic and HTTP
endpoint.  Running the module as a script performs a manual smoke test
against a broker.
"""

import json
import time

import paho.mqtt.client as mqtt
import requests


class Sender:
    """Format events as dict payloads and send them via MQTT and/or HTTP.

    Each public event method copies one of the ``*_temp`` templates, fills it
    from its arguments and hands it to :meth:`send` under a *purpose* key.
    The purpose selects the MQTT topic (``self.topic``) and the HTTP endpoint
    (``self.url``).

    Attributes:
        logger: Logger-like object providing ``debug``/``info``/``error``.
        topic: Purpose -> MQTT topic.
        url: Purpose -> HTTP endpoint; an empty string disables HTTP for
            that purpose.
        mqtt_cli: MQTT client set through :meth:`set_mqtt_client`, or
            ``None`` to disable MQTT delivery.
        http_flag: When true, payloads are also POSTed over HTTP.
    """

    # Timestamp layout used in every payload.
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Factor applied to ``sizing / 1000`` to estimate a billet weight.
    # NOTE(review): presumably tonnes per metre for the 170 x 170 section
    # used in the billet template - confirm with the process owner.
    WEIGHT_PER_METER = 0.2265

    # end_cut() waits for the matching begin_cut() record: number of extra
    # lookups and the pause (seconds) before each of them.
    CUT_LOOKUP_RETRIES = 4
    CUT_LOOKUP_DELAY = 0.5

    # Seconds before an HTTP POST is abandoned.
    HTTP_TIMEOUT = 2

    def __init__(self, logger):
        """Create a sender that reports through *logger*.

        MQTT delivery stays disabled until :meth:`set_mqtt_client` is called;
        HTTP delivery is enabled by default (``http_flag``).
        """
        self.logger = logger

        self.topic = {
            'billet_add': 'trace/performance/billet/add',
            'heat_add': 'trace/performance/converter/add',
            'billet_union': 'trace/billet/billetAssemblyNumber/add',
            'host_send': 'syn/billetHotsendBase/save',
            'car_add': 'syn/storageBill/add',
            'car_save': 'syn/billetHotsendBase/shipp/save',
            'car_go': 'syn/billetHotsendBase/shipp/depart',
            'stack_add': 'syn/billet/addStacking',
            'plate_update': 'syn/storageBill/update'
        }

        self.url = {
            'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
            'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals',
            'billet_union': '',
            'host_send': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
            'car_add': 'http://192.168.0.119:8181/storageBill/add',
            'car_save': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
            'car_go': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/rodLineDepart',
            'stack_add': 'http://192.168.0.119:8181/billet/stackingAndLoadingVehicles/addStacking',
            'plate_update': ''
        }

        # Template for heat (pouring) records sent under 'heat_add'.
        self.heat_tangible_temp = {
            'optype': 0,
            'casterCode': '',
            'emptyLadleWeight': 0.0,
            'fullLadleWeight': 0.0,
            'grade': '',
            'spec': '',
            'heatsCode': '',
            'firstCutTime': '',
            'lastCutTime': '',
            'ladleCode': '',
            'moltenSteelWeight': 0.0,
            'startPourTime': '',
            'stopPourTime': '',
            'blankOutput': 0.0,
            'billetSum': 0
        }

        # Template for billet (cutting) records sent under 'billet_add'.
        self.billet_tangible_temp = {
            'optype': 0,
            'heatNo': '',
            'billetNo': '',
            'grade': '',
            'ladleNo': '',
            'ccmNo': '',
            'strandNo': '',
            'heatnoIndex': 0,
            'length': 0,
            'strandnoIndex': 0,
            'castingSpeed': 0.0,
            'actualLength': 0,
            'spec': '',
            'width': 170,
            'thickness': 170,
            'weight': 0.0,
            'cutStartTime': '',
            'cutStopTime': '',
            'assemblyNumber': ''
        }

        # Template for billet-assembly records sent under 'billet_union'.
        self.billet_union_temp = {
            "ccmNo": 0,             # caster number, int
            "heatNo": "",           # heat number, string
            "billetsNo": "",        # billet numbers, comma-separated string
            "assemblyNumber": "",   # assembly number, string
            "assemblyTime": "",     # assembly time, date
            "length": 0,            # cut-to-length size, int
            "billetsNum": 0,        # number of billets, int
            "billetWeight": 0       # billet weight, double
        }

        self.host_send_temp = {
            "ccmNo": "",
            "billetNos": "",
            "billetHotsendTypeConfigId": "",
            "vehicleNumber": "",
            "liftingTime": "",
            "location": "",
            "destination": "",
            "positionNum": "",
            "plateOrStack": "",
            "layer": "",
            "address": ""
        }

        self.car_add_temp = {
            "ccmNo": "",
            "licensePlate": "",
            "billetHotsendTypeConfigId": "",
            "positionNum": ""
        }

        self.car_save_temp = {
            "ccmNo": "",
            "billetNos": "",
            "billetHotsendTypeConfigId": "",
            "licensePlate": "",
            "vehicleNumber": "",
            "liftingTime": "",
            "location": "",
            "destination": "",
            "positionNum": "",
            "plateOrStack": "",
            "layer": "",
            "address": ""
        }

        self.car_go_temp = {
            "ccmNo": "",
            "positionNum": "",
            "licensePlate": ""
        }

        self.stack_add_temp = {
            "ccmNo": "",
            "billetHotsendTypeConfigId": "",
            "billetNoList": []
        }

        # Stacking destination name -> billetHotsendTypeConfigId.
        self.stack_dict = {
            "601堆垛": '6',
            "602堆垛": '7',
            "604堆垛": '8',
            "步进冷床": '9',
            "501堆垛": '10',
        }

        self.mqtt_cli = None
        self.http_flag = True
        # In-memory stand-ins for a database: heat records keyed by heat
        # number, billet records keyed by heat number + index in heat.
        self._cache = {}
        self._billet = {}

    # ------------------------------------------------------------------
    # Transport
    # ------------------------------------------------------------------
    def set_mqtt_client(self, cli):
        """Use *cli* (a paho-mqtt client) for MQTT delivery."""
        self.mqtt_cli = cli

    def send(self, purpose, payload, qos=2):
        """Deliver *payload* for *purpose* over every enabled transport.

        Args:
            purpose: Key into ``self.topic`` / ``self.url``.
            payload: Dict to serialise as JSON.
            qos: MQTT quality-of-service level.

        Raises:
            TypeError: If *payload* is not a dict.
        """
        if not isinstance(payload, dict):
            self.logger.error('[SENDER]发送数据非dict类型')
            raise TypeError(f"Need a dict type but {type(payload)} given.")

        self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} 发送数据")
        self.logger.debug(f"[SENDER]{purpose}:{payload}")

        if self.mqtt_cli and purpose in self.topic:
            self._send_mqtt(purpose, payload, qos)
        if self.http_flag and purpose in self.url:
            self._send_http(purpose, payload)

    def _send_mqtt(self, purpose, payload, qos):
        """Publish *payload* to the topic of *purpose* and log the outcome."""
        # is_connected is a method; testing the bound method itself is always
        # truthy, which previously made this check a no-op.
        if not self.mqtt_cli.is_connected():
            self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
        # The publish is still attempted (as before); a non-zero return code
        # in the first element of the result reports the failure.
        result = self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)
        if result[0]:
            self.logger.error('[SENDER]MQTT:数据包发送失败')
        else:
            self.logger.info('[SENDER]MQTT:数据包发送成功')

    def _send_http(self, purpose, payload):
        """POST *payload* as JSON to the endpoint of *purpose* (best effort)."""
        url = self.url[purpose]
        if not url:
            # Purposes without an endpoint used to fail inside requests and
            # were misreported as a timeout.
            self.logger.debug(f'[SENDER]HTTP:{purpose} 未配置URL,跳过')
            return
        try:
            response = requests.post(url, json=payload, timeout=self.HTTP_TIMEOUT)
        except requests.Timeout:
            self.logger.error('[SENDER]HTTP:请求超时')
        except Exception as exc:
            # Delivery is best effort: report the real cause, never raise.
            self.logger.error(f'[SENDER]HTTP:请求失败:{exc}')
        else:
            self.logger.debug('[SENDER]HTTP:' + response.text)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _now(self):
        """Return the current local time formatted with ``TIME_FORMAT``."""
        return time.strftime(self.TIME_FORMAT, time.localtime())

    @staticmethod
    def _join_billet_nos(billetNos):
        """Return *billetNos* comma-joined when it is a list, else unchanged."""
        return ','.join(billetNos) if isinstance(billetNos, list) else billetNos

    def _cached_heat(self, heat_data, stage):
        """Return the cached heat record, logging an error when it is missing.

        *stage* is the label placed in the error message.
        """
        heat_no = heat_data['heatNo']
        record = self._cache.get(heat_no)
        if record is None:
            self.logger.error(f"[SENDER]{stage}:找不到对应的开浇数据,炉号:{heat_no}")
        return record

    def _new_billet(self, heat_data, billetNo, heatnoIndex, sizing, speed):
        """Build a billet record (optype 1) without any cut timestamps.

        The strand number is the third-from-last character of *billetNo* and
        the index on the strand its last two digits.
        """
        record = self.billet_tangible_temp.copy()
        record['optype'] = 1
        record['heatNo'] = heat_data.get('heatNo', '')
        record['billetNo'] = billetNo
        record['grade'] = heat_data.get('grade', '')
        record['spec'] = heat_data.get('spec', '')
        record['ladleNo'] = heat_data.get('ladleNo', '')
        record['ccmNo'] = heat_data.get('ccmNo', '')
        record['strandNo'] = billetNo[-3]
        record['heatnoIndex'] = heatnoIndex
        record['length'] = sizing
        record['strandnoIndex'] = int(billetNo[-2:])
        record['castingSpeed'] = speed
        record['weight'] = sizing / 1000 * self.WEIGHT_PER_METER
        return record

    # ------------------------------------------------------------------
    # Heat (pouring) events
    # ------------------------------------------------------------------
    def begin_pour(self, heat_data, bigbagweight):
        """Report the start of pouring and cache the heat record.

        Args:
            heat_data: Dict describing the heat (``heatNo``, ``ccmNo``,
                ``grade``, ``spec``, ``ladleNo``, ``netWeight``, ``sendTime``).
            bigbagweight: Full ladle weight; the empty ladle weight is
                derived by subtracting ``netWeight``.
        """
        net_weight = heat_data.get('netWeight', 0)
        record = self.heat_tangible_temp.copy()
        record['optype'] = 1
        record['casterCode'] = heat_data.get('ccmNo', '')
        record['emptyLadleWeight'] = bigbagweight - net_weight
        record['fullLadleWeight'] = bigbagweight
        record['grade'] = heat_data.get('grade', '')
        record['spec'] = heat_data.get('spec', '')
        record['heatsCode'] = heat_data.get('heatNo', '')
        record['ladleCode'] = heat_data.get('ladleNo', '')
        record['moltenSteelWeight'] = net_weight
        record['startPourTime'] = heat_data.get('sendTime', '')
        # TODO: persist to a database; an in-memory cache is used for now.
        self._cache[heat_data.get('heatNo', 'error')] = record
        self.send('heat_add', record)

    def end_pour(self, heat_data):
        """Report the end of pouring for a heat started with begin_pour().

        Logs an error and sends nothing when the heat is not cached.
        """
        record = self._cached_heat(heat_data, '停浇')
        if record is None:
            return
        # The cached dict is updated in place, so the cache stays current.
        record['optype'] = 2
        record['grade'] = heat_data.get('grade', '')
        record['spec'] = heat_data.get('spec', '')
        record['stopPourTime'] = heat_data.get('sendTime', '')
        self.send('heat_add', record)

    def heat_first(self, heat_data, cuttime=None):
        """Report the first cut time of a heat (defaults to the current time).

        Like end_pour(), logs an error and sends nothing when the heat is not
        cached instead of raising ``KeyError``.
        """
        record = self._cached_heat(heat_data, '首切')
        if record is None:
            return
        record['optype'] = 2
        record['firstCutTime'] = cuttime if cuttime else self._now()
        self.send('heat_add', record)

    def heat_last(self, heat_data, cuttime=None):
        """Report the last cut time of a heat (defaults to the current time).

        Like end_pour(), logs an error and sends nothing when the heat is not
        cached instead of raising ``KeyError``.
        """
        record = self._cached_heat(heat_data, '末切')
        if record is None:
            return
        record['optype'] = 2
        record['lastCutTime'] = cuttime if cuttime else self._now()
        self.send('heat_add', record)

    # ------------------------------------------------------------------
    # Billet (cutting / assembly) events
    # ------------------------------------------------------------------
    def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed):
        """Report the start of a cut and cache the billet record.

        The record is cached under heat number + *heatnoIndex* so that
        end_cut() can complete it.
        """
        record = self._new_billet(heat_data, billetNo, heatnoIndex, sizing, speed)
        record['cutStartTime'] = self._now()
        # TODO: persist to a database; an in-memory cache is used for now.
        self._billet[heat_data.get('heatNo', '') + str(heatnoIndex)] = record
        self.send('billet_add', record)

    def end_cut(self, heat_data, heatnoIndex):
        """Report the end of a cut started with begin_cut().

        If the begin record is not cached yet, the lookup is retried up to
        ``CUT_LOOKUP_RETRIES`` times, ``CUT_LOOKUP_DELAY`` seconds apart
        (presumably begin_cut() may run on another thread - TODO confirm).
        When it never appears, an error is logged and nothing is sent.
        """
        cuttime = self._now()  # stop time is taken before any waiting
        key = heat_data['heatNo'] + str(heatnoIndex)

        record = self._billet.get(key)
        retries_left = self.CUT_LOOKUP_RETRIES
        # The retry counter is now actually checked, so the wait is bounded.
        while not record and retries_left > 0:
            retries_left -= 1
            time.sleep(self.CUT_LOOKUP_DELAY)
            record = self._billet.get(key)

        if not record:
            self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}")
            return None

        # The cached dict is updated in place, so the cache stays current.
        record['optype'] = 2
        record['cutStopTime'] = cuttime
        self.send('billet_add', record)

    def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed,
                      starttime, stoptime, unionNo):
        """Report a complete billet (both cut times and assembly number) at once."""
        record = self._new_billet(heat_data, billetNo, heatnoIndex, sizing, speed)
        record['cutStartTime'] = starttime
        record['cutStopTime'] = stoptime
        record['assemblyNumber'] = unionNo
        # TODO: persist to a database.
        self.send('billet_add', record)

    def billet_union(self, heat_data, unionNo, billetNos, sizing):
        """Report that *billetNos* (a sequence of strings) form assembly *unionNo*."""
        record = self.billet_union_temp.copy()
        record['ccmNo'] = heat_data.get('ccmNo', '')
        record['heatNo'] = heat_data.get('heatNo', '')
        record['billetsNo'] = ','.join(billetNos)
        record['assemblyNumber'] = unionNo
        record['assemblyTime'] = self._now()
        record['length'] = sizing
        record['billetsNum'] = len(billetNos)
        record['billetWeight'] = sizing / 1000 * self.WEIGHT_PER_METER * record['billetsNum']
        # TODO: persist to a database.
        self.send('billet_union', record)

    # ------------------------------------------------------------------
    # Logistics events
    # ------------------------------------------------------------------
    def host_send(self, ccmNo, billetNos, dst_str, craneNo="", fromaddr=""):
        """Report a hot-send of *billetNos* (list or comma-joined string) to *dst_str*.

        The type-config id is "1" when *dst_str* is "棒一" and "15" otherwise.
        """
        record = self.host_send_temp.copy()
        record['ccmNo'] = ccmNo
        record['billetNos'] = self._join_billet_nos(billetNos)
        record['billetHotsendTypeConfigId'] = "1" if dst_str == "棒一" else "15"
        record['liftingTime'] = self._now()
        record['vehicleNumber'] = craneNo
        record['location'] = fromaddr
        record['destination'] = dst_str
        # TODO: persist to a database.
        self.send('host_send', record)

    def car_add(self, ccmNo, carNo, plate: str):
        """Report a vehicle with licence *plate* at position *carNo*.

        The type-config id is "16" for plates starting with "陕E", else "".
        """
        record = self.car_add_temp.copy()
        record['ccmNo'] = ccmNo
        record['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
        record['licensePlate'] = plate
        record['positionNum'] = carNo
        # TODO: persist to a database.
        self.send('car_add', record)

    def car_save(self, ccmNo, billetNos, plate, craneNo, Ltime, fromaddr, toaddr,
                 layer=0, address=0):
        """Report billets loaded onto the vehicle with licence *plate*.

        ``positionNum`` is the last character of *toaddr* (empty when
        *toaddr* is empty).
        """
        record = self.car_save_temp.copy()
        record['ccmNo'] = ccmNo
        record['billetNos'] = self._join_billet_nos(billetNos)
        record['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
        record['licensePlate'] = plate
        record['vehicleNumber'] = craneNo
        record['liftingTime'] = Ltime
        record['location'] = fromaddr
        record['destination'] = toaddr
        record['positionNum'] = toaddr[-1] if toaddr else ""
        record['plateOrStack'] = plate
        record['layer'] = str(layer)
        record['address'] = str(address)
        # TODO: persist to a database.
        self.send('car_save', record)

    def car_go(self, ccmNo, carNo, plate=''):
        """Report the departure of the vehicle at position *carNo*."""
        record = self.car_go_temp.copy()
        record['ccmNo'] = ccmNo
        record['positionNum'] = carNo
        record['licensePlate'] = plate
        # TODO: persist to a database.
        self.send('car_go', record)

    def stack_add(self, ccmNo, billetNos, craneNo, Ltime, fromaddr, toaddr,
                  layer=0, address=0):
        """Report billets placed on the stack named *toaddr*.

        The type-config id is looked up in ``self.stack_dict`` ('' when
        *toaddr* is unknown).
        """
        record = self.stack_add_temp.copy()
        record['ccmNo'] = ccmNo
        record['billetHotsendTypeConfigId'] = self.stack_dict.get(toaddr, '')
        # A fresh list per call: the template's list must never be shared.
        record['billetNoList'] = [{
            'billetNos': self._join_billet_nos(billetNos),
            'vehicleNumber': craneNo,
            'liftingTime': Ltime,
            'location': fromaddr,
            'destination': toaddr,
            'positionNum': "",
            'plateOrStack': toaddr,
            'layer': str(layer),
            'address': str(address)
        }]
        # TODO: persist to a database.
        self.send('stack_add', record)

    def plate_update(self, ccmNo, carNo, plate):
        """Report a changed licence *plate* for the vehicle at position *carNo*."""
        # Same fields as a departure message, hence the shared template.
        record = self.car_go_temp.copy()
        record['ccmNo'] = ccmNo
        record['positionNum'] = carNo
        record['licensePlate'] = plate
        # TODO: persist to a database.
        self.send('plate_update', record)


if __name__ == '__main__':
    # Manual smoke test: publishes sample messages to the broker below.
    import logging

    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(formatter)  # the formatter was previously never attached
    logger.addHandler(console)

    mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
    mqttcli.username_pw_set('admin', '123456')
    mqttcli.connect('192.168.0.119', 1883)
    mqttcli.loop_start()

    test = Sender(logger)
    test.set_mqtt_client(mqttcli)
    test.http_flag = False

    testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"25501045","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"5","sendTime":"2025-01-20 09:44:17"}

    # Further manual checks; enable as needed.
    #test.host_send("5", "255010455107", "棒一")
    #test.host_send("6", ["256009976601","256009976701","256009976801","256009976602"], "高线", "A2")
    test.car_add("6", "4", "陕E00901")
    test.car_save("6", "256012036107,256012036207,256012036407,256012036108,", "陕E00901", "A3", "2025-01-23 16:22:17", "6#小冷床(右)", "车位4")
    #test.car_go("6", "陕E00901")