| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
import socket
import threading
import time

from utils.statepoint import *
class Tcp_server(socket.socket):
    """Threaded TCP server that feeds received data into per-client points.

    The instance is itself the listening socket.  ``start`` launches a
    background thread that accepts connections; every accepted client gets
    its own receiver thread and one ``point_t`` instance, and each received
    chunk is injected into that point.

    NOTE: ``send`` below replaces ``socket.socket.send`` with an unrelated
    signature (it takes a client address, not a payload).  Use
    ``send_to_all_clients`` to transmit data to the connected clients.
    """

    def __init__(self, ip, port, backlog=5, cli_max=5, encoding='utf-8', point_t=Statepoint):
        """Create the listening socket and bind it to ``(ip, port)``.

        Args:
            ip: Address to bind to.
            port: Port to bind to.
            backlog: Value passed to ``listen``.
            cli_max: Maximum number of simultaneously registered clients;
                the accept loop pauses while this limit is reached.
            encoding: Codec used to decode received bytes before they are
                injected.  A falsy value injects the raw ``bytes``.
            point_t: Zero-argument callable producing the point object
                created for each client.  The object must provide
                ``set_convertor``, ``set_excite_action``, ``set_state`` and
                ``inject``.
        """
        super().__init__()
        self.backlog = backlog
        self.cli_max = cli_max
        self.encoding = encoding
        self.point_t = point_t
        self.run_flag = False
        self.main_thread = None
        self.convertor = lambda data: bool(data)
        self.excite_action = lambda: None
        # Per-client state, all keyed by the address returned by accept().
        self.clients = {}
        self.datas = {}
        self.points = {}
        self.threads = {}
        self.bind((ip, port))
        self.listen(backlog)

    def accept_action(self):
        """Accept clients until ``run_flag`` is cleared; body of ``main_thread``."""
        while self.run_flag:
            if len(self.clients) >= self.cli_max:
                # At capacity: poll until a client slot frees up.
                time.sleep(1)
                continue
            try:
                cli, addr = self.accept()
            except OSError as e:
                print('客户端等待连接服务被打断:', str(e))
                self.run_flag = False
                return
            if not self.run_flag:
                # accept() returned while shutting down, typically because of
                # the wake-up connection made by stop().  Do not register it
                # as a client.
                cli.close()
                break
            point = self.point_t()
            point.set_convertor(self.convertor)
            # ``t`` is bound now; ``self.excite_action`` is looked up when the
            # point fires, so a later set_excite_action() still takes effect.
            point.set_excite_action(lambda t=point: (t.set_state(False), self.excite_action()))
            self.clients[addr] = cli
            self.datas[addr] = b''
            self.points[addr] = [point]
            self.threads[addr] = threading.Thread(target=self.update_action, args=(addr,))
            self.threads[addr].start()
            print(f'{addr}客户端已连接')

    def update_action(self, addr):
        """Receive from one client until it disconnects; body of its thread.

        Each non-empty chunk is stored in ``datas[addr]`` and forwarded via
        ``send``.  On exit the client socket is closed and every per-client
        entry for ``addr`` is removed.
        """
        cli = self.clients[addr]
        noerror = True
        try:
            while self.run_flag:
                try:
                    chunk = cli.recv(1024)
                except OSError as e:
                    noerror = False
                    print(f'{addr}异常断连:{e}')
                    break
                if not chunk:
                    # Empty read: the peer closed the connection.
                    break
                self.datas[addr] = chunk
                self.send(addr)
            if noerror:
                print(f'{addr}正常断连')
        finally:
            # Always release the socket and bookkeeping, even if injecting
            # the data raised, so the slot does not count against cli_max.
            cli.close()
            self.clients.pop(addr, None)
            self.datas.pop(addr, None)
            self.points.pop(addr, None)
            self.threads.pop(addr, None)

    def send(self, addr):
        """Inject the latest data received from ``addr`` into its points.

        The bytes are decoded with ``self.encoding`` (undecodable bytes are
        replaced) unless ``self.encoding`` is falsy, in which case the raw
        bytes are injected.
        """
        raw = self.datas[addr]
        data = raw.decode(encoding=self.encoding, errors='replace') if self.encoding else raw
        for point in self.points[addr]:
            point.inject(data)

    def start(self):
        """Start the accept thread; does nothing if it is already running."""
        if self.main_thread and self.main_thread.is_alive():
            return None

        # Result is discarded; the call raises OSError on a closed socket, so
        # presumably this is a liveness check before spawning the thread.
        self.getsockname()
        self.main_thread = threading.Thread(target=self.accept_action)
        self.run_flag = True
        self.main_thread.start()

    def stop(self):
        """Stop accepting connections and wait for the accept thread to end.

        Already-connected clients are left open; their receiver threads exit
        after their next ``recv`` returns.  Use ``close`` to drop them.
        """
        if self.main_thread is None:
            return None

        self.run_flag = False
        try:
            if self.main_thread.is_alive():
                # accept() blocks, so wake the thread with a throw-away
                # connection to our own listening address.
                host, port = self.getsockname()[:2]
                if host == '0.0.0.0':
                    # Connecting to the wildcard address is not portable.
                    host = '127.0.0.1'
                with socket.socket() as waker:
                    waker.connect((host, port))
        finally:
            self.main_thread.join()
            self.main_thread = None

    def set_convertor(self, func):
        """Set the convertor given to points of clients accepted from now on."""
        self.convertor = func

    def set_excite_action(self, func):
        """Set the callback invoked when any client's point is excited."""
        self.excite_action = func

    def close(self):
        """Stop the server, disconnect all clients and close the socket."""
        self.stop()
        # Receiver threads are non-daemon and block in recv(); shutting the
        # connections down makes recv() return so they clean up and exit.
        for cli in list(self.clients.values()):
            try:
                cli.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # Already disconnected.
        return super().close()

    def send_to_all_clients(self, bytes_data: bytes):
        """Send ``bytes_data`` to every connected client (best effort).

        A failure on one client is printed and does not stop delivery to
        the others.
        """
        # Snapshot: receiver threads remove entries from self.clients
        # concurrently, which would break iteration over the live dict.
        for addr, cli in list(self.clients.items()):
            try:
                # sendall: plain send() may transmit only part of the buffer.
                cli.sendall(bytes_data)
            except Exception as e:
                print(f'{addr}发送数据时产生异常:{e}')
|