"""Sender: fans production-tracking payloads out over MQTT, HTTP and/or MySQL."""

import json
import threading
import time

import paho.mqtt.client as mqtt
import pymysql
import requests


class Sender:
    """Build payloads from fixed templates and deliver them to the enabled sinks.

    Every public ``begin_*`` / ``end_*`` / ``car_*`` / ``stack_*`` method copies
    one of the ``*_temp`` template dicts, fills it in and hands it to
    :meth:`send` together with a *purpose* key.  ``send`` then delivers the
    payload to each enabled sink:

    * MQTT  - when a client was given via :meth:`set_mqtt_client` and the
      purpose has an entry in ``self.topic``;
    * HTTP  - when ``self.http_flag`` is true and the purpose has a non-empty
      entry in ``self.url``;
    * MySQL - when ``self.mysql_flag`` is true and a connection was given via
      :meth:`set_mysql_client`; the purpose is used as the table name.

    Attributes:
        logger: Logger-like object used for all diagnostics.
        topic: Purpose -> MQTT topic.
        url: Purpose -> HTTP endpoint ('' means no endpoint configured).
        stack_dict: Stack name -> ``billetHotsendTypeConfigId`` value.
        mqtt_cli: MQTT client or ``None``.
        mysql_cli: MySQL connection or ``None``.
        http_flag / mysql_flag: Enable switches for the HTTP / MySQL sinks.
    """

    # Timestamp layout used for every time field generated by this class.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    # Factor applied to ``sizing / 1000`` to derive a billet weight.
    # NOTE(review): presumably weight per metre of billet - confirm the unit.
    _WEIGHT_FACTOR = 0.2265
    # end_cut() polls for the begin-cut record this many extra times ...
    _END_CUT_RETRIES = 4
    # ... sleeping this many seconds between polls.
    _END_CUT_RETRY_DELAY = 0.5
    # Timeout in seconds for each HTTP POST.
    _HTTP_TIMEOUT = 2

    def __init__(self, logger):
        """Create a sender with no MQTT/MySQL client and HTTP/MySQL disabled.

        Args:
            logger: Object exposing ``debug`` / ``info`` / ``error``.
        """
        self.logger = logger
        self.topic = {
            'billet_add': 'trace/performance/billet/add',
            'heat_add': 'trace/performance/converter/add',
            'billet_union': 'trace/billet/billetAssemblyNumber/add',
            'host_send': 'syn/billetHotsendBase/save',
            'car_add': 'syn/storageBill/add',
            'car_save': 'syn/billetHotsendBase/shipp/save',
            'car_go': 'syn/billetHotsendBase/shipp/depart',
            'stack_add': 'syn/billet/addStacking',
            'plate_update': 'syn/storageBill/update',
            'stack_car_save': 'syn/billet/stackingAndLoadingVehicles/loading',
            'jiaoban': 'syn/billet/changeShift',
            'jiaoban_6': 'syn/billet/changeSixShift',
            'plate_zhagang': 'trace/performance/car/arrive',
        }
        self.url = {
            'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo',
            'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals',
            'billet_union': '',
            'host_send': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
            'car_add': 'http://192.168.0.119:8181/storageBill/add',
            'car_save': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/add',
            'car_go': 'http://192.168.0.119:8181/billetHotsendBase/billetHotsendBase/rodLineDepart',
            'stack_add': 'http://192.168.0.119:8181/billet/stackingAndLoadingVehicles/addStacking',
            'plate_update': '',
            'stack_car_save': '',
            'jiaoban': '',
            'jiaoban_6': '',
            'plate_zhagang': '',
        }
        # Payload templates.  Key order is significant: it determines the JSON
        # field order and the column order used by save_mysql().
        self.heat_tangible_temp = {
            'optype': 0,
            'casterCode': '',
            'emptyLadleWeight': 0.0,
            'fullLadleWeight': 0.0,
            'grade': '',
            'spec': '',
            'heatsCode': '',
            'firstCutTime': '',
            'lastCutTime': '',
            'ladleCode': '',
            'moltenSteelWeight': 0.0,
            'startPourTime': '',
            'stopPourTime': '',
            'blankOutput': 0.0,
            'billetSum': 0,
        }
        self.billet_tangible_temp = {
            'optype': 0,
            'heatNo': '',
            'billetNo': '',
            'grade': '',
            'ladleNo': '',
            'ccmNo': '',
            'strandNo': '',
            'heatnoIndex': 0,
            'length': 0,
            'strandnoIndex': 0,
            'castingSpeed': 0.0,
            'actualLength': 0,
            'spec': '',
            'width': 170,
            'thickness': 170,
            'weight': 0.0,
            'cutStartTime': '',
            'cutStopTime': '',
            'assemblyNumber': '',
            'sign': "1",
        }
        self.billet_union_temp = {
            "ccmNo": 0,             # caster number, int
            "heatNo": "",           # heat number, string
            "billetsNo": "",        # billet numbers, comma-separated string
            "assemblyNumber": "",   # assembly (billet group) number, string
            "assemblyTime": "",     # assembly time, date
            "length": 0,            # cut-to-length size, int
            "billetsNum": 0,        # number of billets, int
            "billetWeight": 0,      # billet weight, double
        }
        self.host_send_temp = {
            "ccmNo": "",
            "billetNos": "",
            "billetHotsendTypeConfigId": "",
            "vehicleNumber": "",
            "liftingTime": "",
            "location": "",
            "destination": "",
            "positionNum": "",
            "plateOrStack": "",
            "layer": "",
            "address": "",
        }
        self.car_add_temp = {
            "ccmNo": "",
            "licensePlate": "",
            "billetHotsendTypeConfigId": "",
            "positionNum": "",
        }
        self.car_save_temp = {
            "ccmNo": "",
            "billetNos": "",
            "billetHotsendTypeConfigId": "",
            "licensePlate": "",
            "vehicleNumber": "",
            "liftingTime": "",
            "location": "",
            "destination": "",
            "positionNum": "",
            "plateOrStack": "",
            "layer": "",
            "address": "",
        }
        self.car_go_temp = {
            "ccmNo": "",
            "positionNum": "",
            "licensePlate": "",
        }
        self.stack_add_temp = {
            "ccmNo": "",
            "billetHotsendTypeConfigId": "",
            "billetNoList": [],
        }
        self.stack_car_save_temp = {
            "ccmNo": "",
            "billetHotsendTypeConfigId": "",
            "address": "",
            "layer": "",
            "vehicleNumber": "",
            "liftingTime": "",
            "location": "",
            "positionNum": "",
            "plateOrStack": "",
        }
        self.stack_dict = {
            "601堆垛": '6',
            "602堆垛": '7',
            "604堆垛": '8',
            "步进冷床堆垛": '9',
            "501堆垛": '10',
        }
        self.mqtt_cli = None
        # Initialised here so the attribute always exists; it used to be
        # created only by set_mysql_client().
        self.mysql_cli = None
        self.http_flag = False
        self.mysql_flag = False
        # heatNo -> last heat payload sent (filled by begin_pour).
        # NOTE(review): entries are never evicted - confirm this is acceptable
        # for long-running processes.
        self._cache = {}
        # heatNo + str(heatnoIndex) -> last billet payload (filled by begin_cut).
        self._billet = {}
        # Serialises use of the shared MySQL connection in save_mysql().
        self.mysql_lock = threading.Lock()

    # ------------------------------------------------------------------ #
    # Configuration
    # ------------------------------------------------------------------ #
    def set_mqtt_client(self, cli):
        """Set the MQTT client used by send() and mqtt_publish()."""
        self.mqtt_cli = cli

    def set_mysql_client(self, cli: pymysql.connections.Connection):
        """Set the MySQL connection used by save_mysql()."""
        self.mysql_cli = cli

    # ------------------------------------------------------------------ #
    # Small private helpers
    # ------------------------------------------------------------------ #
    @classmethod
    def _now(cls):
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return time.strftime(cls._TIME_FORMAT, time.localtime())

    @staticmethod
    def _join_billets(billetNos):
        """Return a list of billet numbers comma-joined; pass other values through."""
        return ','.join(billetNos) if isinstance(billetNos, list) else billetNos

    # ------------------------------------------------------------------ #
    # Delivery
    # ------------------------------------------------------------------ #
    def send(self, purpose, payload, qos=2):
        """Deliver ``payload`` to every enabled sink for ``purpose``.

        Args:
            purpose: Key into ``self.topic`` / ``self.url``; also the MySQL
                table name.
            payload: Dict to send (JSON-encoded for MQTT and HTTP).
            qos: MQTT quality-of-service level passed to ``publish``.

        Raises:
            TypeError: If ``payload`` is not a dict.
        """
        if not isinstance(payload, dict):
            self.logger.error('[SENDER]发送数据非dict类型')
            raise TypeError(f"Need a dict type but {type(payload)} given.")

        self.logger.info(
            f"[SENDER]使用 {'MQTT' if self.mqtt_cli else ''} "
            f"{'HTTP' if self.http_flag else ''} "
            f"{'MYSQL' if self.mysql_flag else ''} 发送数据"
        )
        self.logger.debug(f"[SENDER]{purpose}:{payload}")

        if self.mqtt_cli and purpose in self.topic:
            self._send_mqtt(purpose, payload, qos)
        if self.http_flag and purpose in self.url:
            self._send_http(purpose, payload)
        if self.mysql_flag and getattr(self, 'mysql_cli', None):
            if self.save_mysql(purpose, payload):
                self.logger.info('[SENDER]MYSQL:数据保存成功')
            else:
                self.logger.error('[SENDER]MYSQL:数据保存失败')

    def _send_mqtt(self, purpose, payload, qos):
        """Publish ``payload`` as JSON on the topic mapped to ``purpose``."""
        # is_connected is a method; the previous code tested the bound method
        # object itself, which is always truthy, so this branch never fired.
        if not self.mqtt_cli.is_connected():
            self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接')
        # publish() is still attempted while disconnected, as before.
        # NOTE(review): presumably so the client can queue the message for
        # delivery after reconnect - confirm against the paho documentation.
        result = self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)
        # First element of the publish result is the return code; 0 is success.
        if result[0]:
            self.logger.error('[SENDER]MQTT:数据包发送失败')
        else:
            self.logger.info('[SENDER]MQTT:数据包发送成功')

    def _send_http(self, purpose, payload):
        """POST ``payload`` as JSON to the URL mapped to ``purpose``, if any."""
        url = self.url[purpose]
        if not url:
            # Several purposes have no endpoint configured; posting to '' used
            # to fail and be mis-reported as a timeout.
            self.logger.debug(f"[SENDER]HTTP:{purpose} 未配置URL,跳过")
            return
        try:
            response = requests.post(url, json=payload, timeout=self._HTTP_TIMEOUT)
        except requests.exceptions.Timeout:
            self.logger.error('[SENDER]HTTP:请求超时')
        except requests.exceptions.RequestException as e:
            self.logger.error(f"[SENDER]HTTP:请求失败 {e}")
        else:
            self.logger.debug('[SENDER]HTTP:' + response.text)

    def save_mysql(self, table, args: dict):
        """Insert ``args`` as one row into ``table``.

        Column names are the dict keys.  Values are bound as query parameters:
        for keys containing "time" (case-insensitive) a falsy value is stored
        as NULL; int/float/str values are passed as-is; anything else is
        JSON-encoded.

        NOTE(review): ``table`` and the column names are interpolated into the
        SQL text unquoted, so they must come from trusted code, never from
        external input.

        Args:
            table: Target table name.
            args: Column name -> value.

        Returns:
            ``True`` on success, ``False`` when pymysql raised an error,
            ``None`` when no MySQL connection has been set.
        """
        cli = getattr(self, 'mysql_cli', None)
        if cli is None:
            return None

        keys = list(args)
        solved_values = []
        for k, v in args.items():
            if 'time' in k.lower():
                solved_values.append(v if v else None)
            elif isinstance(v, (int, float, str)):
                solved_values.append(v)
            else:
                solved_values.append(json.dumps(v))

        placeholders = ', '.join(['%s'] * len(keys))
        sql = f"INSERT INTO {table}({', '.join(keys)}) VALUES({placeholders})"

        with self.mysql_lock:
            try:
                cli.ping()
                with cli.cursor() as cursor:
                    cursor.execute(sql, tuple(solved_values))
                cli.commit()
                return True
            except pymysql.Error as e:
                self.logger.error(f"[SENDER]MYSQL:{e}")
                return False

    def mqtt_publish(self, topic, payload, qos=2):
        """Publish ``payload`` as JSON on an explicit MQTT ``topic``.

        Returns:
            The client's ``publish`` result, or ``False`` when no client is
            set or the client is not connected.

        Raises:
            TypeError: If ``payload`` is not a dict.
        """
        if not isinstance(payload, dict):
            self.logger.error('[SENDER]发送数据非dict类型')
            raise TypeError(f"Need a dict type but {type(payload)} given.")
        if not self.mqtt_cli or not self.mqtt_cli.is_connected():
            return False
        return self.mqtt_cli.publish(topic, json.dumps(payload), qos)

    # ------------------------------------------------------------------ #
    # Heat (pour) events
    # ------------------------------------------------------------------ #
    def begin_pour(self, heat_data, bigbagweight):
        """Send the start-of-pour record for a heat and cache it by heat number.

        Args:
            heat_data: Dict read for ``ccmNo``, ``netWeight``, ``grade``,
                ``spec``, ``heatNo``, ``ladleNo`` and ``sendTime``.
            bigbagweight: Full ladle weight; the empty weight is derived by
                subtracting ``netWeight``.
        """
        net_weight = float(heat_data.get('netWeight', 0))
        tmp = self.heat_tangible_temp.copy()
        tmp['optype'] = 1
        tmp['casterCode'] = heat_data.get('ccmNo', '')
        tmp['emptyLadleWeight'] = bigbagweight - net_weight
        tmp['fullLadleWeight'] = bigbagweight
        tmp['grade'] = heat_data.get('grade', '')
        tmp['spec'] = heat_data.get('spec', '')
        tmp['heatsCode'] = heat_data.get('heatNo', '')
        tmp['ladleCode'] = heat_data.get('ladleNo', '')
        tmp['moltenSteelWeight'] = net_weight
        tmp['startPourTime'] = heat_data.get('sendTime', '')
        self._cache[heat_data.get('heatNo', 'error')] = tmp
        self.send('heat_add', tmp)

    def end_pour(self, heat_data):
        """Update the cached heat record with the stop-pour time and resend it.

        Logs an error and sends nothing when no begin_pour record is cached.

        Raises:
            KeyError: If ``heat_data`` has no ``heatNo``.
        """
        heat_no = heat_data['heatNo']
        tmp = self._cache.get(heat_no)
        if tmp is None:
            self.logger.error(f"[SENDER]停浇:找不到对应的开浇数据,炉号:{heat_no}")
            return
        # The cached dict is mutated in place, so the cache stays current.
        tmp['optype'] = 2
        tmp['grade'] = heat_data.get('grade', '')
        tmp['spec'] = heat_data.get('spec', '')
        tmp['stopPourTime'] = heat_data.get('sendTime', '')
        self.send('heat_add', tmp)

    def _send_heat_cut_time(self, heat_data, field, cuttime):
        """Stamp ``field`` of the cached heat record with ``cuttime`` and resend."""
        if not cuttime:
            cuttime = self._now()
        heat_no = heat_data['heatNo']
        tmp = self._cache.get(heat_no)
        if tmp is None:
            # Mirrors end_pour(): report the missing record instead of
            # failing with a bare KeyError.
            self.logger.error(f"[SENDER]{field}:找不到对应的开浇数据,炉号:{heat_no}")
            return
        tmp['optype'] = 2
        tmp[field] = cuttime
        self.send('heat_add', tmp)

    def heat_first(self, heat_data, cuttime=None):
        """Record the first-cut time of a heat (defaults to now) and resend it."""
        self._send_heat_cut_time(heat_data, 'firstCutTime', cuttime)

    def heat_last(self, heat_data, cuttime=None):
        """Record the last-cut time of a heat (defaults to now) and resend it."""
        self._send_heat_cut_time(heat_data, 'lastCutTime', cuttime)

    # ------------------------------------------------------------------ #
    # Billet (cut) events
    # ------------------------------------------------------------------ #
    def _new_billet(self, heat_data, billetNo, heatnoIndex, sizing, speed, error):
        """Return a billet payload with everything except the cut times filled in.

        ``strandNo`` is the third-from-last character of ``billetNo`` and
        ``strandnoIndex`` the integer value of its last two characters.
        """
        tmp = self.billet_tangible_temp.copy()
        tmp['optype'] = 1
        tmp['heatNo'] = heat_data.get('heatNo', '')
        tmp['billetNo'] = billetNo
        tmp['grade'] = heat_data.get('grade', '')
        tmp['spec'] = heat_data.get('spec', '')
        tmp['ladleNo'] = heat_data.get('ladleNo', '')
        tmp['ccmNo'] = heat_data.get('ccmNo', '')
        tmp['strandNo'] = billetNo[-3]
        tmp['heatnoIndex'] = heatnoIndex
        tmp['length'] = sizing
        tmp['strandnoIndex'] = int(billetNo[-2:])
        tmp['castingSpeed'] = speed
        tmp['weight'] = sizing / 1000 * self._WEIGHT_FACTOR
        tmp['sign'] = "2" if error else "1"
        return tmp

    def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed, error=False):
        """Send the start-of-cut record for a billet and remember it for end_cut().

        Args:
            heat_data: Dict read for ``heatNo``, ``grade``, ``spec``,
                ``ladleNo`` and ``ccmNo``.
            billetNo: Billet number string (at least three characters).
            heatnoIndex: Index of the billet within the heat.
            sizing: Cut length; also used to derive the weight.
            speed: Casting speed.
            error: When true the payload's ``sign`` is "2" instead of "1".
        """
        cuttime = self._now()
        tmp = self._new_billet(heat_data, billetNo, heatnoIndex, sizing, speed, error)
        tmp['cutStartTime'] = cuttime
        self._billet[heat_data.get('heatNo', '') + str(heatnoIndex)] = tmp
        self.send('billet_add', tmp)

    def end_cut(self, heat_data, heatnoIndex):
        """Stamp the stop-cut time on a billet started by begin_cut() and resend.

        If the begin-cut record is not there yet, polls for it a bounded
        number of times (``_END_CUT_RETRIES`` polls, ``_END_CUT_RETRY_DELAY``
        seconds apart) before giving up with an error log.  The previous
        implementation decremented its retry counter without ever testing it,
        so it could spin forever.

        Returns:
            ``None`` in all cases.
        """
        cuttime = self._now()
        key = heat_data['heatNo'] + str(heatnoIndex)
        tmp = self._billet.get(key, {})
        trycount = self._END_CUT_RETRIES
        while not tmp and trycount > 0:
            trycount -= 1
            time.sleep(self._END_CUT_RETRY_DELAY)
            tmp = self._billet.get(key, {})
        if not tmp:
            self.logger.error(
                f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}"
            )
            return None
        # The stored dict is mutated in place, so _billet stays current.
        tmp['optype'] = 2
        tmp['cutStopTime'] = cuttime
        self.send('billet_add', tmp)

    def billet_upload(self, heat_data, billetNo, heatnoIndex, sizing, speed,
                      starttime, stoptime, unionNo, error=False):
        """Send a complete billet record (start and stop times, assembly number).

        Unlike begin_cut(), the payload is not stored in ``_billet``.
        """
        tmp = self._new_billet(heat_data, billetNo, heatnoIndex, sizing, speed, error)
        tmp['cutStartTime'] = starttime
        tmp['cutStopTime'] = stoptime
        tmp['assemblyNumber'] = unionNo
        self.send('billet_add', tmp)

    def billet_union(self, heat_data, unionNo, billetNos, sizing):
        """Send an assembly record grouping ``billetNos`` under ``unionNo``.

        Args:
            heat_data: Dict read for ``ccmNo`` and ``heatNo``.
            unionNo: Assembly number.
            billetNos: Iterable of billet number strings (joined with commas).
            sizing: Cut length; the weight is derived from it and the count.
        """
        tmp = self.billet_union_temp.copy()
        tmp['ccmNo'] = heat_data.get('ccmNo', '')
        tmp['heatNo'] = heat_data.get('heatNo', '')
        tmp['billetsNo'] = ','.join(billetNos)
        tmp['assemblyNumber'] = unionNo
        tmp['assemblyTime'] = self._now()
        tmp['length'] = sizing
        tmp['billetsNum'] = len(billetNos)
        tmp['billetWeight'] = sizing / 1000 * self._WEIGHT_FACTOR * tmp['billetsNum']
        self.send('billet_union', tmp)

    # ------------------------------------------------------------------ #
    # Hot-send / vehicle / stacking events
    # ------------------------------------------------------------------ #
    def host_send(self, ccmNo, billetNos, dst_str, craneNo="", fromaddr=""):
        """Send a hot-send record for ``billetNos`` going to ``dst_str``.

        ``billetHotsendTypeConfigId`` is "1" when ``dst_str`` is "棒一",
        otherwise "15".  The lifting time is the current local time.
        """
        tmp = self.host_send_temp.copy()
        tmp['ccmNo'] = ccmNo
        tmp['billetNos'] = self._join_billets(billetNos)
        tmp['billetHotsendTypeConfigId'] = "1" if dst_str == "棒一" else "15"
        tmp['liftingTime'] = self._now()
        tmp['vehicleNumber'] = craneNo
        tmp['location'] = fromaddr
        tmp['destination'] = dst_str
        self.send('host_send', tmp)

    def car_add(self, ccmNo, carNo, plate: str):
        """Send a vehicle-arrival record.

        ``billetHotsendTypeConfigId`` is "16" for plates starting with "陕E",
        otherwise empty.
        """
        tmp = self.car_add_temp.copy()
        tmp['ccmNo'] = ccmNo
        tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
        tmp['licensePlate'] = plate
        tmp['positionNum'] = carNo
        self.send('car_add', tmp)

    def car_save(self, ccmNo, billetNos, plate, craneNo, Ltime, fromaddr, toaddr,
                 layer=0, address=0):
        """Send a record of billets loaded onto a vehicle.

        ``positionNum`` is the last character of ``toaddr`` (empty when
        ``toaddr`` is empty); ``layer`` and ``address`` are sent as strings.
        """
        tmp = self.car_save_temp.copy()
        tmp['ccmNo'] = ccmNo
        tmp['billetNos'] = self._join_billets(billetNos)
        tmp['billetHotsendTypeConfigId'] = "16" if plate.startswith("陕E") else ""
        tmp['licensePlate'] = plate
        tmp['vehicleNumber'] = craneNo
        tmp['liftingTime'] = Ltime
        tmp['location'] = fromaddr
        tmp['destination'] = toaddr
        tmp['positionNum'] = toaddr[-1] if toaddr else ""
        tmp['plateOrStack'] = plate
        tmp['layer'] = str(layer)
        tmp['address'] = str(address)
        self.send('car_save', tmp)

    def car_go(self, ccmNo, carNo, plate=''):
        """Send a vehicle-departure record."""
        tmp = self.car_go_temp.copy()
        tmp['ccmNo'] = ccmNo
        tmp['positionNum'] = carNo
        tmp['licensePlate'] = plate
        self.send('car_go', tmp)

    def stack_add(self, ccmNo, billetNos, craneNo, Ltime, fromaddr, toaddr,
                  layer=0, address=0):
        """Send a record of billets placed on the stack named ``toaddr``.

        ``billetHotsendTypeConfigId`` comes from ``stack_dict`` ('' for an
        unknown stack name).
        """
        tmp = self.stack_add_temp.copy()
        tmp['ccmNo'] = ccmNo
        tmp['billetHotsendTypeConfigId'] = self.stack_dict.get(toaddr, '')
        # A fresh list is assigned: .copy() is shallow, so the template's own
        # list must never be appended to.
        tmp['billetNoList'] = [{
            'billetNos': self._join_billets(billetNos),
            'vehicleNumber': craneNo,
            'liftingTime': Ltime,
            'location': fromaddr,
            'destination': toaddr,
            'positionNum': "",
            'plateOrStack': toaddr,
            'layer': str(layer),
            'address': str(address),
        }]
        self.send('stack_add', tmp)

    def plate_update(self, ccmNo, carNo, plate):
        """Send a licence-plate update; reuses the car_go payload template."""
        tmp = self.car_go_temp.copy()
        tmp['ccmNo'] = ccmNo
        tmp['positionNum'] = carNo
        tmp['licensePlate'] = plate
        self.send('plate_update', tmp)

    def stack_car_save(self, ccmNo, craneNo, Ltime, fromaddr, toaddr, layer, address):
        """Send a record of billets moved from stack ``fromaddr`` onto a vehicle.

        Raises:
            KeyError: If ``fromaddr`` is not a key of ``stack_dict``.
        """
        tmp = self.stack_car_save_temp.copy()
        tmp['ccmNo'] = ccmNo
        tmp['billetHotsendTypeConfigId'] = self.stack_dict[fromaddr]
        tmp['vehicleNumber'] = craneNo
        tmp['liftingTime'] = Ltime
        tmp['location'] = fromaddr
        tmp['positionNum'] = toaddr[-1] if toaddr else ""
        tmp['plateOrStack'] = fromaddr
        tmp['layer'] = str(layer)
        tmp['address'] = str(address)
        self.send('stack_car_save', tmp)

    # ------------------------------------------------------------------ #
    # Shift change / gate events
    # ------------------------------------------------------------------ #
    def _send_shift_change(self, purpose, ban):
        """Send a shift-change payload (current time + ``ban``) for ``purpose``."""
        tmp = {
            'changeShiftTime': self._now(),
            'ban': ban,
        }
        self.send(purpose, tmp)

    def jiaoban(self, ccmNo, ban):
        """Send a shift-change record.  ``ccmNo`` is accepted but not sent."""
        self._send_shift_change('jiaoban', ban)

    def jiaoban_6(self, ccmNo, ban):
        """Send a shift-change record on the 'jiaoban_6' purpose.

        ``ccmNo`` is accepted but not sent.
        """
        self._send_shift_change('jiaoban_6', ban)

    def plate_zhagang(self, carNo, plate):
        """Send a 'car_arrive' event carrying the plate and position."""
        tmp = {
            'type': 'car_arrive',
            'data': {
                'car': plate,
                'position': carNo,
            },
        }
        self.send('plate_zhagang', tmp)


if __name__ == '__main__':
    # Manual smoke test against a live broker; not run on import.
    import logging

    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    # The formatter used to be created but never attached to the handler.
    console.setFormatter(formatter)
    logger.addHandler(console)

    mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test')
    mqttcli.username_pw_set('admin', '123456')
    mqttcli.connect('192.168.0.119', 1883)
    mqttcli.loop_start()

    test = Sender(logger)
    test.set_mqtt_client(mqttcli)
    test.http_flag = False

    testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"25501045","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"5","sendTime":"2025-01-20 09:44:17"}
    #test.host_send("5", "255010455107", "棒一")
    #test.host_send("6", ["256009976601","256009976701","256009976801","256009976602"], "高线", "A2")
    test.car_add("6", "4", "陕E00901")
    test.car_save("6", "256012036107,256012036207,256012036407,256012036108,", "陕E00901", "A3", "2025-01-23 16:22:17", "6#小冷床(右)", "车位4")
    #test.car_go("6", "陕E00901")