import time, threading from utils.logger import Logger from dbutils.pooled_db import PooledDB from utils.statepoint import Statepoint class MysqlData: def __init__(self, mysql_pool: PooledDB, logger: Logger): self.mysql_pool = mysql_pool self.logger = logger self.datas = {"5#到站信息": (), "6#到站信息": ()} self.points = {} self.thread_flag = True self.thread = threading.Thread(target=self.update_forever) self.thread.start() def get_value(self, ccm_no, default=()): sql = "SELECT heat_no, into_cc_time from nuo_cc_load_info where cc_no = %s order by into_cc_time desc limit 1;" try: with self.mysql_pool.connection() as conn: with conn.cursor() as cursor: cursor.execute(sql, (ccm_no,)) result = cursor.fetchall() if len(result) == 0: raise ValueError("Read no data from mysql.") return result[0] except Exception as e: self.logger.error(f"[mysql]:{e}") return default def update_forever(self, fru=500): while self.thread_flag: for i in self.datas.keys(): self.datas[i] = self.get_value(i[0]) if i in self.points.keys(): for j in self.points[i]: j.inject(self.datas[i]) time.sleep(fru/1000) def make_point(self, name, point_t=Statepoint): if name not in self.datas.keys(): raise NameError(f"Name {name} is not defined.") if name not in self.points.keys(): self.points[name] = [] tmp_point = point_t(self.datas[name], False) self.points[name].append(tmp_point) return tmp_point class MysqlDataSizing(MysqlData): def __init__(self, mysql_pool, logger): super().__init__(mysql_pool, logger) self.datas = {f"{i}流定尺": 0 for i in range(1, 9)} def get_value(self, strand_no, default=0): sql = "SELECT length FROM autocutwrite WHERE stream = %s ORDER BY cuttime DESC LIMIT 1;" try: with self.mysql_pool.connection() as conn: with conn.cursor() as cursor: cursor.execute(sql, (strand_no,)) result = cursor.fetchall() if len(result) == 0: raise ValueError("Read no data from mysql.") return result[0][0] except Exception as e: self.logger.error(f"[mysql]:{e}") return default