batcher.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. from utils.statepoint import *
  2. from utils.mqttdata import *
  3. from utils.s7data import *
  4. from models.data_sender import *
  5. from models.mysql_data import MysqlData, MysqlDataSizing
  6. import logging, datetime, queue, requests
  7. def _get_heat_init_strand(heat_no, retry = 3):
  8. url = f"http://192.168.0.119:7005/actualControl/billetActual/heatNoStats?heatNo={heat_no}"
  9. while retry:
  10. try:
  11. response = requests.get(url)
  12. result = response.json()
  13. if result["success"]:
  14. return result["result"]["strandTotal"]
  15. except Exception as e:
  16. warnings.warn(str(e))
  17. finally:
  18. retry -= 1
  19. return [0] * 8
  20. class Heat:
  21. def __init__(self, ccm_no, heat_no, start_pour_time: datetime.datetime = None, init_strands_status: list[int] = [0] * 8):
  22. self.ccm_no = str(ccm_no)
  23. self.heat_no = str(heat_no)
  24. self.start_pour_time = start_pour_time
  25. self.using_event = threading.Event()
  26. self.strands_req = init_strands_status
  27. self.total = sum(init_strands_status)
  28. self.billet_union_count = {}
  29. self.resource_lock = threading.Lock()
  30. @classmethod
  31. def from_dict(cls, src_dict: dict):
  32. return cls(
  33. src_dict["ccmNo"],
  34. src_dict["heatNo"],
  35. src_dict["sendTime"] if "sendTime" in src_dict and src_dict["sendTime"] else None
  36. )
  37. def to_dict(self):
  38. return {
  39. 'ccmNo': self.ccm_no,
  40. 'heatNo': self.heat_no,
  41. 'sendTime': self.start_pour_time.strftime("%Y-%m-%d %H:%M:%S")
  42. }
  43. def begin_use(self):
  44. self.using_event.set()
  45. def is_poured(self) -> bool:
  46. if self.start_pour_time:
  47. return True
  48. return False
  49. def is_using(self) -> bool:
  50. return self.using_event.is_set()
  51. def init_strands_status(self, init_status: list[int]):
  52. if self.is_using():
  53. raise ValueError("无法对已开始出坯的炉进行流初始化")
  54. with self.resource_lock:
  55. self.strands_req = init_status
  56. self.total = sum(init_status)
  57. def claim_strand_seq(self, strand_no) -> tuple[int]:
  58. if not self.is_using():
  59. return 0
  60. index = strand_no - 1
  61. with self.resource_lock:
  62. self.total += 1
  63. self.strands_req[index] += 1
  64. return self.strands_req[index], self.total
  65. def claim_union_seq(self, length: int) -> int:
  66. if not self.is_using():
  67. return 0
  68. with self.resource_lock:
  69. if length not in self.billet_union_count:
  70. self.billet_union_count[length] = 0
  71. self.billet_union_count[length] += 1
  72. return self.billet_union_count[length]
  73. class Billet:
  74. def __init__(self, cutting_time: datetime.datetime = None):
  75. if cutting_time:
  76. self.start_cutting_time: datetime.datetime = cutting_time
  77. else:
  78. self.start_cutting_time: datetime.datetime = datetime.datetime.now()
  79. self.heat: Heat = None
  80. self.stop_cutting_time: datetime.datetime = None
  81. self.strand_no: int = None
  82. self._strand_seq: int = None
  83. self._heat_seq: int = None
  84. self.sizing = None
  85. self.drawing_speed = None
  86. @property
  87. def billet_no(self):
  88. if self.heat and self.strand_no and self.strand_seq:
  89. return self.heat_no + self.ccm_no + str(self.strand_no) + f"{self.strand_seq:02d}"
  90. @property
  91. def ccm_no(self):
  92. if self.heat:
  93. return self.heat.ccm_no
  94. @property
  95. def heat_no(self):
  96. if self.heat:
  97. return self.heat.heat_no
  98. @property
  99. def strand_seq(self):
  100. return self._strand_seq
  101. @property
  102. def heat_seq(self):
  103. return self._heat_seq
  104. def batch_strand(self, strand_no = None) -> bool:
  105. if strand_no:
  106. self.strand_no = strand_no
  107. if not (self.strand_no and self.heat):
  108. return False
  109. res = self.heat.claim_strand_seq(self.strand_no)
  110. if res:
  111. self._strand_seq, self._heat_seq = res
  112. return True
  113. return False
  114. class BatchHeat:
  115. def __init__(self, ccm_no, manual_change_heat_sig: Statepoint,
  116. mysql_heat_point: Statepoint, mqtt_heat_point: Statepoint,
  117. ladle_weight_point_1: Statepoint, ladle_weight_point_2: Statepoint,
  118. sender: Sender, logger: logging.Logger, queue_max = 5):
  119. self.ccm_no = ccm_no
  120. self.manual_change_heat_sig = manual_change_heat_sig
  121. self.mysql_heat_point = mysql_heat_point
  122. self.mqtt_heat_point = mqtt_heat_point
  123. self.ladle_weight_point_1 = ladle_weight_point_1
  124. self.ladle_weight_point_2 = ladle_weight_point_2
  125. self.sender = sender
  126. self.logger = logger
  127. self.last_enqueue = ()
  128. self.heats_queue = queue.PriorityQueue(queue_max)
  129. self.heat_info_shared = {}
  130. self.heat_info_lock = threading.Lock()
  131. self.current_heat: Heat = None
  132. self._thread: threading.Thread = None
  133. self._thread_event = threading.Event()
  134. self.change_heat_lock = threading.Lock()
  135. self.manual_change_heat_sig.set_excite_action(self.manual_change_heat_action)
  136. self.mysql_heat_point.set_excite_action(self.heat_arrive_in_action)
  137. self.mqtt_heat_point.set_excite_action(self.heat_start_pour_action)
  138. self.current_heat_point = Statepoint(0)
  139. def heat_arrive_in_action(self):
  140. # 维护内部状态
  141. self.mysql_heat_point.set_state(False)
  142. # 获取炉次到站信息
  143. heat_no, into_time = self.mysql_heat_point.data
  144. time_for_check = into_time + datetime.timedelta(minutes=12)
  145. self.heat_info_shared[heat_no] = {}
  146. self.logger.debug(f"[Counter]检测到炉次进站{heat_no}")
  147. # 大包重量选择算法
  148. ladle_weight = max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data)
  149. # 等待诺德开浇信号判断
  150. while self.heat_info_shared[heat_no] == {}:
  151. # 如果超时,根据入站时间预测开浇时间
  152. if datetime.datetime.now() > time_for_check:
  153. with self.heat_info_lock:
  154. if "heatNo" not in self.heat_info_shared[heat_no]:
  155. self.heat_info_shared[heat_no]["ccmNo"] = self.ccm_no
  156. self.heat_info_shared[heat_no]["heatNo"] = heat_no
  157. self.heat_info_shared[heat_no]["sendTime"] = into_time + datetime.timedelta(minutes=5)
  158. break
  159. time.sleep(2)
  160. # 标志是否为铸机开机的第一次开浇
  161. first_heat_flag = self.current_heat == None and self.heats_queue.empty()
  162. # 写入日志
  163. if first_heat_flag:
  164. self.logger.info(f'[Counter]首次开浇:{heat_no}')
  165. else:
  166. self.logger.info(f'[Counter]炉次开浇:{heat_no}')
  167. # 维护浇铸队列
  168. try:
  169. self.last_enqueue = (self.heat_info_shared[heat_no]["sendTime"], Heat.from_dict(self.heat_info_shared[heat_no]))
  170. self.heats_queue.put(self.last_enqueue, True, 3)
  171. except TimeoutError as e:
  172. self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),存在异常:{e}")
  173. return
  174. # 使用sender向外发送信号
  175. self.sender.begin_pour(self.heat_info_shared[heat_no], ladle_weight)
  176. self.heat_info_shared.pop(heat_no)
  177. def heat_start_pour_action(self):
  178. heat_info = self.mqtt_heat_point.data
  179. heat_no = str(heat_info['heatNo'])
  180. if "sendTime" in heat_info:
  181. heat_info["sendTime"] = datetime.datetime.strptime(heat_info["sendTime"], "%Y-%m-%d %H:%M:%S")
  182. if heat_no in self.heat_info_shared:
  183. with self.heat_info_lock:
  184. self.heat_info_shared[heat_no] = heat_info
  185. self.logger.debug(f'[Counter]收到诺德开浇信号:{heat_no}')
  186. elif self.last_enqueue[-1].heat_no == heat_no:
  187. self.logger.warning(f"[Counter]诺德开浇信号晚到{heat_no}")
  188. else:
  189. self.logger.info(f'[Counter]开浇信号表示倒浇操作:{heat_no}')
  190. # 维护浇铸队列
  191. try:
  192. self.last_enqueue = (heat_info["sendTime"], Heat.from_dict(heat_info))
  193. self.heats_queue.put(self.last_enqueue, True, 3)
  194. except TimeoutError as e:
  195. self.logger.error(f"[Counter]炉次队列已满({self.heats_queue.maxsize}),无法插入{heat_no},异常:{e}")
  196. return
  197. def manual_change_heat_action(self):
  198. with self.change_heat_lock:
  199. heat_info = self.manual_change_heat_sig.data
  200. if self.current_heat and heat_info['heatNo'] == self.current_heat.heat_no:
  201. return
  202. if "startPourTime" in heat_info:
  203. heat_info["sendTime"] = datetime.datetime.strptime(heat_info["startPourTime"], "%Y-%m-%d %H:%M:%S")
  204. self.logger.info(f"手动换炉触发:{self.current_heat.heat_no if self.current_heat else "未知"}->{heat_info['heatNo']}")
  205. self.sender.begin_pour(heat_info, max(self.ladle_weight_point_1.data, self.ladle_weight_point_2.data))
  206. self.current_heat = Heat.from_dict(heat_info)
  207. self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no))
  208. self.current_heat.begin_use()
  209. self.current_heat_point.inject(int(self.current_heat.heat_no))
  210. def loop_forever(self):
  211. try:
  212. while self._thread_event.is_set():
  213. if not self.heats_queue.empty() and self.heats_queue.queue[0][0] + datetime.timedelta(minutes=10) <= datetime.datetime.now():
  214. with self.change_heat_lock:
  215. _, heat = self.heats_queue.get()
  216. if self.current_heat:
  217. self.logger.debug(f"[Counter]炉次划分{self.current_heat.heat_no}->{heat.heat_no}")
  218. else:
  219. self.logger.debug(f"[Counter]连铸机出坯启动,炉次为{heat.heat_no}")
  220. self.current_heat = heat
  221. self.current_heat.init_strands_status(_get_heat_init_strand(self.current_heat.heat_no))
  222. self.current_heat.begin_use()
  223. self.current_heat_point.inject(int(self.current_heat.heat_no))
  224. except Exception as e:
  225. self._thread_event.clear()
  226. self.logger.error(f"换炉线程意外退出:{e}")
  227. def loop_start(self):
  228. if self._thread and self._thread.is_alive():
  229. if self._thread_event.is_set():
  230. return
  231. raise ChildProcessError("换炉线程异常持续运行")
  232. self._thread = threading.Thread(target=self.loop_forever)
  233. self._thread_event.set()
  234. self._thread.start()
  235. def loop_stop(self):
  236. self._thread_event.clear()
  237. if self._thread and self._thread.is_alive():
  238. self._thread.join(3)
  239. def get_current_heat_info(self) -> dict:
  240. # 当换炉进行中时,获取炉次信息阻塞等待
  241. with self.change_heat_lock:
  242. return self.current_heat
  243. def __del__(self):
  244. self.loop_stop()
  245. class BatchBillet:
  246. def __init__(self, heat: BatchHeat,
  247. cutting_sig_point_list: list[Statepoint], short_sizing_point_list: list[Statepoint],
  248. sizing_point_list: list[Statepoint], drawing_speed_point_list: list[Statepoint],
  249. logger: logging.Logger):
  250. self.heat = heat
  251. self.cutting_sig_point_list = cutting_sig_point_list
  252. self.short_sizing_point_list = short_sizing_point_list
  253. self.sizing_point_list = sizing_point_list
  254. self.drawing_speed_point_list = drawing_speed_point_list
  255. self.logger = logger
  256. self.output_billet_queues = [queue.Queue(2) for _ in range(8)]
  257. self.last_enqueue: list[Billet] = [None for _ in range(8)]
  258. self._backfill_on_off = threading.Event()
  259. self.dspeed_integrate_point_list = None
  260. for i in range(8):
  261. self.cutting_sig_point_list[i].set_excite_action(lambda index=i: self.cutting_action(index))
  262. def cutting_action(self, index: int):
  263. # 先获取实时信息
  264. sizing = self.sizing_point_list[index].data
  265. drawing_speed = self.drawing_speed_point_list[index].data
  266. # 维护拉速回填机制,顺便利用拉速判断短尺
  267. if self._backfill_on_off.is_set():
  268. predict_length = self.dspeed_integrate_point_list[index].data
  269. self.dspeed_integrate_point_list[index].data = 0
  270. predict_length *= 1000
  271. if predict_length < 3000:
  272. predict_length += sizing
  273. self.logger.debug(f"[Counter]{index+1}流钢坯预测长度为{predict_length}")
  274. if abs(self.short_sizing_point_list[index].data - predict_length) < abs(sizing - predict_length):
  275. sizing = self.short_sizing_point_list[index].data
  276. self.logger.info(f"[Counter]{index+1}流钢坯开始切割")
  277. billet = Billet()
  278. billet.sizing = sizing
  279. billet.drawing_speed = drawing_speed
  280. billet.strand_no = index + 1
  281. billet.heat = self.heat.get_current_heat_info()
  282. if billet.heat:
  283. self.trans_billet(billet)
  284. else:
  285. self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息")
  286. def trans_billet(self, billet: Billet):
  287. try:
  288. self.output_billet_queues[billet.strand_no-1].put(billet, True, 10)
  289. self.last_enqueue[billet.strand_no-1] = billet
  290. except Exception as e:
  291. self.logger.error(f"{billet.strand_no}流队列加入钢坯失败:{e}")
  292. def set_dspeed_backfill(self, dspeed_integrate_point_list: list[Statepoint]):
  293. if self.dspeed_integrate_point_list:
  294. raise ValueError("拉速补偿数据点不可重复设置")
  295. self.dspeed_integrate_point_list = dspeed_integrate_point_list.copy()
  296. for i in range(8):
  297. self.dspeed_integrate_point_list[i].set_convertor(lambda data, index=i: self.dspeed_convertor(data, index))
  298. self.dspeed_integrate_point_list[i].data = 0
  299. self.dspeed_integrate_point_list[i].set_excite_action(lambda index=i: self.dspeed_backfill_action(index))
  300. def open_dspeed_backfill(self):
  301. if not self.drawing_speed_point_list:
  302. raise ValueError("未设置拉速补偿数据点")
  303. if self._backfill_on_off.is_set():
  304. return
  305. self._backfill_on_off.set()
  306. for i in self.drawing_speed_point_list:
  307. i.data = 0
  308. i.set_state(0)
  309. i.allow_update()
  310. def close_dspeed_backfill(self):
  311. self._backfill_on_off.clear()
  312. if self.drawing_speed_point_list:
  313. for i in self.drawing_speed_point_list:
  314. i.allow_update(0)
  315. def dspeed_convertor(self, data, index):
  316. return data * 1000 >= self.sizing_point_list[index].data
  317. def dspeed_backfill_action(self, index: int):
  318. # 维护内部状态
  319. self.dspeed_integrate_point_list[index].data = 0
  320. self.dspeed_integrate_point_list[index].set_state(False)
  321. in_time = datetime.datetime.now()
  322. while datetime.datetime.now() - in_time < datetime.timedelta(seconds=20):
  323. if self.last_enqueue[index] and datetime.datetime.now() - self.last_enqueue[index].start_cutting_time < datetime.timedelta(seconds=90):
  324. return
  325. time.sleep(0.5)
  326. self.logger.info(f"[Counter]{index+1}流拉速判据回填钢坯")
  327. billet = Billet(in_time)
  328. billet.sizing = self.sizing_point_list[index].data
  329. billet.drawing_speed = self.drawing_speed_point_list[index].data
  330. billet.strand_no = index + 1
  331. # 炉次信息可能因换炉阻塞,上面先获取实时信息
  332. billet.heat = self.heat.get_current_heat_info()
  333. if billet.heat:
  334. self.trans_billet(billet)
  335. else:
  336. self.logger.warning(f"[Counter]{index+1}流钢坯未获取到炉次信息")
  337. class Batcher:
  338. @property
  339. def data_mqtt(self):
  340. if self._data_mqtt.cli == None:
  341. raise ValueError('The MQTT connection to MES has not been initialized.')
  342. return self._data_mqtt
  343. @property
  344. def data_s7(self):
  345. if self._data_s7 == None:
  346. raise ValueError('The S7 connection to the casting machine PLC has not been initialized.')
  347. return self._data_s7
  348. @property
  349. def sender(self):
  350. if self._sender == None:
  351. raise ValueError('The sender has not been set.')
  352. return self._sender
  353. def __init__(self, data_mqtt: Mqttdata, data_s7: S7data, data_web: Mqttdata, ccmNo, logger: logging.Logger, sender: Sender, data_nuo: MysqlData, data_qbc: MysqlDataSizing):
  354. self._data_mqtt = data_mqtt
  355. self._data_s7 = data_s7
  356. self.data_web = data_web
  357. self._sender = sender
  358. self.logger = logger
  359. self.ccm_no = ccmNo
  360. self.data_nuo = data_nuo
  361. self.data_qbc = data_qbc
  362. self.logger.info(f"[Counter]分炉分坯模块:{ccmNo}号机模块启动")
  363. self.ladle_weight_1 = self.data_s7.make_point('大包重量1')
  364. self.ladle_weight_2 = self.data_s7.make_point('大包重量2')
  365. self.begin_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#开浇信号')
  366. self.end_pour_sig = self.data_mqtt.make_point(f'{ccmNo}#停浇信号')
  367. self.nuo_heat_point = self.data_nuo.make_point(f"{ccmNo}#到站信息")
  368. self.manual_change_heat_sig = data_web.make_point(f"{ccmNo}#手动换炉")
  369. self.begin_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[0]') for i in range(8)]
  370. self.end_cutting_sig = [self.data_s7.make_point(f'L{i+1}切割信号[1]') for i in range(8)]
  371. if ccmNo == '5':
  372. self.length_cutting = [self.data_qbc.make_point(f'{i+1}流定尺') for i in range(8)]
  373. elif ccmNo == '6':
  374. self.length_cutting = [self.data_s7.make_point(f'L{i+1}定尺') for i in range(8)]
  375. self.short_length_cutting = [self.data_s7.make_point(f'L{i+1}短尺') for i in range(8)]
  376. self.drawing_speed = [self.data_s7.make_point(f'L{i+1}拉速') for i in range(8)]
  377. # 拉速补偿点位
  378. self.dspeed_backfill_point = [self.data_s7.make_point(f"L{i+1}拉速", Integration_speed_mpmin) for i in range(8)]
  379. # 主逻辑对象
  380. self.batch_heat = BatchHeat(self.ccm_no, self.manual_change_heat_sig,
  381. self.nuo_heat_point, self.begin_pour_sig,
  382. self.ladle_weight_1, self.ladle_weight_2,
  383. self.sender, self.logger)
  384. self.batch_heat.loop_start()
  385. self.batch_billet = BatchBillet(self.batch_heat, self.begin_cutting_sig, self.short_length_cutting,
  386. self.length_cutting, self.drawing_speed, self.logger)
  387. self.batch_billet.set_dspeed_backfill(self.dspeed_backfill_point)
  388. self.batch_billet.open_dspeed_backfill()