#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]

import io
import os
import sys
import socket
import struct
import time
import tempfile
import itertools

import _multiprocessing

from . import util

from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler

# _winapi only exists on Windows; elsewhere the name is bound to None so the
# rest of the module can test ``if _winapi:``.
try:
    import _winapi
    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
except ImportError:
    if sys.platform == 'win32':
        raise
    _winapi = None

#
#
#

BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.

# Counter used to make generated Windows pipe names unique within a process.
_mmap_counter = itertools.count()

default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the monotonic-clock deadline that lies `timeout` seconds from now
    '''
    return time.monotonic() + timeout

def _check_timeout(t):
    '''
    Return True if the monotonic-clock deadline `t` has passed
    '''
    return time.monotonic() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    Raises ValueError if `family` is not 'AF_INET', 'AF_UNIX' or 'AF_PIPE'.
    '''
    if family == 'AF_INET':
        # Port 0 leaves the choice of port to whoever binds the address.
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        # NOTE(review): tempfile.mktemp() only generates a name, it does not
        # create the file.
        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)), dir="")
    else:
        raise ValueError('unrecognized family')

def _validate_family(family):
    '''
    Checks if the family is valid for the current environment.

    Raises ValueError for 'AF_PIPE' off Windows, and for 'AF_UNIX' on a
    Windows build whose socket module lacks that attribute.
    '''
    if sys.platform != 'win32' and family == 'AF_PIPE':
        raise ValueError('Family %s is not recognized.' % family)

    if sys.platform == 'win32' and family == 'AF_UNIX':
        # double check
        if not hasattr(socket, family):
            raise ValueError('Family %s is not recognized.' % family)

def address_type(address):
    '''
    Return the types of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'

    Raises ValueError if the address matches none of them.
    '''
    # Exact type checks (not isinstance) are deliberate here and kept as-is.
    if type(address) is tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str or util.is_abstract_socket_namespace(address):
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Connection classes
#

