# tcp_data.py (3.5 KB)
import socket
import threading
import time

from utils.statepoint import *
  3. class Tcp_server(socket.socket):
  4. def __init__(self, ip, port, backlog = 5, cli_max = 5, encoding = 'utf-8', point_t = Statepoint):
  5. super().__init__()
  6. self.backlog = backlog
  7. self.cli_max = cli_max
  8. self.encoding = encoding
  9. self.point_t = point_t
  10. self.run_flag = False
  11. self.main_thread = None
  12. self.convertor = lambda data: bool(data)
  13. self.excite_action = lambda: None
  14. self.clients = {}
  15. self.datas = {}
  16. self.points = {}
  17. self.threads = {}
  18. self.bind((ip, port))
  19. self.listen(backlog)
  20. def accept_action(self):
  21. while self.run_flag:
  22. if len(self.clients) < self.cli_max:
  23. try:
  24. cli, addr = self.accept()
  25. except OSError as e:
  26. print('客户端等待连接服务被打断:', str(e))
  27. self.run_flag = False
  28. return None
  29. tmp_point = self.point_t()
  30. tmp_point.set_convertor(self.convertor)
  31. tmp_point.set_excite_action(lambda t=tmp_point: (t.set_state(False), self.excite_action()))
  32. self.clients[addr] = cli
  33. self.datas[addr] = b''
  34. self.points[addr] = [tmp_point]
  35. self.threads[addr] = threading.Thread(target=self.update_action, args=(addr,))
  36. self.threads[addr].start()
  37. print(f'{addr}客户端已连接')
  38. else:
  39. time.sleep(1)
  40. def update_action(self, addr):
  41. cli = self.clients[addr]
  42. noerror = True
  43. while self.run_flag:
  44. try:
  45. tmp = cli.recv(1024)
  46. except Exception as e:
  47. noerror = False
  48. tmp = b''
  49. print(f'{addr}异常断连:{e}')
  50. if tmp:
  51. self.datas[addr] = tmp
  52. self.send(addr)
  53. else:
  54. break
  55. if noerror:
  56. print(f'{addr}正常断连')
  57. self.clients.pop(addr, None)
  58. self.datas.pop(addr, None)
  59. self.points.pop(addr, None)
  60. self.threads.pop(addr, None)
  61. def send(self, addr):
  62. if self.encoding:
  63. data = self.datas[addr].decode(encoding=self.encoding, errors='replace')
  64. else:
  65. data = self.datas[addr]
  66. for i in self.points[addr]:
  67. i.inject(data)
  68. def start(self):
  69. if self.main_thread and self.main_thread.is_alive():
  70. return None
  71. self.getsockname()
  72. self.main_thread = threading.Thread(target=self.accept_action)
  73. self.run_flag = True
  74. self.main_thread.start()
  75. def stop(self):
  76. if self.main_thread == None:
  77. return None
  78. self.run_flag = False
  79. try:
  80. addr = self.getsockname()
  81. cli_tmp = socket.socket()
  82. cli_tmp.connect(addr)
  83. cli_tmp.close()
  84. finally:
  85. self.main_thread.join()
  86. self.main_thread = None
  87. def set_convertor(self, func):
  88. self.convertor = func
  89. def set_excite_action(self, func):
  90. self.excite_action = func
  91. def close(self):
  92. self.stop()
  93. return super().close()
  94. def send_to_all_clients(self, bytes_data: bytes):
  95. for i, j in self.clients.items():
  96. try:
  97. j.send(bytes_data)
  98. except Exception as e:
  99. print(f'{i}发送数据时产生异常:{e}')