mysql_data.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import time, threading, snap7.util
  2. from utils.s7data import S7Client
  3. from utils.logger import Logger
  4. from dbutils.pooled_db import PooledDB
  5. class MysqlData:
  6. def __init__(self, mysql_pool: PooledDB, s7conn: S7Client, logger: Logger):
  7. self.mysql_pool = mysql_pool
  8. self.s7conn = s7conn
  9. self.logger = logger
  10. self.datas = {"is_use_model": False, "is_use_length": False, "棒一变棒三定尺": False}
  11. self.model_datas = [0 for i in range(8)]
  12. self.thread_flag = True
  13. self.thread = threading.Thread(target=self.update_forever)
  14. self.thread.start()
  15. self.thread_write = threading.Thread(target=self.write_forever)
  16. self.thread_write.start()
  17. def get_value(self, name, default=0):
  18. if name == "棒一变棒三定尺":
  19. sql = f"SELECT bool_value from industrial_data.realtime_data WHERE point_id = 64;"
  20. else:
  21. sql = f"SELECT {name} from length_config;"
  22. try:
  23. with self.mysql_pool.connection() as conn:
  24. with conn.cursor() as cursor:
  25. cursor.execute(sql)
  26. result = cursor.fetchall()
  27. if len(result) == 0:
  28. raise ValueError("Read no data from mysql.")
  29. return result[0][0]
  30. except Exception as e:
  31. self.logger.error(f"[mysql]:{e}")
  32. return default
  33. def get_model_value(self, strand_no, default=0):
  34. sql = f"SELECT model_compensation FROM prediction_full_data.prediction{strand_no}_full_data WHERE model_compensation IS NOT NULL ORDER BY prediction_timestamp DESC LIMIT 1;"
  35. try:
  36. with self.mysql_pool.connection() as conn:
  37. with conn.cursor() as cursor:
  38. cursor.execute(sql)
  39. result = cursor.fetchall()
  40. if len(result) == 0:
  41. raise ValueError("Read no data from mysql.")
  42. return result[0][0]
  43. except Exception as e:
  44. self.logger.error(f"[mysql]:{e}")
  45. return default
  46. def update_forever(self, fru=500):
  47. while self.thread_flag:
  48. for i in self.datas.keys():
  49. self.datas[i] = self.get_value(i)
  50. for i in range(8):
  51. self.model_datas[i] = self.get_model_value(i+1)
  52. time.sleep(fru/1000)
  53. def write_forever(self, fru=500):
  54. while self.thread_flag:
  55. data = bytearray(1)
  56. snap7.util.set_bool(data, 0, 1, self.datas["is_use_model"])
  57. snap7.util.set_bool(data, 0, 0, not self.datas["棒一变棒三定尺"])
  58. snap7.util.set_bool(data, 0, 3, self.datas["棒一变棒三定尺"])
  59. self.s7conn.db_write(420, 32, data)
  60. self.s7conn.db_write(421, 32, data)
  61. data = bytearray(32)
  62. for i in range(8):
  63. snap7.util.set_real(data, i*4, self.model_datas[i])
  64. self.s7conn.db_write(420, 0, data)
  65. self.s7conn.db_write(421, 0, data)
  66. time.sleep(fru/1000)