17db96d56Sopenharmony_ci#
27db96d56Sopenharmony_ci# Module which deals with pickling of objects.
37db96d56Sopenharmony_ci#
47db96d56Sopenharmony_ci# multiprocessing/reduction.py
57db96d56Sopenharmony_ci#
67db96d56Sopenharmony_ci# Copyright (c) 2006-2008, R Oudkerk
77db96d56Sopenharmony_ci# Licensed to PSF under a Contributor Agreement.
87db96d56Sopenharmony_ci#
97db96d56Sopenharmony_ci
107db96d56Sopenharmony_cifrom abc import ABCMeta
117db96d56Sopenharmony_ciimport copyreg
127db96d56Sopenharmony_ciimport functools
137db96d56Sopenharmony_ciimport io
147db96d56Sopenharmony_ciimport os
157db96d56Sopenharmony_ciimport pickle
167db96d56Sopenharmony_ciimport socket
177db96d56Sopenharmony_ciimport sys
187db96d56Sopenharmony_ci
197db96d56Sopenharmony_cifrom . import context
207db96d56Sopenharmony_ci
217db96d56Sopenharmony_ci__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_ci
# True when this platform can pass handles/fds between processes: always
# on Windows, otherwise only if the socket module exposes CMSG_LEN,
# SCM_RIGHTS and socket.sendmsg.
HAVE_SEND_HANDLE = sys.platform == 'win32' or all((
    hasattr(socket, 'CMSG_LEN'),
    hasattr(socket, 'SCM_RIGHTS'),
    hasattr(socket.socket, 'sendmsg'),
))
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_ci#
307db96d56Sopenharmony_ci# Pickler subclass
317db96d56Sopenharmony_ci#
327db96d56Sopenharmony_ci
class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Every instance gets its own dispatch table: a copy of the copyreg
    dispatch table overlaid with the reducers added through register().
    '''
    # Reducers added via register(); shared by all instances of the class.
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args, **kwargs):
        '''Create the pickler and install its per-instance dispatch table.

        Positional and keyword arguments are forwarded unchanged to
        pickle.Pickler, so keyword-only options (e.g. fix_imports) can be
        supplied as well as the positional file and protocol.
        '''
        super().__init__(*args, **kwargs)
        # Copy first so that instance-level changes never leak into the
        # global copyreg table.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj with this class and return the data as a memoryview.'''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    loads = pickle.loads
557db96d56Sopenharmony_ci
# Module-level shortcut for adding reducers to ForkingPickler.
register = ForkingPickler.register

def dump(obj, file, protocol=None):
    '''Pickle obj to file using ForkingPickler (pickle.dump() stand-in).'''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
