| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- from utils.s7data import S7data
- from utils.logger import Logger
- from dbutils.pooled_db import PooledDB
- from datetime import datetime
- import pymysql, threading, time
- class Sender:
- def __init__(self, s7data: S7data, mysql_pool: PooledDB, logger: Logger, ipaddr):
- self.s7data = s7data
- self.mysql_pool = mysql_pool
- self.logger = logger
- self.point_info = self.get_init_node_info(ipaddr)
- self.thread_run = True
- self.thread = threading.Thread(target=self.update_all_forever)
- self.thread.start()
- def __del__(self):
- if isinstance(self.thread, threading.Thread) and self.thread.is_alive():
- self.thread_run = False
- self.thread.join()
- def update_all_forever(self, round_sleep=500):
- while self.thread_run:
- time.sleep(round_sleep/1000)
- threads = []
- for i in self.point_info.keys():
- thread = threading.Thread(target=self.update_point, args=(i,))
- thread.start()
- threads.append(thread)
- for i in threads:
- i.join()
- self.logger.debug("更新成功")
- def update_point(self, name):
- dataid = self.point_info[name][1]
- datatype = self.point_info[name][2]
- datatype = datatype if datatype != 'dint' else 'int'
- timestamp = datetime.now()
- datavalue = self.s7data.get_value(name)
- sql = "UPDATE realtime_data SET {}_value = %s, timestamp = %s WHERE point_id = %s;".format(datatype)
- sql2 = "INSERT INTO historical_data(point_id, {}_value, timestamp) VALUES(%s, %s, %s);".format(datatype)
- sql3 = "INSERT INTO recent_data(point_id, {}_value, timestamp) VALUES(%s, %s, %s);".format(datatype)
- with self.mysql_pool.connection() as conn:
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql, (datavalue, timestamp, dataid))
- cursor.execute(sql2, (dataid, datavalue, timestamp))
- cursor.execute(sql3, (dataid, datavalue, timestamp))
- conn.commit()
- return True
- except pymysql.Error as e:
- self.logger.error(f"[SENDER]MYSQL:{e}")
- def get_init_node_info(self, ipaddr):
- sql = "SELECT name, id, type FROM data_points where ip_address = %s;"
- res_dict = {}
- with self.mysql_pool.connection() as conn:
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql, (ipaddr,))
- res = cursor.fetchall()
- except pymysql.Error as e:
- self.logger.error(f"[SENDER]MYSQL:{e}")
- raise ConnectionError("mysql connection error."+str(e))
- for i in res:
- res_dict[i[0]] = i
-
- return res_dict
- def upload_billet(self, arg_dict: dict):
- sql = "INSERT INTO steel_billet_monitoring(strand_no, cutting_time, entry_time, exit_time, \
- water_temperature, water_pressure, water_volume, water_pressure_sd, steel_temperature, \
- drawing_speed, water_temperature_difference)\
- VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
- with self.mysql_pool.connection() as conn:
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql, (arg_dict["strand_no"], arg_dict["cutting_time"], arg_dict["entry_time"], arg_dict["exit_time"],
- arg_dict.get("water_temperature"), arg_dict.get("water_pressure"), arg_dict.get("water_volume"), arg_dict.get("water_pressure_sd"),
- arg_dict.get("steel_temperature"), arg_dict.get("drawing_speed"), arg_dict.get("water_temperature_difference")))
- conn.commit()
- return True
- except pymysql.Error as e:
- self.logger.error(f"[SENDER]MYSQL:{e}")
|