17db96d56Sopenharmony_ci#
27db96d56Sopenharmony_ci# We use a background thread for sharing fds on Unix, and for sharing sockets on
37db96d56Sopenharmony_ci# Windows.
47db96d56Sopenharmony_ci#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process connects
# to the resource sharer, sends the identifier and its pid, and then receives
# the resource.
97db96d56Sopenharmony_ci#
107db96d56Sopenharmony_ci
117db96d56Sopenharmony_ciimport os
127db96d56Sopenharmony_ciimport signal
137db96d56Sopenharmony_ciimport socket
147db96d56Sopenharmony_ciimport sys
157db96d56Sopenharmony_ciimport threading
167db96d56Sopenharmony_ci
177db96d56Sopenharmony_cifrom . import process
187db96d56Sopenharmony_cifrom .context import reduction
197db96d56Sopenharmony_cifrom . import util
207db96d56Sopenharmony_ci
217db96d56Sopenharmony_ci__all__ = ['stop']
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_ci
247db96d56Sopenharmony_ciif sys.platform == 'win32':
257db96d56Sopenharmony_ci    __all__ += ['DupSocket']
267db96d56Sopenharmony_ci
277db96d56Sopenharmony_ci    class DupSocket(object):
287db96d56Sopenharmony_ci        '''Picklable wrapper for a socket.'''
297db96d56Sopenharmony_ci        def __init__(self, sock):
307db96d56Sopenharmony_ci            new_sock = sock.dup()
317db96d56Sopenharmony_ci            def send(conn, pid):
327db96d56Sopenharmony_ci                share = new_sock.share(pid)
337db96d56Sopenharmony_ci                conn.send_bytes(share)
347db96d56Sopenharmony_ci            self._id = _resource_sharer.register(send, new_sock.close)
357db96d56Sopenharmony_ci
367db96d56Sopenharmony_ci        def detach(self):
377db96d56Sopenharmony_ci            '''Get the socket.  This should only be called once.'''
387db96d56Sopenharmony_ci            with _resource_sharer.get_connection(self._id) as conn:
397db96d56Sopenharmony_ci                share = conn.recv_bytes()
407db96d56Sopenharmony_ci                return socket.fromshare(share)
417db96d56Sopenharmony_ci
427db96d56Sopenharmony_cielse:
437db96d56Sopenharmony_ci    __all__ += ['DupFd']
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci    class DupFd(object):
467db96d56Sopenharmony_ci        '''Wrapper for fd which can be used at any time.'''
477db96d56Sopenharmony_ci        def __init__(self, fd):
487db96d56Sopenharmony_ci            new_fd = os.dup(fd)
497db96d56Sopenharmony_ci            def send(conn, pid):
507db96d56Sopenharmony_ci                reduction.send_handle(conn, new_fd, pid)
517db96d56Sopenharmony_ci            def close():
527db96d56Sopenharmony_ci                os.close(new_fd)
537db96d56Sopenharmony_ci            self._id = _resource_sharer.register(send, close)
547db96d56Sopenharmony_ci
557db96d56Sopenharmony_ci        def detach(self):
567db96d56Sopenharmony_ci            '''Get the fd.  This should only be called once.'''
577db96d56Sopenharmony_ci            with _resource_sharer.get_connection(self._id) as conn:
587db96d56Sopenharmony_ci                return reduction.recv_handle(conn)
597db96d56Sopenharmony_ci
607db96d56Sopenharmony_ci
class _ResourceSharer(object):
    '''Manager for resources using background thread.

    A resource is registered as a pair of callables ``(send, close)`` and is
    stored under an integer key.  A daemon thread accepts connections on a
    listener, reads a ``(key, pid)`` request, calls ``send(conn, pid)`` for
    the matching resource and then ``close()``.  The entry is removed from
    the cache when it is served, so each resource can be fetched only once.
    '''
    def __init__(self):
        self._key = 0                   # last key handed out by register()
        self._cache = {}                # key -> (send, close)
        self._lock = threading.Lock()   # guards register() and stop()
        self._listener = None           # created lazily by _start()
        self._address = None            # address of the listener, if any
        self._thread = None             # thread running _serve(), if any
        # Register _afterfork for this instance with util's after-fork hook.
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        *send* is later called as ``send(conn, pid)`` and *close* as
        ``close()``.  The listener and serving thread are started on first
        use.  The identifier is the tuple ``(address, key)``.
        '''
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        *ident* is an ``(address, key)`` tuple as returned by register().
        The request ``(key, pid of this process)`` has already been sent on
        the returned connection; the caller is responsible for closing it.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        try:
            c.send((key, os.getpid()))
        except BaseException:
            # Nobody else holds a reference to the connection yet, so it
            # must be closed here or it would leak.
            c.close()
            raise
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        *timeout* is passed to the thread's join(); if the thread is still
        alive afterwards a warning is emitted and cleanup continues anyway.
        Does nothing when the sharer has not been started.
        '''
        from .connection import Client
        with self._lock:
            if self._address is None:
                return
            # A None message tells _serve() to leave its loop.  The context
            # manager closes the connection even if send() fails.
            with Client(self._address,
                        authkey=process.current_process().authkey) as c:
                c.send(None)
            self._thread.join(timeout)
            if self._thread.is_alive():
                util.sub_warning('_ResourceSharer thread did '
                                 'not stop when asked')
            self._listener.close()
            self._thread = None
            self._address = None
            self._listener = None
            self._close_cached()

    def _close_cached(self):
        '''Call close() for every registered resource and empty the cache.'''
        for key in list(self._cache):
            # Remove the entry before closing it so that a failing close()
            # cannot leave an already-closed resource registered.
            entry = self._cache.pop(key, None)
            if entry is not None:
                _send, close = entry
                close()

    def _afterfork(self):
        '''Reset the sharer: close resources and listener, reinit the lock.'''
        self._close_cached()
        self._lock._at_fork_reinit()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon thread running _serve().'''
        from .connection import Listener
        assert self._listener is None, "Already have Listener"
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Serve resource requests until a None message is received.'''
        # Block all valid signals in this thread where the platform allows.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    # The entry is removed here, so a resource is served at
                    # most once; an unknown key raises KeyError below.
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Catch everything so that one failed request does not end
                # the serving loop; report it unless the process is exiting.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())
1517db96d56Sopenharmony_ci
1527db96d56Sopenharmony_ci
# Module-level sharer instance used by the wrapper classes above; its
# listener and thread are only started by the first register() call.
_resource_sharer = _ResourceSharer()
# Module-level alias (exported through __all__) for stopping that instance.
stop = _resource_sharer.stop
155