import paho.mqtt.client as mqtt import json, time, requests class Sender: def __init__(self, logger): self.logger = logger self.topic = { 'billet_add': 'trace/performance/billet/add', 'heat_add': 'trace/performance/converter/add' } self.url = { 'billet_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/billetActual/reportRealTimeBilletBasicInfo', 'heat_add': 'http://192.168.0.119:8181/jeecg-boot/actualControl/heatsActuals/reportRealTimeHeatsActuals' } self.heat_tangible_temp = { 'optype' : 0, 'casterCode' : '', 'emptyLadleWeight' : 0.0, 'fullLadleWeight' : 0.0, 'grade' : '', 'spec' : '', 'heatsCode' : '', 'firstCutTime' : '', 'lastCutTime' : '', 'ladleCode' : '', 'moltenSteelWeight' : 0.0, 'startPourTime' : '', 'stopPourTime' : '', 'blankOutput' : 0.0, 'billetSum' : 0 } self.billet_tangible_temp = { 'optype' : 0, 'heatNo' : '', 'billetNo' : '', 'grade' : '', 'ladleNo' : '', 'ccmNo' : '', 'strandNo' : '', 'heatnoIndex' : 0, 'length' : 0, 'strandnoIndex' : 0, 'castingSpeed' : 0.0, 'actualLength' : 0, 'spec' : '', 'width' : 170, 'thickness' : 170, 'weight' : 0.0, 'cutStartTime' : '', 'cutStopTime' : '' } self.mqtt_cli = None self.http_flag = True self._cache = {} self._billet = {} def set_mqtt_client(self, cli): self.mqtt_cli = cli def send(self, purpose, payload, qos=2): if not isinstance(payload, dict): self.logger.error('[SENDER]发送数据非dict类型') raise TypeError(f"Need a dict type but {type(payload)} given.") return None self.logger.info(f"[SENDER]使用 {'MQTT'if self.mqtt_cli else ''} {'HTTP'if self.http_flag else ''} 发送数据") self.logger.debug(f"[SENDER]{purpose}:{payload}") if self.mqtt_cli: if not self.mqtt_cli.is_connected: self.logger.error('[SENDER]MQTT:发送失败,MQTT未连接') if self.mqtt_cli.publish(self.topic[purpose], json.dumps(payload), qos)[0]: self.logger.error('[SENDER]MQTT:数据包发送失败') else: self.logger.info('[SENDER]MQTT:数据包发送成功') if self.http_flag: try: self.logger.debug('[SENDER]HTTP:'+requests.post(self.url[purpose], json=payload, timeout=2).text) except: self.logger.error('[SENDER]HTTP:请求超时') def begin_pour(self, heat_data, bigbagweight): tmp = self.heat_tangible_temp.copy() tmp['optype'] = 1 tmp['casterCode'] = heat_data.get('ccmNo', '') tmp['emptyLadleWeight'] = bigbagweight - heat_data.get('netWeight', 0) tmp['fullLadleWeight'] = bigbagweight tmp['grade'] = heat_data.get('grade', '') tmp['spec'] = heat_data.get('spec', '') tmp['heatsCode'] = heat_data.get('heatNo', '') tmp['ladleCode'] = heat_data.get('ladleNo', '') tmp['moltenSteelWeight'] = heat_data.get('netWeight', 0) tmp['startPourTime'] = heat_data.get('sendTime', '') #此处应存储进数据库,暂时使用缓存处理 self._cache[heat_data.get('heatNo', 'error')] = tmp self.send('heat_add', tmp) def end_pour(self, heat_data): tmp = self._cache[heat_data['heatNo']] tmp['optype'] = 2 tmp['grade'] = heat_data.get('grade', '') tmp['spec'] = heat_data.get('spec', '') tmp['stopPourTime'] = heat_data.get('sendTime', '') #此处应存储进数据库,暂时使用缓存处理 self._cache[heat_data.get('heatNo', 'error')] = tmp self.send('heat_add', tmp) def heat_first(self, heat_data): cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) tmp = self._cache[heat_data['heatNo']] tmp['optype'] = 2 tmp['firstCutTime'] = cuttime #此处应存储进数据库,暂时使用缓存处理 self._cache[heat_data.get('heatNo', 'error')] = tmp self.send('heat_add', tmp) def heat_last(self, heat_data): cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) tmp = self._cache[heat_data['heatNo']] tmp['optype'] = 2 tmp['lastCutTime'] = cuttime #此处应存储进数据库,暂时使用缓存处理 self._cache[heat_data.get('heatNo', 'error')] = tmp self.send('heat_add', tmp) def begin_cut(self, heat_data, billetNo, heatnoIndex, sizing, speed): cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) tmp = self.billet_tangible_temp.copy() tmp['optype'] = 1 tmp['heatNo'] = heat_data.get('heatNo', '') tmp['billetNo'] = billetNo tmp['grade'] = heat_data.get('grade', '') tmp['spec'] = heat_data.get('spec', '') tmp['ladleNo'] = heat_data.get('ladleNo', '') tmp['ccmNo'] = heat_data.get('ccmNo', '') tmp['strandNo'] = billetNo[-3] tmp['heatnoIndex'] = heatnoIndex tmp['length'] = sizing tmp['strandnoIndex'] = int(billetNo[-2:]) tmp['castingSpeed'] = speed tmp['weight'] = sizing / 1000 * 0.2265 tmp['cutStartTime'] = cuttime #此处应存储进数据库,暂时使用缓存处理 self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp self.send('billet_add', tmp) def end_cut(self, heat_data, heatnoIndex): cuttime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) trycount = 4 tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {}) while not tmp: trycount -= 1 time.sleep(0.5) tmp = self._billet.get(heat_data['heatNo']+str(heatnoIndex), {}) if not tmp: self.logger.error(f"[SENDER]停切:找不到对应的开切数据,炉号:{heat_data['heatNo']},序号:{heatnoIndex}") return None tmp['optype'] = 2 tmp['cutStopTime'] = cuttime #此处应存储进数据库,暂时使用缓存处理 self._billet[heat_data.get('heatNo', '')+str(heatnoIndex)] = tmp self.send('billet_add', tmp) if __name__ == '__main__': mqttcli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, 'python-mqtt-992-sender_test') mqttcli.username_pw_set('admin', '123456') mqttcli.connect('192.168.0.119', 1883) mqttcli.loop_start() test = Sender() test.set_mqtt_client(mqttcli) testheat = {"netWeight":140.06,"ladleNo":"13","heatNo":"24617099","grade":"微氮铌钢","castState":1,"spec":"上若泰基","ccmNo":"6","sendTime":"2024-11-28 12:57:09"} test.begin_pour(testheat, 210.0) time.sleep(1) test.heat_first(testheat) time.sleep(1) test.begin_cut(testheat, "246170996301", 5, 11000, 2.85) time.sleep(1) test.end_cut(testheat, 5) time.sleep(1) test.end_pour({"netWeight":0.0,"ladleNo":"","heatNo":"24617099","grade":"微氮铌钢","castState":0,"spec":"上若泰基","ccmNo":"6","sendTime":"2024-11-28 13:21:20"}) time.sleep(1) test.heat_last(testheat)