17db96d56Sopenharmony_ci# 27db96d56Sopenharmony_ci# Module which deals with pickling of objects. 37db96d56Sopenharmony_ci# 47db96d56Sopenharmony_ci# multiprocessing/reduction.py 57db96d56Sopenharmony_ci# 67db96d56Sopenharmony_ci# Copyright (c) 2006-2008, R Oudkerk 77db96d56Sopenharmony_ci# Licensed to PSF under a Contributor Agreement. 87db96d56Sopenharmony_ci# 97db96d56Sopenharmony_ci 107db96d56Sopenharmony_cifrom abc import ABCMeta 117db96d56Sopenharmony_ciimport copyreg 127db96d56Sopenharmony_ciimport functools 137db96d56Sopenharmony_ciimport io 147db96d56Sopenharmony_ciimport os 157db96d56Sopenharmony_ciimport pickle 167db96d56Sopenharmony_ciimport socket 177db96d56Sopenharmony_ciimport sys 187db96d56Sopenharmony_ci 197db96d56Sopenharmony_cifrom . import context 207db96d56Sopenharmony_ci 217db96d56Sopenharmony_ci__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ci 247db96d56Sopenharmony_ciHAVE_SEND_HANDLE = (sys.platform == 'win32' or 257db96d56Sopenharmony_ci (hasattr(socket, 'CMSG_LEN') and 267db96d56Sopenharmony_ci hasattr(socket, 'SCM_RIGHTS') and 277db96d56Sopenharmony_ci hasattr(socket.socket, 'sendmsg'))) 287db96d56Sopenharmony_ci 297db96d56Sopenharmony_ci# 307db96d56Sopenharmony_ci# Pickler subclass 317db96d56Sopenharmony_ci# 327db96d56Sopenharmony_ci 337db96d56Sopenharmony_ciclass ForkingPickler(pickle.Pickler): 347db96d56Sopenharmony_ci '''Pickler subclass used by multiprocessing.''' 357db96d56Sopenharmony_ci _extra_reducers = {} 367db96d56Sopenharmony_ci _copyreg_dispatch_table = copyreg.dispatch_table 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_ci def __init__(self, *args): 397db96d56Sopenharmony_ci super().__init__(*args) 407db96d56Sopenharmony_ci self.dispatch_table = self._copyreg_dispatch_table.copy() 417db96d56Sopenharmony_ci self.dispatch_table.update(self._extra_reducers) 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ci @classmethod 447db96d56Sopenharmony_ci def register(cls, type, reduce): 457db96d56Sopenharmony_ci '''Register a reduce function for a type.''' 467db96d56Sopenharmony_ci cls._extra_reducers[type] = reduce 477db96d56Sopenharmony_ci 487db96d56Sopenharmony_ci @classmethod 497db96d56Sopenharmony_ci def dumps(cls, obj, protocol=None): 507db96d56Sopenharmony_ci buf = io.BytesIO() 517db96d56Sopenharmony_ci cls(buf, protocol).dump(obj) 527db96d56Sopenharmony_ci return buf.getbuffer() 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci loads = pickle.loads 557db96d56Sopenharmony_ci 567db96d56Sopenharmony_ciregister = ForkingPickler.register 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_cidef dump(obj, file, protocol=None): 597db96d56Sopenharmony_ci '''Replacement for pickle.dump() using ForkingPickler.''' 607db96d56Sopenharmony_ci ForkingPickler(file, protocol).dump(obj) 617db96d56Sopenharmony_ci 627db96d56Sopenharmony_ci# 637db96d56Sopenharmony_ci# Platform specific definitions 647db96d56Sopenharmony_ci# 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ciif sys.platform == 'win32': 677db96d56Sopenharmony_ci # Windows 687db96d56Sopenharmony_ci __all__ += ['DupHandle', 'duplicate', 'steal_handle'] 697db96d56Sopenharmony_ci import _winapi 707db96d56Sopenharmony_ci 717db96d56Sopenharmony_ci def duplicate(handle, target_process=None, inheritable=False, 727db96d56Sopenharmony_ci *, source_process=None): 737db96d56Sopenharmony_ci '''Duplicate a handle. (target_process is a handle not a pid!)''' 747db96d56Sopenharmony_ci current_process = _winapi.GetCurrentProcess() 757db96d56Sopenharmony_ci if source_process is None: 767db96d56Sopenharmony_ci source_process = current_process 777db96d56Sopenharmony_ci if target_process is None: 787db96d56Sopenharmony_ci target_process = current_process 797db96d56Sopenharmony_ci return _winapi.DuplicateHandle( 807db96d56Sopenharmony_ci source_process, handle, target_process, 817db96d56Sopenharmony_ci 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS) 827db96d56Sopenharmony_ci 837db96d56Sopenharmony_ci def steal_handle(source_pid, handle): 847db96d56Sopenharmony_ci '''Steal a handle from process identified by source_pid.''' 857db96d56Sopenharmony_ci source_process_handle = _winapi.OpenProcess( 867db96d56Sopenharmony_ci _winapi.PROCESS_DUP_HANDLE, False, source_pid) 877db96d56Sopenharmony_ci try: 887db96d56Sopenharmony_ci return _winapi.DuplicateHandle( 897db96d56Sopenharmony_ci source_process_handle, handle, 907db96d56Sopenharmony_ci _winapi.GetCurrentProcess(), 0, False, 917db96d56Sopenharmony_ci _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE) 927db96d56Sopenharmony_ci finally: 937db96d56Sopenharmony_ci _winapi.CloseHandle(source_process_handle) 947db96d56Sopenharmony_ci 957db96d56Sopenharmony_ci def send_handle(conn, handle, destination_pid): 967db96d56Sopenharmony_ci '''Send a handle over a local connection.''' 977db96d56Sopenharmony_ci dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid) 987db96d56Sopenharmony_ci conn.send(dh) 997db96d56Sopenharmony_ci 1007db96d56Sopenharmony_ci def recv_handle(conn): 1017db96d56Sopenharmony_ci '''Receive a handle over a local connection.''' 1027db96d56Sopenharmony_ci return conn.recv().detach() 1037db96d56Sopenharmony_ci 1047db96d56Sopenharmony_ci class DupHandle(object): 1057db96d56Sopenharmony_ci '''Picklable wrapper for a handle.''' 1067db96d56Sopenharmony_ci def __init__(self, handle, access, pid=None): 1077db96d56Sopenharmony_ci if pid is None: 1087db96d56Sopenharmony_ci # We just duplicate the handle in the current process and 1097db96d56Sopenharmony_ci # let the receiving process steal the handle. 1107db96d56Sopenharmony_ci pid = os.getpid() 1117db96d56Sopenharmony_ci proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid) 1127db96d56Sopenharmony_ci try: 1137db96d56Sopenharmony_ci self._handle = _winapi.DuplicateHandle( 1147db96d56Sopenharmony_ci _winapi.GetCurrentProcess(), 1157db96d56Sopenharmony_ci handle, proc, access, False, 0) 1167db96d56Sopenharmony_ci finally: 1177db96d56Sopenharmony_ci _winapi.CloseHandle(proc) 1187db96d56Sopenharmony_ci self._access = access 1197db96d56Sopenharmony_ci self._pid = pid 1207db96d56Sopenharmony_ci 1217db96d56Sopenharmony_ci def detach(self): 1227db96d56Sopenharmony_ci '''Get the handle. This should only be called once.''' 1237db96d56Sopenharmony_ci # retrieve handle from process which currently owns it 1247db96d56Sopenharmony_ci if self._pid == os.getpid(): 1257db96d56Sopenharmony_ci # The handle has already been duplicated for this process. 1267db96d56Sopenharmony_ci return self._handle 1277db96d56Sopenharmony_ci # We must steal the handle from the process whose pid is self._pid. 1287db96d56Sopenharmony_ci proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, 1297db96d56Sopenharmony_ci self._pid) 1307db96d56Sopenharmony_ci try: 1317db96d56Sopenharmony_ci return _winapi.DuplicateHandle( 1327db96d56Sopenharmony_ci proc, self._handle, _winapi.GetCurrentProcess(), 1337db96d56Sopenharmony_ci self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE) 1347db96d56Sopenharmony_ci finally: 1357db96d56Sopenharmony_ci _winapi.CloseHandle(proc) 1367db96d56Sopenharmony_ci 1377db96d56Sopenharmony_cielse: 1387db96d56Sopenharmony_ci # Unix 1397db96d56Sopenharmony_ci __all__ += ['DupFd', 'sendfds', 'recvfds'] 1407db96d56Sopenharmony_ci import array 1417db96d56Sopenharmony_ci 1427db96d56Sopenharmony_ci # On MacOSX we should acknowledge receipt of fds -- see Issue14669 1437db96d56Sopenharmony_ci ACKNOWLEDGE = sys.platform == 'darwin' 1447db96d56Sopenharmony_ci 1457db96d56Sopenharmony_ci def sendfds(sock, fds): 1467db96d56Sopenharmony_ci '''Send an array of fds over an AF_UNIX socket.''' 1477db96d56Sopenharmony_ci fds = array.array('i', fds) 1487db96d56Sopenharmony_ci msg = bytes([len(fds) % 256]) 1497db96d56Sopenharmony_ci sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) 1507db96d56Sopenharmony_ci if ACKNOWLEDGE and sock.recv(1) != b'A': 1517db96d56Sopenharmony_ci raise RuntimeError('did not receive acknowledgement of fd') 1527db96d56Sopenharmony_ci 1537db96d56Sopenharmony_ci def recvfds(sock, size): 1547db96d56Sopenharmony_ci '''Receive an array of fds over an AF_UNIX socket.''' 1557db96d56Sopenharmony_ci a = array.array('i') 1567db96d56Sopenharmony_ci bytes_size = a.itemsize * size 1577db96d56Sopenharmony_ci msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size)) 1587db96d56Sopenharmony_ci if not msg and not ancdata: 1597db96d56Sopenharmony_ci raise EOFError 1607db96d56Sopenharmony_ci try: 1617db96d56Sopenharmony_ci if ACKNOWLEDGE: 1627db96d56Sopenharmony_ci sock.send(b'A') 1637db96d56Sopenharmony_ci if len(ancdata) != 1: 1647db96d56Sopenharmony_ci raise RuntimeError('received %d items of ancdata' % 1657db96d56Sopenharmony_ci len(ancdata)) 1667db96d56Sopenharmony_ci cmsg_level, cmsg_type, cmsg_data = ancdata[0] 1677db96d56Sopenharmony_ci if (cmsg_level == socket.SOL_SOCKET and 1687db96d56Sopenharmony_ci cmsg_type == socket.SCM_RIGHTS): 1697db96d56Sopenharmony_ci if len(cmsg_data) % a.itemsize != 0: 1707db96d56Sopenharmony_ci raise ValueError 1717db96d56Sopenharmony_ci a.frombytes(cmsg_data) 1727db96d56Sopenharmony_ci if len(a) % 256 != msg[0]: 1737db96d56Sopenharmony_ci raise AssertionError( 1747db96d56Sopenharmony_ci "Len is {0:n} but msg[0] is {1!r}".format( 1757db96d56Sopenharmony_ci len(a), msg[0])) 1767db96d56Sopenharmony_ci return list(a) 1777db96d56Sopenharmony_ci except (ValueError, IndexError): 1787db96d56Sopenharmony_ci pass 1797db96d56Sopenharmony_ci raise RuntimeError('Invalid data received') 1807db96d56Sopenharmony_ci 1817db96d56Sopenharmony_ci def send_handle(conn, handle, destination_pid): 1827db96d56Sopenharmony_ci '''Send a handle over a local connection.''' 1837db96d56Sopenharmony_ci with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: 1847db96d56Sopenharmony_ci sendfds(s, [handle]) 1857db96d56Sopenharmony_ci 1867db96d56Sopenharmony_ci def recv_handle(conn): 1877db96d56Sopenharmony_ci '''Receive a handle over a local connection.''' 1887db96d56Sopenharmony_ci with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s: 1897db96d56Sopenharmony_ci return recvfds(s, 1)[0] 1907db96d56Sopenharmony_ci 1917db96d56Sopenharmony_ci def DupFd(fd): 1927db96d56Sopenharmony_ci '''Return a wrapper for an fd.''' 1937db96d56Sopenharmony_ci popen_obj = context.get_spawning_popen() 1947db96d56Sopenharmony_ci if popen_obj is not None: 1957db96d56Sopenharmony_ci return popen_obj.DupFd(popen_obj.duplicate_for_child(fd)) 1967db96d56Sopenharmony_ci elif HAVE_SEND_HANDLE: 1977db96d56Sopenharmony_ci from . import resource_sharer 1987db96d56Sopenharmony_ci return resource_sharer.DupFd(fd) 1997db96d56Sopenharmony_ci else: 2007db96d56Sopenharmony_ci raise ValueError('SCM_RIGHTS appears not to be available') 2017db96d56Sopenharmony_ci 2027db96d56Sopenharmony_ci# 2037db96d56Sopenharmony_ci# Try making some callable types picklable 2047db96d56Sopenharmony_ci# 2057db96d56Sopenharmony_ci 2067db96d56Sopenharmony_cidef _reduce_method(m): 2077db96d56Sopenharmony_ci if m.__self__ is None: 2087db96d56Sopenharmony_ci return getattr, (m.__class__, m.__func__.__name__) 2097db96d56Sopenharmony_ci else: 2107db96d56Sopenharmony_ci return getattr, (m.__self__, m.__func__.__name__) 2117db96d56Sopenharmony_ciclass _C: 2127db96d56Sopenharmony_ci def f(self): 2137db96d56Sopenharmony_ci pass 2147db96d56Sopenharmony_ciregister(type(_C().f), _reduce_method) 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ci 2177db96d56Sopenharmony_cidef _reduce_method_descriptor(m): 2187db96d56Sopenharmony_ci return getattr, (m.__objclass__, m.__name__) 2197db96d56Sopenharmony_ciregister(type(list.append), _reduce_method_descriptor) 2207db96d56Sopenharmony_ciregister(type(int.__add__), _reduce_method_descriptor) 2217db96d56Sopenharmony_ci 2227db96d56Sopenharmony_ci 2237db96d56Sopenharmony_cidef _reduce_partial(p): 2247db96d56Sopenharmony_ci return _rebuild_partial, (p.func, p.args, p.keywords or {}) 2257db96d56Sopenharmony_cidef _rebuild_partial(func, args, keywords): 2267db96d56Sopenharmony_ci return functools.partial(func, *args, **keywords) 2277db96d56Sopenharmony_ciregister(functools.partial, _reduce_partial) 2287db96d56Sopenharmony_ci 2297db96d56Sopenharmony_ci# 2307db96d56Sopenharmony_ci# Make sockets picklable 2317db96d56Sopenharmony_ci# 2327db96d56Sopenharmony_ci 2337db96d56Sopenharmony_ciif sys.platform == 'win32': 2347db96d56Sopenharmony_ci def _reduce_socket(s): 2357db96d56Sopenharmony_ci from .resource_sharer import DupSocket 2367db96d56Sopenharmony_ci return _rebuild_socket, (DupSocket(s),) 2377db96d56Sopenharmony_ci def _rebuild_socket(ds): 2387db96d56Sopenharmony_ci return ds.detach() 2397db96d56Sopenharmony_ci register(socket.socket, _reduce_socket) 2407db96d56Sopenharmony_ci 2417db96d56Sopenharmony_cielse: 2427db96d56Sopenharmony_ci def _reduce_socket(s): 2437db96d56Sopenharmony_ci df = DupFd(s.fileno()) 2447db96d56Sopenharmony_ci return _rebuild_socket, (df, s.family, s.type, s.proto) 2457db96d56Sopenharmony_ci def _rebuild_socket(df, family, type, proto): 2467db96d56Sopenharmony_ci fd = df.detach() 2477db96d56Sopenharmony_ci return socket.socket(family, type, proto, fileno=fd) 2487db96d56Sopenharmony_ci register(socket.socket, _reduce_socket) 2497db96d56Sopenharmony_ci 2507db96d56Sopenharmony_ci 2517db96d56Sopenharmony_ciclass AbstractReducer(metaclass=ABCMeta): 2527db96d56Sopenharmony_ci '''Abstract base class for use in implementing a Reduction class 2537db96d56Sopenharmony_ci suitable for use in replacing the standard reduction mechanism 2547db96d56Sopenharmony_ci used in multiprocessing.''' 2557db96d56Sopenharmony_ci ForkingPickler = ForkingPickler 2567db96d56Sopenharmony_ci register = register 2577db96d56Sopenharmony_ci dump = dump 2587db96d56Sopenharmony_ci send_handle = send_handle 2597db96d56Sopenharmony_ci recv_handle = recv_handle 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ci if sys.platform == 'win32': 2627db96d56Sopenharmony_ci steal_handle = steal_handle 2637db96d56Sopenharmony_ci duplicate = duplicate 2647db96d56Sopenharmony_ci DupHandle = DupHandle 2657db96d56Sopenharmony_ci else: 2667db96d56Sopenharmony_ci sendfds = sendfds 2677db96d56Sopenharmony_ci recvfds = recvfds 2687db96d56Sopenharmony_ci DupFd = DupFd 2697db96d56Sopenharmony_ci 2707db96d56Sopenharmony_ci _reduce_method = _reduce_method 2717db96d56Sopenharmony_ci _reduce_method_descriptor = _reduce_method_descriptor 2727db96d56Sopenharmony_ci _rebuild_partial = _rebuild_partial 2737db96d56Sopenharmony_ci _reduce_socket = _reduce_socket 2747db96d56Sopenharmony_ci _rebuild_socket = _rebuild_socket 2757db96d56Sopenharmony_ci 2767db96d56Sopenharmony_ci def __init__(self, *args): 2777db96d56Sopenharmony_ci register(type(_C().f), _reduce_method) 2787db96d56Sopenharmony_ci register(type(list.append), _reduce_method_descriptor) 2797db96d56Sopenharmony_ci register(type(int.__add__), _reduce_method_descriptor) 2807db96d56Sopenharmony_ci register(functools.partial, _reduce_partial) 2817db96d56Sopenharmony_ci register(socket.socket, _reduce_socket) 282