17db96d56Sopenharmony_ci# 27db96d56Sopenharmony_ci# We use a background thread for sharing fds on Unix, and for sharing sockets on 37db96d56Sopenharmony_ci# Windows. 47db96d56Sopenharmony_ci# 57db96d56Sopenharmony_ci# A client which wants to pickle a resource registers it with the resource 67db96d56Sopenharmony_ci# sharer and gets an identifier in return. The unpickling process will connect 77db96d56Sopenharmony_ci# to the resource sharer, sends the identifier and its pid, and then receives 87db96d56Sopenharmony_ci# the resource. 97db96d56Sopenharmony_ci# 107db96d56Sopenharmony_ci 117db96d56Sopenharmony_ciimport os 127db96d56Sopenharmony_ciimport signal 137db96d56Sopenharmony_ciimport socket 147db96d56Sopenharmony_ciimport sys 157db96d56Sopenharmony_ciimport threading 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_cifrom . import process 187db96d56Sopenharmony_cifrom .context import reduction 197db96d56Sopenharmony_cifrom . import util 207db96d56Sopenharmony_ci 217db96d56Sopenharmony_ci__all__ = ['stop'] 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ci 247db96d56Sopenharmony_ciif sys.platform == 'win32': 257db96d56Sopenharmony_ci __all__ += ['DupSocket'] 267db96d56Sopenharmony_ci 277db96d56Sopenharmony_ci class DupSocket(object): 287db96d56Sopenharmony_ci '''Picklable wrapper for a socket.''' 297db96d56Sopenharmony_ci def __init__(self, sock): 307db96d56Sopenharmony_ci new_sock = sock.dup() 317db96d56Sopenharmony_ci def send(conn, pid): 327db96d56Sopenharmony_ci share = new_sock.share(pid) 337db96d56Sopenharmony_ci conn.send_bytes(share) 347db96d56Sopenharmony_ci self._id = _resource_sharer.register(send, new_sock.close) 357db96d56Sopenharmony_ci 367db96d56Sopenharmony_ci def detach(self): 377db96d56Sopenharmony_ci '''Get the socket. This should only be called once.''' 387db96d56Sopenharmony_ci with _resource_sharer.get_connection(self._id) as conn: 397db96d56Sopenharmony_ci share = conn.recv_bytes() 407db96d56Sopenharmony_ci return socket.fromshare(share) 417db96d56Sopenharmony_ci 427db96d56Sopenharmony_cielse: 437db96d56Sopenharmony_ci __all__ += ['DupFd'] 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ci class DupFd(object): 467db96d56Sopenharmony_ci '''Wrapper for fd which can be used at any time.''' 477db96d56Sopenharmony_ci def __init__(self, fd): 487db96d56Sopenharmony_ci new_fd = os.dup(fd) 497db96d56Sopenharmony_ci def send(conn, pid): 507db96d56Sopenharmony_ci reduction.send_handle(conn, new_fd, pid) 517db96d56Sopenharmony_ci def close(): 527db96d56Sopenharmony_ci os.close(new_fd) 537db96d56Sopenharmony_ci self._id = _resource_sharer.register(send, close) 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ci def detach(self): 567db96d56Sopenharmony_ci '''Get the fd. This should only be called once.''' 577db96d56Sopenharmony_ci with _resource_sharer.get_connection(self._id) as conn: 587db96d56Sopenharmony_ci return reduction.recv_handle(conn) 597db96d56Sopenharmony_ci 607db96d56Sopenharmony_ci 617db96d56Sopenharmony_ciclass _ResourceSharer(object): 627db96d56Sopenharmony_ci '''Manager for resources using background thread.''' 637db96d56Sopenharmony_ci def __init__(self): 647db96d56Sopenharmony_ci self._key = 0 657db96d56Sopenharmony_ci self._cache = {} 667db96d56Sopenharmony_ci self._lock = threading.Lock() 677db96d56Sopenharmony_ci self._listener = None 687db96d56Sopenharmony_ci self._address = None 697db96d56Sopenharmony_ci self._thread = None 707db96d56Sopenharmony_ci util.register_after_fork(self, _ResourceSharer._afterfork) 717db96d56Sopenharmony_ci 727db96d56Sopenharmony_ci def register(self, send, close): 737db96d56Sopenharmony_ci '''Register resource, returning an identifier.''' 747db96d56Sopenharmony_ci with self._lock: 757db96d56Sopenharmony_ci if self._address is None: 767db96d56Sopenharmony_ci self._start() 777db96d56Sopenharmony_ci self._key += 1 787db96d56Sopenharmony_ci self._cache[self._key] = (send, close) 797db96d56Sopenharmony_ci return (self._address, self._key) 807db96d56Sopenharmony_ci 817db96d56Sopenharmony_ci @staticmethod 827db96d56Sopenharmony_ci def get_connection(ident): 837db96d56Sopenharmony_ci '''Return connection from which to receive identified resource.''' 847db96d56Sopenharmony_ci from .connection import Client 857db96d56Sopenharmony_ci address, key = ident 867db96d56Sopenharmony_ci c = Client(address, authkey=process.current_process().authkey) 877db96d56Sopenharmony_ci c.send((key, os.getpid())) 887db96d56Sopenharmony_ci return c 897db96d56Sopenharmony_ci 907db96d56Sopenharmony_ci def stop(self, timeout=None): 917db96d56Sopenharmony_ci '''Stop the background thread and clear registered resources.''' 927db96d56Sopenharmony_ci from .connection import Client 937db96d56Sopenharmony_ci with self._lock: 947db96d56Sopenharmony_ci if self._address is not None: 957db96d56Sopenharmony_ci c = Client(self._address, 967db96d56Sopenharmony_ci authkey=process.current_process().authkey) 977db96d56Sopenharmony_ci c.send(None) 987db96d56Sopenharmony_ci c.close() 997db96d56Sopenharmony_ci self._thread.join(timeout) 1007db96d56Sopenharmony_ci if self._thread.is_alive(): 1017db96d56Sopenharmony_ci util.sub_warning('_ResourceSharer thread did ' 1027db96d56Sopenharmony_ci 'not stop when asked') 1037db96d56Sopenharmony_ci self._listener.close() 1047db96d56Sopenharmony_ci self._thread = None 1057db96d56Sopenharmony_ci self._address = None 1067db96d56Sopenharmony_ci self._listener = None 1077db96d56Sopenharmony_ci for key, (send, close) in self._cache.items(): 1087db96d56Sopenharmony_ci close() 1097db96d56Sopenharmony_ci self._cache.clear() 1107db96d56Sopenharmony_ci 1117db96d56Sopenharmony_ci def _afterfork(self): 1127db96d56Sopenharmony_ci for key, (send, close) in self._cache.items(): 1137db96d56Sopenharmony_ci close() 1147db96d56Sopenharmony_ci self._cache.clear() 1157db96d56Sopenharmony_ci self._lock._at_fork_reinit() 1167db96d56Sopenharmony_ci if self._listener is not None: 1177db96d56Sopenharmony_ci self._listener.close() 1187db96d56Sopenharmony_ci self._listener = None 1197db96d56Sopenharmony_ci self._address = None 1207db96d56Sopenharmony_ci self._thread = None 1217db96d56Sopenharmony_ci 1227db96d56Sopenharmony_ci def _start(self): 1237db96d56Sopenharmony_ci from .connection import Listener 1247db96d56Sopenharmony_ci assert self._listener is None, "Already have Listener" 1257db96d56Sopenharmony_ci util.debug('starting listener and thread for sending handles') 1267db96d56Sopenharmony_ci self._listener = Listener(authkey=process.current_process().authkey) 1277db96d56Sopenharmony_ci self._address = self._listener.address 1287db96d56Sopenharmony_ci t = threading.Thread(target=self._serve) 1297db96d56Sopenharmony_ci t.daemon = True 1307db96d56Sopenharmony_ci t.start() 1317db96d56Sopenharmony_ci self._thread = t 1327db96d56Sopenharmony_ci 1337db96d56Sopenharmony_ci def _serve(self): 1347db96d56Sopenharmony_ci if hasattr(signal, 'pthread_sigmask'): 1357db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) 1367db96d56Sopenharmony_ci while 1: 1377db96d56Sopenharmony_ci try: 1387db96d56Sopenharmony_ci with self._listener.accept() as conn: 1397db96d56Sopenharmony_ci msg = conn.recv() 1407db96d56Sopenharmony_ci if msg is None: 1417db96d56Sopenharmony_ci break 1427db96d56Sopenharmony_ci key, destination_pid = msg 1437db96d56Sopenharmony_ci send, close = self._cache.pop(key) 1447db96d56Sopenharmony_ci try: 1457db96d56Sopenharmony_ci send(conn, destination_pid) 1467db96d56Sopenharmony_ci finally: 1477db96d56Sopenharmony_ci close() 1487db96d56Sopenharmony_ci except: 1497db96d56Sopenharmony_ci if not util.is_exiting(): 1507db96d56Sopenharmony_ci sys.excepthook(*sys.exc_info()) 1517db96d56Sopenharmony_ci 1527db96d56Sopenharmony_ci 1537db96d56Sopenharmony_ci_resource_sharer = _ResourceSharer() 1547db96d56Sopenharmony_cistop = _resource_sharer.stop 155