# data_sender.py
import threading
import time
from datetime import datetime

import pymysql
from dbutils.pooled_db import PooledDB

from utils.logger import Logger
from utils.s7data import S7data
  6. class Sender:
  7. def __init__(self, s7data: S7data, mysql_pool: PooledDB, logger: Logger, ipaddr):
  8. self.s7data = s7data
  9. self.mysql_pool = mysql_pool
  10. self.logger = logger
  11. self.point_info = self.get_init_node_info(ipaddr)
  12. self.thread_run = True
  13. self.thread = threading.Thread(target=self.update_all_forever)
  14. self.thread.start()
  15. def __del__(self):
  16. if isinstance(self.thread, threading.Thread) and self.thread.is_alive():
  17. self.thread_run = False
  18. self.thread.join()
  19. def update_all_forever(self, round_sleep=500):
  20. while self.thread_run:
  21. time.sleep(round_sleep/1000)
  22. threads = []
  23. for i in self.point_info.keys():
  24. thread = threading.Thread(target=self.update_point, args=(i,))
  25. thread.start()
  26. threads.append(thread)
  27. for i in threads:
  28. i.join()
  29. self.logger.debug("更新成功")
  30. def update_point(self, name):
  31. dataid = self.point_info[name][1]
  32. datatype = self.point_info[name][2]
  33. datatype = datatype if datatype != 'dint' else 'int'
  34. timestamp = datetime.now()
  35. datavalue = self.s7data.get_value(name)
  36. sql = "UPDATE realtime_data SET {}_value = %s, timestamp = %s WHERE point_id = %s;".format(datatype)
  37. sql2 = "INSERT INTO historical_data(point_id, {}_value, timestamp) VALUES(%s, %s, %s);".format(datatype)
  38. sql3 = "INSERT INTO recent_data(point_id, {}_value, timestamp) VALUES(%s, %s, %s);".format(datatype)
  39. with self.mysql_pool.connection() as conn:
  40. try:
  41. with conn.cursor() as cursor:
  42. cursor.execute(sql, (datavalue, timestamp, dataid))
  43. cursor.execute(sql2, (dataid, datavalue, timestamp))
  44. cursor.execute(sql3, (dataid, datavalue, timestamp))
  45. conn.commit()
  46. return True
  47. except pymysql.Error as e:
  48. self.logger.error(f"[SENDER]MYSQL:{e}")
  49. def get_init_node_info(self, ipaddr):
  50. sql = "SELECT name, id, type FROM data_points where ip_address = %s;"
  51. res_dict = {}
  52. with self.mysql_pool.connection() as conn:
  53. try:
  54. with conn.cursor() as cursor:
  55. cursor.execute(sql, (ipaddr,))
  56. res = cursor.fetchall()
  57. except pymysql.Error as e:
  58. self.logger.error(f"[SENDER]MYSQL:{e}")
  59. raise ConnectionError("mysql connection error."+str(e))
  60. for i in res:
  61. res_dict[i[0]] = i
  62. return res_dict
  63. def upload_billet(self, arg_dict: dict):
  64. sql = "INSERT INTO steel_billet_monitoring(strand_no, cutting_time, entry_time, exit_time, \
  65. water_temperature, water_pressure, water_volume, water_pressure_sd, steel_temperature, \
  66. drawing_speed, water_temperature_difference)\
  67. VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
  68. with self.mysql_pool.connection() as conn:
  69. try:
  70. with conn.cursor() as cursor:
  71. cursor.execute(sql, (arg_dict["strand_no"], arg_dict["cutting_time"], arg_dict["entry_time"], arg_dict["exit_time"],
  72. arg_dict.get("water_temperature"), arg_dict.get("water_pressure"), arg_dict.get("water_volume"), arg_dict.get("water_pressure_sd"),
  73. arg_dict.get("steel_temperature"), arg_dict.get("drawing_speed"), arg_dict.get("water_temperature_difference")))
  74. conn.commit()
  75. return True
  76. except pymysql.Error as e:
  77. self.logger.error(f"[SENDER]MYSQL:{e}")