617db96d56Sopenharmony_ci
627db96d56Sopenharmony_ci#
637db96d56Sopenharmony_ci# Platform specific definitions
647db96d56Sopenharmony_ci#
657db96d56Sopenharmony_ci
667db96d56Sopenharmony_ciif sys.platform == 'win32':
677db96d56Sopenharmony_ci    # Windows
687db96d56Sopenharmony_ci    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
697db96d56Sopenharmony_ci    import _winapi
707db96d56Sopenharmony_ci
717db96d56Sopenharmony_ci    def duplicate(handle, target_process=None, inheritable=False,
727db96d56Sopenharmony_ci                  *, source_process=None):
737db96d56Sopenharmony_ci        '''Duplicate a handle.  (target_process is a handle not a pid!)'''
747db96d56Sopenharmony_ci        current_process = _winapi.GetCurrentProcess()
757db96d56Sopenharmony_ci        if source_process is None:
767db96d56Sopenharmony_ci            source_process = current_process
777db96d56Sopenharmony_ci        if target_process is None:
787db96d56Sopenharmony_ci            target_process = current_process
797db96d56Sopenharmony_ci        return _winapi.DuplicateHandle(
807db96d56Sopenharmony_ci            source_process, handle, target_process,
817db96d56Sopenharmony_ci            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
827db96d56Sopenharmony_ci
837db96d56Sopenharmony_ci    def steal_handle(source_pid, handle):
847db96d56Sopenharmony_ci        '''Steal a handle from process identified by source_pid.'''
857db96d56Sopenharmony_ci        source_process_handle = _winapi.OpenProcess(
867db96d56Sopenharmony_ci            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
877db96d56Sopenharmony_ci        try:
887db96d56Sopenharmony_ci            return _winapi.DuplicateHandle(
897db96d56Sopenharmony_ci                source_process_handle, handle,
907db96d56Sopenharmony_ci                _winapi.GetCurrentProcess(), 0, False,
917db96d56Sopenharmony_ci                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
927db96d56Sopenharmony_ci        finally:
937db96d56Sopenharmony_ci            _winapi.CloseHandle(source_process_handle)
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_ci    def send_handle(conn, handle, destination_pid):
967db96d56Sopenharmony_ci        '''Send a handle over a local connection.'''
977db96d56Sopenharmony_ci        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
987db96d56Sopenharmony_ci        conn.send(dh)
997db96d56Sopenharmony_ci
1007db96d56Sopenharmony_ci    def recv_handle(conn):
1017db96d56Sopenharmony_ci        '''Receive a handle over a local connection.'''
1027db96d56Sopenharmony_ci        return conn.recv().detach()
1037db96d56Sopenharmony_ci
1047db96d56Sopenharmony_ci    class DupHandle(object):
1057db96d56Sopenharmony_ci        '''Picklable wrapper for a handle.'''
1067db96d56Sopenharmony_ci        def __init__(self, handle, access, pid=None):
1077db96d56Sopenharmony_ci            if pid is None:
1087db96d56Sopenharmony_ci                # We just duplicate the handle in the current process and
1097db96d56Sopenharmony_ci                # let the receiving process steal the handle.
1107db96d56Sopenharmony_ci                pid = os.getpid()
1117db96d56Sopenharmony_ci            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
1127db96d56Sopenharmony_ci            try:
1137db96d56Sopenharmony_ci                self._handle = _winapi.DuplicateHandle(
1147db96d56Sopenharmony_ci                    _winapi.GetCurrentProcess(),
1157db96d56Sopenharmony_ci                    handle, proc, access, False, 0)
1167db96d56Sopenharmony_ci            finally:
1177db96d56Sopenharmony_ci                _winapi.CloseHandle(proc)
1187db96d56Sopenharmony_ci            self._access = access
1197db96d56Sopenharmony_ci            self._pid = pid
1207db96d56Sopenharmony_ci
1217db96d56Sopenharmony_ci        def detach(self):
1227db96d56Sopenharmony_ci            '''Get the handle.  This should only be called once.'''
1237db96d56Sopenharmony_ci            # retrieve handle from process which currently owns it
1247db96d56Sopenharmony_ci            if self._pid == os.getpid():
1257db96d56Sopenharmony_ci                # The handle has already been duplicated for this process.
1267db96d56Sopenharmony_ci                return self._handle
1277db96d56Sopenharmony_ci            # We must steal the handle from the process whose pid is self._pid.
1287db96d56Sopenharmony_ci            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
1297db96d56Sopenharmony_ci                                       self._pid)
1307db96d56Sopenharmony_ci            try:
1317db96d56Sopenharmony_ci                return _winapi.DuplicateHandle(
1327db96d56Sopenharmony_ci                    proc, self._handle, _winapi.GetCurrentProcess(),
1337db96d56Sopenharmony_ci                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
1347db96d56Sopenharmony_ci            finally:
1357db96d56Sopenharmony_ci                _winapi.CloseHandle(proc)
1367db96d56Sopenharmony_ci
1377db96d56Sopenharmony_cielse:
1387db96d56Sopenharmony_ci    # Unix
1397db96d56Sopenharmony_ci    __all__ += ['DupFd', 'sendfds', 'recvfds']
1407db96d56Sopenharmony_ci    import array
1417db96d56Sopenharmony_ci
1427db96d56Sopenharmony_ci    # On MacOSX we should acknowledge receipt of fds -- see Issue14669
1437db96d56Sopenharmony_ci    ACKNOWLEDGE = sys.platform == 'darwin'
1447db96d56Sopenharmony_ci
1457db96d56Sopenharmony_ci    def sendfds(sock, fds):
1467db96d56Sopenharmony_ci        '''Send an array of fds over an AF_UNIX socket.'''
1477db96d56Sopenharmony_ci        fds = array.array('i', fds)
1487db96d56Sopenharmony_ci        msg = bytes([len(fds) % 256])
1497db96d56Sopenharmony_ci        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
1507db96d56Sopenharmony_ci        if ACKNOWLEDGE and sock.recv(1) != b'A':
1517db96d56Sopenharmony_ci            raise RuntimeError('did not receive acknowledgement of fd')
1527db96d56Sopenharmony_ci
1537db96d56Sopenharmony_ci    def recvfds(sock, size):
1547db96d56Sopenharmony_ci        '''Receive an array of fds over an AF_UNIX socket.'''
1557db96d56Sopenharmony_ci        a = array.array('i')
1567db96d56Sopenharmony_ci        bytes_size = a.itemsize * size
1577db96d56Sopenharmony_ci        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
1587db96d56Sopenharmony_ci        if not msg and not ancdata:
1597db96d56Sopenharmony_ci            raise EOFError
1607db96d56Sopenharmony_ci        try:
1617db96d56Sopenharmony_ci            if ACKNOWLEDGE:
1627db96d56Sopenharmony_ci                sock.send(b'A')
1637db96d56Sopenharmony_ci            if len(ancdata) != 1:
1647db96d56Sopenharmony_ci                raise RuntimeError('received %d items of ancdata' %
1657db96d56Sopenharmony_ci                                   len(ancdata))
1667db96d56Sopenharmony_ci            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
1677db96d56Sopenharmony_ci            if (cmsg_level == socket.SOL_SOCKET and
1687db96d56Sopenharmony_ci                cmsg_type == socket.SCM_RIGHTS):
1697db96d56Sopenharmony_ci                if len(cmsg_data) % a.itemsize != 0:
1707db96d56Sopenharmony_ci                    raise ValueError
1717db96d56Sopenharmony_ci                a.frombytes(cmsg_data)
1727db96d56Sopenharmony_ci                if len(a) % 256 != msg[0]:
1737db96d56Sopenharmony_ci                    raise AssertionError(
1747db96d56Sopenharmony_ci                        "Len is {0:n} but msg[0] is {1!r}".format(
1757db96d56Sopenharmony_ci                            len(a), msg[0]))
1767db96d56Sopenharmony_ci                return list(a)
1777db96d56Sopenharmony_ci        except (ValueError, IndexError):
1787db96d56Sopenharmony_ci            pass
1797db96d56Sopenharmony_ci        raise RuntimeError('Invalid data received')
1807db96d56Sopenharmony_ci
1817db96d56Sopenharmony_ci    def send_handle(conn, handle, destination_pid):
1827db96d56Sopenharmony_ci        '''Send a handle over a local connection.'''
1837db96d56Sopenharmony_ci        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
1847db96d56Sopenharmony_ci            sendfds(s, [handle])
1857db96d56Sopenharmony_ci
1867db96d56Sopenharmony_ci    def recv_handle(conn):
1877db96d56Sopenharmony_ci        '''Receive a handle over a local connection.'''
1887db96d56Sopenharmony_ci        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
1897db96d56Sopenharmony_ci            return recvfds(s, 1)[0]
1907db96d56Sopenharmony_ci
1917db96d56Sopenharmony_ci    def DupFd(fd):
1927db96d56Sopenharmony_ci        '''Return a wrapper for an fd.'''
1937db96d56Sopenharmony_ci        popen_obj = context.get_spawning_popen()
1947db96d56Sopenharmony_ci        if popen_obj is not None:
1957db96d56Sopenharmony_ci            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
1967db96d56Sopenharmony_ci        elif HAVE_SEND_HANDLE:
1977db96d56Sopenharmony_ci            from . import resource_sharer
1987db96d56Sopenharmony_ci            return resource_sharer.DupFd(fd)
1997db96d56Sopenharmony_ci        else:
2007db96d56Sopenharmony_ci            raise ValueError('SCM_RIGHTS appears not to be available')
2017db96d56Sopenharmony_ci
2027db96d56Sopenharmony_ci#
2037db96d56Sopenharmony_ci# Try making some callable types picklable
2047db96d56Sopenharmony_ci#
2057db96d56Sopenharmony_ci
2067db96d56Sopenharmony_cidef _reduce_method(m):
2077db96d56Sopenharmony_ci    if m.__self__ is None:
2087db96d56Sopenharmony_ci        return getattr, (m.__class__, m.__func__.__name__)
2097db96d56Sopenharmony_ci    else:
2107db96d56Sopenharmony_ci        return getattr, (m.__self__, m.__func__.__name__)
class _C:
    # Throwaway class whose only job is to supply a bound method, so that
    # the bound-method type can be obtained with type(_C().f).
    def f(self):
        pass
# Pickle bound methods through _reduce_method.
register(type(_C().f), _reduce_method)
2157db96d56Sopenharmony_ci
2167db96d56Sopenharmony_ci
2177db96d56Sopenharmony_cidef _reduce_method_descriptor(m):
2187db96d56Sopenharmony_ci    return getattr, (m.__objclass__, m.__name__)
# Use the same reducer for the types of list.append and int.__add__.
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
2217db96d56Sopenharmony_ci
2227db96d56Sopenharmony_ci
2237db96d56Sopenharmony_cidef _reduce_partial(p):
2247db96d56Sopenharmony_ci    return _rebuild_partial, (p.func, p.args, p.keywords or {})
2257db96d56Sopenharmony_cidef _rebuild_partial(func, args, keywords):
2267db96d56Sopenharmony_ci    return functools.partial(func, *args, **keywords)
# Pickle functools.partial objects through _reduce_partial.
register(functools.partial, _reduce_partial)
2287db96d56Sopenharmony_ci
2297db96d56Sopenharmony_ci#
2307db96d56Sopenharmony_ci# Make sockets picklable
2317db96d56Sopenharmony_ci#
2327db96d56Sopenharmony_ci
if sys.platform != 'win32':
    def _reduce_socket(s):
        '''Reduce a socket to a DupFd wrapper plus family, type and proto.'''
        fd_wrapper = DupFd(s.fileno())
        return _rebuild_socket, (fd_wrapper, s.family, s.type, s.proto)
    def _rebuild_socket(df, family, type, proto):
        '''Rebuild a socket around the fd detached from df.'''
        received_fd = df.detach()
        return socket.socket(family, type, proto, fileno=received_fd)
    register(socket.socket, _reduce_socket)

else:
    def _reduce_socket(s):
        '''Reduce a socket to a resource_sharer.DupSocket wrapper.'''
        from .resource_sharer import DupSocket
        wrapped = DupSocket(s)
        return _rebuild_socket, (wrapped,)
    def _rebuild_socket(ds):
        '''Return the result of detaching the wrapper ds.'''
        return ds.detach()
    register(socket.socket, _reduce_socket)
2497db96d56Sopenharmony_ci
2507db96d56Sopenharmony_ci
class AbstractReducer(metaclass=ABCMeta):
    '''Base class for implementing a custom Reduction class.

    It re-exports this module's reduction helpers as class attributes so
    that a subclass can stand in for the standard reduction mechanism
    used in multiprocessing.
    '''
    # Helpers available on every platform.
    ForkingPickler = ForkingPickler
    register = register
    dump = dump
    send_handle = send_handle
    recv_handle = recv_handle

    # Platform-specific helpers.
    if sys.platform != 'win32':
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd
    else:
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle

    # Reducer/rebuilder functions.
    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        '''Register the module's default reducers again; args are ignored.'''
        defaults = (
            (type(_C().f), _reduce_method),
            (type(list.append), _reduce_method_descriptor),
            (type(int.__add__), _reduce_method_descriptor),
            (functools.partial, _reduce_partial),
            (socket.socket, _reduce_socket),
        )
        for kind, reducer in defaults:
            register(kind, reducer)
282