###############################################################################
# Server process to keep track of unlinked resources (like shared memory
# segments, semaphores etc.) and clean them.
#
# On Unix we run a server process which keeps track of unlinked
# resources. The server ignores SIGINT and SIGTERM and reads from a
# pipe.  Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
# Then the server process unlinks any remaining resource names.
#
# This is important because there may be system limits for such resources: for
# instance, the system only supports a limited number of named semaphores, and
# shared-memory segments live in RAM. If a Python process leaks such a
# resource, this resource will not be removed until the next reboot.  Without
# this resource tracker process, "killall python" would probably leave unlinked
# resources.
177db96d56Sopenharmony_ci
187db96d56Sopenharmony_ciimport os
197db96d56Sopenharmony_ciimport signal
207db96d56Sopenharmony_ciimport sys
217db96d56Sopenharmony_ciimport threading
227db96d56Sopenharmony_ciimport warnings
237db96d56Sopenharmony_ci
247db96d56Sopenharmony_cifrom . import spawn
257db96d56Sopenharmony_cifrom . import util
267db96d56Sopenharmony_ci
# Names exported by ``from ... import *``.  The module-level ``getfd`` alias
# and ``main`` are defined in this file but are not listed here.
__all__ = ['ensure_running', 'register', 'unregister']
287db96d56Sopenharmony_ci
# Whether this platform exposes signal.pthread_sigmask().
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
# Signals the tracker process ignores (see main()).
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Resource type name -> callable used to unlink a leaked resource of that
# type.  'noop' is the type carried by the liveness PROBE messages.
_CLEANUP_FUNCS = {
    'noop': lambda: None,
}

if os.name == 'posix':
    import _multiprocessing
    import _posixshmem

    # Named semaphores are cleaned up with sem_unlink().
    #
    # The attribute may be absent when the Python build detected that POSIX
    # named semaphores are unavailable.  No named semaphore can have been
    # opened in that case, so there is nothing to clean up either.
    if hasattr(_multiprocessing, 'sem_unlink'):
        _CLEANUP_FUNCS['semaphore'] = _multiprocessing.sem_unlink
    _CLEANUP_FUNCS['shared_memory'] = _posixshmem.shm_unlink
527db96d56Sopenharmony_ci
537db96d56Sopenharmony_ci
class ResourceTracker(object):
    '''Client-side handle on the resource tracker process.

    Holds the write end of the pipe to the tracker (``_fd``) and, when this
    process spawned the tracker itself, the tracker's pid (``_pid``).  All
    state changes happen under ``_lock``.
    '''

    def __init__(self):
        self._lock = threading.Lock()
        # Write end of the pipe to the tracker; None while no tracker is known.
        self._fd = None
        # Pid of the tracker if this process spawned it; None otherwise.
        self._pid = None

    def _stop(self):
        '''Close our end of the pipe and reap the tracker if it is our child.

        Does nothing when no tracker is known (``_fd`` is None).
        '''
        with self._lock:
            if self._fd is None:
                # not running
                return

            # Closing the "alive" file descriptor stops main() once every
            # other holder of the write end has closed it too.
            os.close(self._fd)
            self._fd = None

            # _pid can be None if this process is a child of another python
            # process which started the resource_tracker; os.waitpid(None, 0)
            # would raise TypeError, so only wait for a tracker we spawned.
            if self._pid is not None:
                os.waitpid(self._pid, 0)
                self._pid = None

    def getfd(self):
        '''Return the pipe's write fd, starting the tracker first if needed.'''
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        '''Make sure that resource tracker process is running.

        This can be run from any process.  Usually a child process will use
        the resource created by its parent.

        If a previously known tracker no longer accepts writes, it is reaped
        (when it is our child), a warning is issued and a new tracker is
        spawned.
        '''
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)

                # Clean-up to avoid dangling processes.
                try:
                    # _pid can be None if this process is a child from another
                    # python process, which has started the resource_tracker.
                    if self._pid is not None:
                        os.waitpid(self._pid, 0)
                except ChildProcessError:
                    # The resource_tracker has already been terminated.
                    pass
                self._fd = None
                self._pid = None

                warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching.  Some resources might leak.')

            fds_to_pass = []
            try:
                # Let the tracker report on our stderr when it has a real fd.
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass
            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
            r, w = os.pipe()
            try:
                fds_to_pass.append(r)
                # process will outlive us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe] + util._args_from_interpreter_flags()
                args += ['-c', cmd % r]
                # bpo-33613: Register a signal mask that will block the signals.
                # This signal mask will be inherited by the child that is going
                # to be spawned and will protect the child from a race condition
                # that can make the child die before it registers signal handlers
                # for SIGINT and SIGTERM. The mask is unregistered after spawning
                # the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
            except:
                # Spawning failed: do not leak the write end, then propagate.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end belongs to the tracker; we never read from it.
                os.close(r)

    def _check_alive(self):
        '''Check that the pipe has not been closed by sending a probe.

        Returns True if the probe could be written, False on OSError.
        '''
        try:
            # We cannot use send here as it calls ensure_running, creating
            # a cycle.
            os.write(self._fd, b'PROBE:0:noop\n')
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        '''Register name of resource with resource tracker.'''
        self._send('REGISTER', name, rtype)

    def unregister(self, name, rtype):
        '''Unregister name of resource with resource tracker.'''
        self._send('UNREGISTER', name, rtype)

    def _send(self, cmd, name, rtype):
        '''Write one ``cmd:name:rtype`` line to the tracker.

        Starts the tracker first if needed.  Raises ValueError if the
        ASCII-encoded message is longer than 512 bytes; encoding a non-ASCII
        field raises UnicodeEncodeError.
        '''
        self.ensure_running()
        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
        if len(msg) > 512:
            # posix guarantees that writes to a pipe of less than PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512
            raise ValueError('msg too long')
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
            nbytes, len(msg))