class _ConnectionBase:
    """
    Shared message-oriented front end for the concrete connection classes.

    Subclasses provide `_close`, `_send_bytes`, `_recv_bytes` and `_poll`;
    this class adds the state checks and the public send/recv API on top.
    """
    # Class-level default so __del__ is safe if __init__ raised early.
    _handle = None

    def __init__(self, handle, readable=True, writable=True):
        """
        Wrap `handle` (anything with `__index__`).

        Raises ValueError for a negative handle or when both `readable`
        and `writable` are false.
        """
        handle = handle.__index__()
        if handle < 0:
            raise ValueError("invalid handle")
        if not readable and not writable:
            raise ValueError(
                "at least one of `readable` and `writable` must be True")
        self._handle = handle
        self._readable = readable
        self._writable = writable

    # XXX should we use util.Finalize instead of a __del__?

    def __del__(self):
        if self._handle is not None:
            self._close()

    def _check_closed(self):
        """Raise OSError if the connection has been closed"""
        if self._handle is None:
            raise OSError("handle is closed")

    def _check_readable(self):
        """Raise OSError if the connection cannot be read from"""
        if not self._readable:
            raise OSError("connection is write-only")

    def _check_writable(self):
        """Raise OSError if the connection cannot be written to"""
        if not self._writable:
            raise OSError("connection is read-only")

    def _bad_message_length(self):
        """
        Disable further reads (or close a read-only connection), then
        raise OSError.
        """
        if self._writable:
            self._readable = False
        else:
            self.close()
        raise OSError("bad message length")

    @property
    def closed(self):
        """True if the connection is closed"""
        return self._handle is None

    @property
    def readable(self):
        """True if the connection is readable"""
        return self._readable

    @property
    def writable(self):
        """True if the connection is writable"""
        return self._writable

    def fileno(self):
        """File descriptor or handle of the connection"""
        self._check_closed()
        return self._handle

    def close(self):
        """Close the connection"""
        if self._handle is not None:
            try:
                self._close()
            finally:
                # Mark as closed even if the low-level close failed.
                self._handle = None

    def send_bytes(self, buf, offset=0, size=None):
        """
        Send the bytes data from a bytes-like object

        Only `buf[offset:offset + size]` (in bytes) is sent; `size`
        defaults to everything after `offset`.  Raises ValueError when
        `offset` or `size` is negative or reaches past the buffer.
        """
        self._check_closed()
        self._check_writable()
        m = memoryview(buf)
        if m.itemsize > 1:
            # Work in bytes regardless of the buffer's item size.
            m = m.cast('B')
        n = m.nbytes
        if offset < 0:
            raise ValueError("offset is negative")
        if n < offset:
            raise ValueError("buffer length < offset")
        if size is None:
            size = n - offset
        elif size < 0:
            raise ValueError("size is negative")
        elif offset + size > n:
            raise ValueError("buffer length < offset + size")
        self._send_bytes(m[offset:offset + size])

    def send(self, obj):
        """Send a (picklable) object"""
        self._check_closed()
        self._check_writable()
        self._send_bytes(_ForkingPickler.dumps(obj))

    def recv_bytes(self, maxlength=None):
        """
        Receive bytes data as a bytes object.

        Raises ValueError for a negative `maxlength`, and OSError (via
        `_bad_message_length`) when the subclass reports the message as
        too long by returning None.
        """
        self._check_closed()
        self._check_readable()
        if maxlength is not None and maxlength < 0:
            raise ValueError("negative maxlength")
        buf = self._recv_bytes(maxlength)
        if buf is None:
            self._bad_message_length()
        return buf.getvalue()

    def recv_bytes_into(self, buf, offset=0):
        """
        Receive bytes data into a writeable bytes-like object.
        Return the number of bytes read.

        `offset` is a byte offset into `buf`.  Raises BufferTooShort,
        carrying the complete message, if the message does not fit.
        """
        self._check_closed()
        self._check_readable()
        with memoryview(buf) as m:
            # Get bytesize of arbitrary buffer
            itemsize = m.itemsize
            bytesize = itemsize * len(m)
            if offset < 0:
                raise ValueError("negative offset")
            elif offset > bytesize:
                raise ValueError("offset too large")
            result = self._recv_bytes()
            size = result.tell()
            if bytesize < offset + size:
                raise BufferTooShort(result.getvalue())
            # Message can fit in dest
            result.seek(0)
            result.readinto(m[offset // itemsize :
                              (offset + size) // itemsize])
            return size

    def recv(self):
        """Receive a (picklable) object"""
        self._check_closed()
        self._check_readable()
        buf = self._recv_bytes()
        # NOTE(security): the received bytes are unpickled; only use this with
        # a peer that is trusted.
        return _ForkingPickler.loads(buf.getbuffer())

    def poll(self, timeout=0.0):
        """Whether there is any input available to be read"""
        self._check_closed()
        self._check_readable()
        return self._poll(timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


if _winapi:

    class PipeConnection(_ConnectionBase):
        """
        Connection class based on a Windows named pipe.
        Overlapped I/O is used, so the handles must have been created
        with FILE_FLAG_OVERLAPPED.
        """
        # When true, the next _recv_bytes() returns an empty message without
        # touching the pipe; presumably set by code outside this excerpt.
        _got_empty_message = False

        def _close(self, _CloseHandle=_winapi.CloseHandle):
            _CloseHandle(self._handle)

        def _send_bytes(self, buf):
            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
            try:
                if err == _winapi.ERROR_IO_PENDING:
                    waitres = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                    assert waitres == WAIT_OBJECT_0
            except:
                # Bare except on purpose: the pending operation must also be
                # cancelled on KeyboardInterrupt before re-raising.
                ov.cancel()
                raise
            finally:
                nwritten, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert nwritten == len(buf)

        def _recv_bytes(self, maxsize=None):
            if self._got_empty_message:
                self._got_empty_message = False
                return io.BytesIO()
            else:
                # Read at most 128 bytes first; a longer message is finished
                # by _get_more_data().
                bsize = 128 if maxsize is None else min(maxsize, 128)
                try:
                    ov, err = _winapi.ReadFile(self._handle, bsize,
                                                overlapped=True)
                    try:
                        if err == _winapi.ERROR_IO_PENDING:
                            waitres = _winapi.WaitForMultipleObjects(
                                [ov.event], False, INFINITE)
                            assert waitres == WAIT_OBJECT_0
                    except:
                        ov.cancel()
                        raise
                    finally:
                        nread, err = ov.GetOverlappedResult(True)
                        if err == 0:
                            f = io.BytesIO()
                            f.write(ov.getbuffer())
                            return f
                        elif err == _winapi.ERROR_MORE_DATA:
                            return self._get_more_data(ov, maxsize)
                except OSError as e:
                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                        raise EOFError
                    else:
                        raise
            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

        def _poll(self, timeout):
            if (self._got_empty_message or
                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
                return True
            return bool(wait([self], timeout))

        def _get_more_data(self, ov, maxsize):
            # Finish a message whose first part is already in `ov`.
            buf = ov.getbuffer()
            f = io.BytesIO()
            f.write(buf)
            left = _winapi.PeekNamedPipe(self._handle)[1]
            assert left > 0
            if maxsize is not None and len(buf) + left > maxsize:
                self._bad_message_length()
            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
            rbytes, err = ov.GetOverlappedResult(True)
            assert err == 0
            assert rbytes == left
            f.write(ov.getbuffer())
            return f


class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            _close(self._handle)
        _write = _multiprocessing.send
        _read = _multiprocessing.recv
    else:
        def _close(self, _close=os.close):
            _close(self._handle)
        _write = os.write
        _read = os.read

    def _send(self, buf, write=_write):
        """Write all of `buf`, looping over partial writes"""
        remaining = len(buf)
        while True:
            n = write(self._handle, buf)
            remaining -= n
            if remaining == 0:
                break
            buf = buf[n:]

    def _recv(self, size, read=_read):
        """
        Read exactly `size` bytes and return them in an io.BytesIO.

        Raises EOFError if the stream ends before any byte arrives, and
        OSError if it ends part-way through.
        """
        buf = io.BytesIO()
        handle = self._handle
        remaining = size
        while remaining > 0:
            chunk = read(handle, remaining)
            n = len(chunk)
            if n == 0:
                if remaining == size:
                    raise EOFError
                else:
                    raise OSError("got end of file during message")
            buf.write(chunk)
            remaining -= n
        return buf

    def _send_bytes(self, buf):
        """
        Send `buf` as one message behind a length header.

        The header is a signed 32-bit network-order length; messages
        longer than 0x7fffffff bytes send -1 followed by an unsigned
        64-bit length instead.
        """
        n = len(buf)
        if n > 0x7fffffff:
            pre_header = struct.pack("!i", -1)
            header = struct.pack("!Q", n)
            self._send(pre_header)
            self._send(header)
            self._send(buf)
        else:
            # For wire compatibility with 3.7 and lower
            header = struct.pack("!i", n)
            if n > 16384:
                # The payload is large so Nagle's algorithm won't be triggered
                # and we'd better avoid the cost of concatenation.
                self._send(header)
                self._send(buf)
            else:
                # Issue #20540: concatenate before sending, to avoid delays due
                # to Nagle's algorithm on a TCP socket.
                # Also note we want to avoid sending a 0-length buffer separately,
                # to avoid "broken pipe" errors if the other end closed the pipe.
                self._send(header + buf)

    def _recv_bytes(self, maxsize=None):
        """
        Read one message; return None if it is longer than `maxsize`.
        """
        buf = self._recv(4)
        size, = struct.unpack("!i", buf.getvalue())
        if size == -1:
            # Extended header: the real length follows as 64 bits.
            buf = self._recv(8)
            size, = struct.unpack("!Q", buf.getvalue())
        if maxsize is not None and size > maxsize:
            return None
        return self._recv(size)

    def _poll(self, timeout):
        r = wait([self], timeout)
        return bool(r)


#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        # Check the key before binding anything, so that a bad key cannot
        # leave a bound socket or pipe behind.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        _validate_family(family)
        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        if self._listener is None:
            raise OSError('listener is closed')
        c = self._listener.accept()
        # ``is not None`` mirrors Client(), so that an empty-bytes key is
        # treated the same way on both ends.
        if self._authkey is not None:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        listener = self._listener
        if listener is not None:
            # Clear the attribute first so a second close() is a no-op.
            self._listener = None
            listener.close()

    @property
    def address(self):
        return self._listener._address

    @property
    def last_accepted(self):
        return self._listener._last_accepted

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    Raises TypeError if `authkey` is neither None nor a bytes object.
    '''
    # Check the key before connecting, so that a bad key is reported as such
    # and no connection is left open.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    _validate_family(family)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            # detach() hands the descriptor over to the Connection.
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

else:

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = _winapi.CreateNamedPipe(
            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
            # default security descriptor: the handle cannot be inherited
            _winapi.NULL
            )
        h2 = _winapi.CreateFile(
            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
            )
        _winapi.SetNamedPipeHandleState(
            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
            )

        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
        _, err = overlapped.GetOverlappedResult(True)
        assert err == 0

        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            # SO_REUSEADDR has different semantics on Windows (issue #2550).
            if os.name == 'posix':
                self._socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except OSError:
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
            # Linux abstract socket namespaces do not need to be explicitly unlinked
            self._unlink = util.Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept one connection and return it as a `Connection`
        '''
        s, self._last_accepted = self._socket.accept()
        s.setblocking(True)
        return Connection(s.detach())

    def close(self):
        '''
        Close the socket and run the pending unlink finalizer, if any
        '''
        try:
            self._socket.close()
        finally:
            unlink = self._unlink
            if unlink is not None:
                self._unlink = None
                unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    with socket.socket( getattr(socket, family) ) as s:
        s.setblocking(True)
        s.connect(address)
        return Connection(s.detach())

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            self._handle_queue = [self._new_handle(first=True)]

            self._last_accepted = None
            util.sub_debug('listener created with address=%r', self._address)
            # close() is the finalizer itself: calling it closes every handle
            # still waiting in the queue.
            self.close = util.Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def _new_handle(self, first=False):
            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
            if first:
                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
            return _winapi.CreateNamedPipe(
                self._address, flags,
                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
                _winapi.PIPE_WAIT,
                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
                )

        def accept(self):
            # Queue a fresh pipe instance before serving the oldest one.
            self._handle_queue.append(self._new_handle())
            handle = self._handle_queue.pop(0)
            try:
                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
            except OSError as e:
                if e.winerror != _winapi.ERROR_NO_DATA:
                    raise
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
            else:
                try:
                    res = _winapi.WaitForMultipleObjects(
                        [ov.event], False, INFINITE)
                except:
                    ov.cancel()
                    _winapi.CloseHandle(handle)
                    raise
                finally:
                    _, err = ov.GetOverlappedResult(True)
                    assert err == 0
            return PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            util.sub_debug('closing listener with address=%r', address)
            for handle in queue:
                _winapi.CloseHandle(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        t = _init_timeout()
        while 1:
            try:
                _winapi.WaitNamedPipe(address, 1000)
                h = _winapi.CreateFile(
                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
                    )
            except OSError as e:
                # Retry on timeout/busy until the overall deadline passes.
                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break
        else:
            raise

        _winapi.SetNamedPipeHandleState(
            h, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        return PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    import hmac
    if not isinstance(authkey, bytes):
        raise ValueError(
            "Authkey must be bytes, not {0!s}".format(type(authkey)))
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message, 'md5').digest()
    response = connection.recv_bytes(256)        # reject large message
    # NOTE(review): the visible source is cut off at this point, in the
    # middle of the statement starting ``if response == ``.  The rest of the
    # function (the comparison of `response` with `digest`) lies beyond this
    # excerpt and has deliberately not been reconstructed; this function is
    # incomplete until that tail is restored.
digest: 7417db96d56Sopenharmony_ci connection.send_bytes(WELCOME) 7427db96d56Sopenharmony_ci else: 7437db96d56Sopenharmony_ci connection.send_bytes(FAILURE) 7447db96d56Sopenharmony_ci raise AuthenticationError('digest received was wrong') 7457db96d56Sopenharmony_ci 7467db96d56Sopenharmony_cidef answer_challenge(connection, authkey): 7477db96d56Sopenharmony_ci import hmac 7487db96d56Sopenharmony_ci if not isinstance(authkey, bytes): 7497db96d56Sopenharmony_ci raise ValueError( 7507db96d56Sopenharmony_ci "Authkey must be bytes, not {0!s}".format(type(authkey))) 7517db96d56Sopenharmony_ci message = connection.recv_bytes(256) # reject large message 7527db96d56Sopenharmony_ci assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message 7537db96d56Sopenharmony_ci message = message[len(CHALLENGE):] 7547db96d56Sopenharmony_ci digest = hmac.new(authkey, message, 'md5').digest() 7557db96d56Sopenharmony_ci connection.send_bytes(digest) 7567db96d56Sopenharmony_ci response = connection.recv_bytes(256) # reject large message 7577db96d56Sopenharmony_ci if response != WELCOME: 7587db96d56Sopenharmony_ci raise AuthenticationError('digest sent was rejected') 7597db96d56Sopenharmony_ci 7607db96d56Sopenharmony_ci# 7617db96d56Sopenharmony_ci# Support for using xmlrpclib for serialization 7627db96d56Sopenharmony_ci# 7637db96d56Sopenharmony_ci 7647db96d56Sopenharmony_ciclass ConnectionWrapper(object): 7657db96d56Sopenharmony_ci def __init__(self, conn, dumps, loads): 7667db96d56Sopenharmony_ci self._conn = conn 7677db96d56Sopenharmony_ci self._dumps = dumps 7687db96d56Sopenharmony_ci self._loads = loads 7697db96d56Sopenharmony_ci for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): 7707db96d56Sopenharmony_ci obj = getattr(conn, attr) 7717db96d56Sopenharmony_ci setattr(self, attr, obj) 7727db96d56Sopenharmony_ci def send(self, obj): 7737db96d56Sopenharmony_ci s = self._dumps(obj) 7747db96d56Sopenharmony_ci self._conn.send_bytes(s) 7757db96d56Sopenharmony_ci def 
recv(self): 7767db96d56Sopenharmony_ci s = self._conn.recv_bytes() 7777db96d56Sopenharmony_ci return self._loads(s) 7787db96d56Sopenharmony_ci 7797db96d56Sopenharmony_cidef _xml_dumps(obj): 7807db96d56Sopenharmony_ci return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') 7817db96d56Sopenharmony_ci 7827db96d56Sopenharmony_cidef _xml_loads(s): 7837db96d56Sopenharmony_ci (obj,), method = xmlrpclib.loads(s.decode('utf-8')) 7847db96d56Sopenharmony_ci return obj 7857db96d56Sopenharmony_ci 7867db96d56Sopenharmony_ciclass XmlListener(Listener): 7877db96d56Sopenharmony_ci def accept(self): 7887db96d56Sopenharmony_ci global xmlrpclib 7897db96d56Sopenharmony_ci import xmlrpc.client as xmlrpclib 7907db96d56Sopenharmony_ci obj = Listener.accept(self) 7917db96d56Sopenharmony_ci return ConnectionWrapper(obj, _xml_dumps, _xml_loads) 7927db96d56Sopenharmony_ci 7937db96d56Sopenharmony_cidef XmlClient(*args, **kwds): 7947db96d56Sopenharmony_ci global xmlrpclib 7957db96d56Sopenharmony_ci import xmlrpc.client as xmlrpclib 7967db96d56Sopenharmony_ci return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) 7977db96d56Sopenharmony_ci 7987db96d56Sopenharmony_ci# 7997db96d56Sopenharmony_ci# Wait 8007db96d56Sopenharmony_ci# 8017db96d56Sopenharmony_ci 8027db96d56Sopenharmony_ciif sys.platform == 'win32': 8037db96d56Sopenharmony_ci 8047db96d56Sopenharmony_ci def _exhaustive_wait(handles, timeout): 8057db96d56Sopenharmony_ci # Return ALL handles which are currently signalled. (Only 8067db96d56Sopenharmony_ci # returning the first signalled might create starvation issues.) 
8077db96d56Sopenharmony_ci L = list(handles) 8087db96d56Sopenharmony_ci ready = [] 8097db96d56Sopenharmony_ci while L: 8107db96d56Sopenharmony_ci res = _winapi.WaitForMultipleObjects(L, False, timeout) 8117db96d56Sopenharmony_ci if res == WAIT_TIMEOUT: 8127db96d56Sopenharmony_ci break 8137db96d56Sopenharmony_ci elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): 8147db96d56Sopenharmony_ci res -= WAIT_OBJECT_0 8157db96d56Sopenharmony_ci elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): 8167db96d56Sopenharmony_ci res -= WAIT_ABANDONED_0 8177db96d56Sopenharmony_ci else: 8187db96d56Sopenharmony_ci raise RuntimeError('Should not get here') 8197db96d56Sopenharmony_ci ready.append(L[res]) 8207db96d56Sopenharmony_ci L = L[res+1:] 8217db96d56Sopenharmony_ci timeout = 0 8227db96d56Sopenharmony_ci return ready 8237db96d56Sopenharmony_ci 8247db96d56Sopenharmony_ci _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} 8257db96d56Sopenharmony_ci 8267db96d56Sopenharmony_ci def wait(object_list, timeout=None): 8277db96d56Sopenharmony_ci ''' 8287db96d56Sopenharmony_ci Wait till an object in object_list is ready/readable. 8297db96d56Sopenharmony_ci 8307db96d56Sopenharmony_ci Returns list of those objects in object_list which are ready/readable. 
8317db96d56Sopenharmony_ci ''' 8327db96d56Sopenharmony_ci if timeout is None: 8337db96d56Sopenharmony_ci timeout = INFINITE 8347db96d56Sopenharmony_ci elif timeout < 0: 8357db96d56Sopenharmony_ci timeout = 0 8367db96d56Sopenharmony_ci else: 8377db96d56Sopenharmony_ci timeout = int(timeout * 1000 + 0.5) 8387db96d56Sopenharmony_ci 8397db96d56Sopenharmony_ci object_list = list(object_list) 8407db96d56Sopenharmony_ci waithandle_to_obj = {} 8417db96d56Sopenharmony_ci ov_list = [] 8427db96d56Sopenharmony_ci ready_objects = set() 8437db96d56Sopenharmony_ci ready_handles = set() 8447db96d56Sopenharmony_ci 8457db96d56Sopenharmony_ci try: 8467db96d56Sopenharmony_ci for o in object_list: 8477db96d56Sopenharmony_ci try: 8487db96d56Sopenharmony_ci fileno = getattr(o, 'fileno') 8497db96d56Sopenharmony_ci except AttributeError: 8507db96d56Sopenharmony_ci waithandle_to_obj[o.__index__()] = o 8517db96d56Sopenharmony_ci else: 8527db96d56Sopenharmony_ci # start an overlapped read of length zero 8537db96d56Sopenharmony_ci try: 8547db96d56Sopenharmony_ci ov, err = _winapi.ReadFile(fileno(), 0, True) 8557db96d56Sopenharmony_ci except OSError as e: 8567db96d56Sopenharmony_ci ov, err = None, e.winerror 8577db96d56Sopenharmony_ci if err not in _ready_errors: 8587db96d56Sopenharmony_ci raise 8597db96d56Sopenharmony_ci if err == _winapi.ERROR_IO_PENDING: 8607db96d56Sopenharmony_ci ov_list.append(ov) 8617db96d56Sopenharmony_ci waithandle_to_obj[ov.event] = o 8627db96d56Sopenharmony_ci else: 8637db96d56Sopenharmony_ci # If o.fileno() is an overlapped pipe handle and 8647db96d56Sopenharmony_ci # err == 0 then there is a zero length message 8657db96d56Sopenharmony_ci # in the pipe, but it HAS NOT been consumed... 8667db96d56Sopenharmony_ci if ov and sys.getwindowsversion()[:2] >= (6, 2): 8677db96d56Sopenharmony_ci # ... except on Windows 8 and later, where 8687db96d56Sopenharmony_ci # the message HAS been consumed. 
8697db96d56Sopenharmony_ci try: 8707db96d56Sopenharmony_ci _, err = ov.GetOverlappedResult(False) 8717db96d56Sopenharmony_ci except OSError as e: 8727db96d56Sopenharmony_ci err = e.winerror 8737db96d56Sopenharmony_ci if not err and hasattr(o, '_got_empty_message'): 8747db96d56Sopenharmony_ci o._got_empty_message = True 8757db96d56Sopenharmony_ci ready_objects.add(o) 8767db96d56Sopenharmony_ci timeout = 0 8777db96d56Sopenharmony_ci 8787db96d56Sopenharmony_ci ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) 8797db96d56Sopenharmony_ci finally: 8807db96d56Sopenharmony_ci # request that overlapped reads stop 8817db96d56Sopenharmony_ci for ov in ov_list: 8827db96d56Sopenharmony_ci ov.cancel() 8837db96d56Sopenharmony_ci 8847db96d56Sopenharmony_ci # wait for all overlapped reads to stop 8857db96d56Sopenharmony_ci for ov in ov_list: 8867db96d56Sopenharmony_ci try: 8877db96d56Sopenharmony_ci _, err = ov.GetOverlappedResult(True) 8887db96d56Sopenharmony_ci except OSError as e: 8897db96d56Sopenharmony_ci err = e.winerror 8907db96d56Sopenharmony_ci if err not in _ready_errors: 8917db96d56Sopenharmony_ci raise 8927db96d56Sopenharmony_ci if err != _winapi.ERROR_OPERATION_ABORTED: 8937db96d56Sopenharmony_ci o = waithandle_to_obj[ov.event] 8947db96d56Sopenharmony_ci ready_objects.add(o) 8957db96d56Sopenharmony_ci if err == 0: 8967db96d56Sopenharmony_ci # If o.fileno() is an overlapped pipe handle then 8977db96d56Sopenharmony_ci # a zero length message HAS been consumed. 
8987db96d56Sopenharmony_ci if hasattr(o, '_got_empty_message'): 8997db96d56Sopenharmony_ci o._got_empty_message = True 9007db96d56Sopenharmony_ci 9017db96d56Sopenharmony_ci ready_objects.update(waithandle_to_obj[h] for h in ready_handles) 9027db96d56Sopenharmony_ci return [o for o in object_list if o in ready_objects] 9037db96d56Sopenharmony_ci 9047db96d56Sopenharmony_cielse: 9057db96d56Sopenharmony_ci 9067db96d56Sopenharmony_ci import selectors 9077db96d56Sopenharmony_ci 9087db96d56Sopenharmony_ci # poll/select have the advantage of not requiring any extra file 9097db96d56Sopenharmony_ci # descriptor, contrarily to epoll/kqueue (also, they require a single 9107db96d56Sopenharmony_ci # syscall). 9117db96d56Sopenharmony_ci if hasattr(selectors, 'PollSelector'): 9127db96d56Sopenharmony_ci _WaitSelector = selectors.PollSelector 9137db96d56Sopenharmony_ci else: 9147db96d56Sopenharmony_ci _WaitSelector = selectors.SelectSelector 9157db96d56Sopenharmony_ci 9167db96d56Sopenharmony_ci def wait(object_list, timeout=None): 9177db96d56Sopenharmony_ci ''' 9187db96d56Sopenharmony_ci Wait till an object in object_list is ready/readable. 9197db96d56Sopenharmony_ci 9207db96d56Sopenharmony_ci Returns list of those objects in object_list which are ready/readable. 
9217db96d56Sopenharmony_ci ''' 9227db96d56Sopenharmony_ci with _WaitSelector() as selector: 9237db96d56Sopenharmony_ci for obj in object_list: 9247db96d56Sopenharmony_ci selector.register(obj, selectors.EVENT_READ) 9257db96d56Sopenharmony_ci 9267db96d56Sopenharmony_ci if timeout is not None: 9277db96d56Sopenharmony_ci deadline = time.monotonic() + timeout 9287db96d56Sopenharmony_ci 9297db96d56Sopenharmony_ci while True: 9307db96d56Sopenharmony_ci ready = selector.select(timeout) 9317db96d56Sopenharmony_ci if ready: 9327db96d56Sopenharmony_ci return [key.fileobj for (key, events) in ready] 9337db96d56Sopenharmony_ci else: 9347db96d56Sopenharmony_ci if timeout is not None: 9357db96d56Sopenharmony_ci timeout = deadline - time.monotonic() 9367db96d56Sopenharmony_ci if timeout < 0: 9377db96d56Sopenharmony_ci return ready 9387db96d56Sopenharmony_ci 9397db96d56Sopenharmony_ci# 9407db96d56Sopenharmony_ci# Make connection and socket objects shareable if possible 9417db96d56Sopenharmony_ci# 9427db96d56Sopenharmony_ci 9437db96d56Sopenharmony_ciif sys.platform == 'win32': 9447db96d56Sopenharmony_ci def reduce_connection(conn): 9457db96d56Sopenharmony_ci handle = conn.fileno() 9467db96d56Sopenharmony_ci with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: 9477db96d56Sopenharmony_ci from . 
import resource_sharer 9487db96d56Sopenharmony_ci ds = resource_sharer.DupSocket(s) 9497db96d56Sopenharmony_ci return rebuild_connection, (ds, conn.readable, conn.writable) 9507db96d56Sopenharmony_ci def rebuild_connection(ds, readable, writable): 9517db96d56Sopenharmony_ci sock = ds.detach() 9527db96d56Sopenharmony_ci return Connection(sock.detach(), readable, writable) 9537db96d56Sopenharmony_ci reduction.register(Connection, reduce_connection) 9547db96d56Sopenharmony_ci 9557db96d56Sopenharmony_ci def reduce_pipe_connection(conn): 9567db96d56Sopenharmony_ci access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | 9577db96d56Sopenharmony_ci (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) 9587db96d56Sopenharmony_ci dh = reduction.DupHandle(conn.fileno(), access) 9597db96d56Sopenharmony_ci return rebuild_pipe_connection, (dh, conn.readable, conn.writable) 9607db96d56Sopenharmony_ci def rebuild_pipe_connection(dh, readable, writable): 9617db96d56Sopenharmony_ci handle = dh.detach() 9627db96d56Sopenharmony_ci return PipeConnection(handle, readable, writable) 9637db96d56Sopenharmony_ci reduction.register(PipeConnection, reduce_pipe_connection) 9647db96d56Sopenharmony_ci 9657db96d56Sopenharmony_cielse: 9667db96d56Sopenharmony_ci def reduce_connection(conn): 9677db96d56Sopenharmony_ci df = reduction.DupFd(conn.fileno()) 9687db96d56Sopenharmony_ci return rebuild_connection, (df, conn.readable, conn.writable) 9697db96d56Sopenharmony_ci def rebuild_connection(df, readable, writable): 9707db96d56Sopenharmony_ci fd = df.detach() 9717db96d56Sopenharmony_ci return Connection(fd, readable, writable) 9727db96d56Sopenharmony_ci reduction.register(Connection, reduce_connection) 973