1717db96d56Sopenharmony_ci
1727db96d56Sopenharmony_ci
# Single tracker handle for this process; the module-level functions below
# are simply its bound methods.
_resource_tracker = ResourceTracker()
getfd = _resource_tracker.getfd
ensure_running = _resource_tracker.ensure_running
unregister = _resource_tracker.unregister
register = _resource_tracker.register
1787db96d56Sopenharmony_ci
def main(fd):
    '''Run resource tracker.

    *fd* is the read end of the tracking pipe.  Each line read from it has
    the form ``cmd:name:rtype`` where *cmd* is REGISTER, UNREGISTER or PROBE
    and *rtype* is a key of _CLEANUP_FUNCS.  The first field is the command
    and the last one the resource type, so *name* may itself contain ':'.
    Lines that cannot be processed are reported through sys.excepthook and
    skipped.

    When the pipe reaches EOF, every name still registered is passed to the
    cleanup function of its resource type.
    '''
    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    if _HAVE_SIGMASK:
        # The signals are ignored from here on, so they no longer need to
        # be blocked by the signal mask.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    # One set of registered names per known resource type.
    cache = {rtype: set() for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        with open(fd, 'rb') as f:
            for line in f:
                try:
                    message = line.strip().decode('ascii')
                    # Split off the command from the left and the resource
                    # type from the right; neither contains ':', so a name
                    # containing ':' is preserved intact.  A line with fewer
                    # than three fields still raises ValueError here.
                    cmd, rest = message.split(':', 1)
                    name, rtype = rest.rsplit(':', 1)
                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f'Cannot register {name} for automatic cleanup: '
                            f'unknown resource type {rtype}')

                    if cmd == 'REGISTER':
                        cache[rtype].add(name)
                    elif cmd == 'UNREGISTER':
                        cache[rtype].remove(name)
                    elif cmd == 'PROBE':
                        pass
                    else:
                        raise RuntimeError('unrecognized command %r' % cmd)
                except Exception:
                    # Report the bad message but keep serving the pipe; a
                    # failing excepthook must not take the tracker down.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_cache in cache.items():
            if rtype_cache:
                try:
                    warnings.warn('resource_tracker: There appear to be %d '
                                  'leaked %s objects to clean up at shutdown' %
                                  (len(rtype_cache), rtype))
                except Exception:
                    pass
            cleanup = _CLEANUP_FUNCS[rtype]
            for name in rtype_cache:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died.  We therefore unlink it.
                try:
                    cleanup(name)
                except Exception as e:
                    warnings.warn('resource_tracker: %r: %s' % (name, e))
240