17db96d56Sopenharmony_ci#
27db96d56Sopenharmony_ci# A higher level module for using sockets (or Windows named pipes)
37db96d56Sopenharmony_ci#
47db96d56Sopenharmony_ci# multiprocessing/connection.py
57db96d56Sopenharmony_ci#
67db96d56Sopenharmony_ci# Copyright (c) 2006-2008, R Oudkerk
77db96d56Sopenharmony_ci# Licensed to PSF under a Contributor Agreement.
87db96d56Sopenharmony_ci#
97db96d56Sopenharmony_ci
107db96d56Sopenharmony_ci__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
117db96d56Sopenharmony_ci
127db96d56Sopenharmony_ciimport io
137db96d56Sopenharmony_ciimport os
147db96d56Sopenharmony_ciimport sys
157db96d56Sopenharmony_ciimport socket
167db96d56Sopenharmony_ciimport struct
177db96d56Sopenharmony_ciimport time
187db96d56Sopenharmony_ciimport tempfile
197db96d56Sopenharmony_ciimport itertools
207db96d56Sopenharmony_ci
217db96d56Sopenharmony_ciimport _multiprocessing
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_cifrom . import util
247db96d56Sopenharmony_ci
257db96d56Sopenharmony_cifrom . import AuthenticationError, BufferTooShort
267db96d56Sopenharmony_cifrom .context import reduction
277db96d56Sopenharmony_ci_ForkingPickler = reduction.ForkingPickler
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_citry:
307db96d56Sopenharmony_ci    import _winapi
317db96d56Sopenharmony_ci    from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
327db96d56Sopenharmony_ciexcept ImportError:
337db96d56Sopenharmony_ci    if sys.platform == 'win32':
347db96d56Sopenharmony_ci        raise
357db96d56Sopenharmony_ci    _winapi = None
367db96d56Sopenharmony_ci
377db96d56Sopenharmony_ci#
387db96d56Sopenharmony_ci#
397db96d56Sopenharmony_ci#
407db96d56Sopenharmony_ci
417db96d56Sopenharmony_ciBUFSIZE = 8192
427db96d56Sopenharmony_ci# A very generous timeout when it comes to local connections...
437db96d56Sopenharmony_ciCONNECTION_TIMEOUT = 20.
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci_mmap_counter = itertools.count()
467db96d56Sopenharmony_ci
477db96d56Sopenharmony_cidefault_family = 'AF_INET'
487db96d56Sopenharmony_cifamilies = ['AF_INET']
497db96d56Sopenharmony_ci
507db96d56Sopenharmony_ciif hasattr(socket, 'AF_UNIX'):
517db96d56Sopenharmony_ci    default_family = 'AF_UNIX'
527db96d56Sopenharmony_ci    families += ['AF_UNIX']
537db96d56Sopenharmony_ci
547db96d56Sopenharmony_ciif sys.platform == 'win32':
557db96d56Sopenharmony_ci    default_family = 'AF_PIPE'
567db96d56Sopenharmony_ci    families += ['AF_PIPE']
577db96d56Sopenharmony_ci
587db96d56Sopenharmony_ci
597db96d56Sopenharmony_cidef _init_timeout(timeout=CONNECTION_TIMEOUT):
607db96d56Sopenharmony_ci    return time.monotonic() + timeout
617db96d56Sopenharmony_ci
627db96d56Sopenharmony_cidef _check_timeout(t):
637db96d56Sopenharmony_ci    return time.monotonic() > t
647db96d56Sopenharmony_ci
657db96d56Sopenharmony_ci#
667db96d56Sopenharmony_ci#
677db96d56Sopenharmony_ci#
687db96d56Sopenharmony_ci
697db96d56Sopenharmony_cidef arbitrary_address(family):
707db96d56Sopenharmony_ci    '''
717db96d56Sopenharmony_ci    Return an arbitrary free address for the given family
727db96d56Sopenharmony_ci    '''
737db96d56Sopenharmony_ci    if family == 'AF_INET':
747db96d56Sopenharmony_ci        return ('localhost', 0)
757db96d56Sopenharmony_ci    elif family == 'AF_UNIX':
767db96d56Sopenharmony_ci        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
777db96d56Sopenharmony_ci    elif family == 'AF_PIPE':
787db96d56Sopenharmony_ci        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
797db96d56Sopenharmony_ci                               (os.getpid(), next(_mmap_counter)), dir="")
807db96d56Sopenharmony_ci    else:
817db96d56Sopenharmony_ci        raise ValueError('unrecognized family')
827db96d56Sopenharmony_ci
837db96d56Sopenharmony_cidef _validate_family(family):
847db96d56Sopenharmony_ci    '''
857db96d56Sopenharmony_ci    Checks if the family is valid for the current environment.
867db96d56Sopenharmony_ci    '''
877db96d56Sopenharmony_ci    if sys.platform != 'win32' and family == 'AF_PIPE':
887db96d56Sopenharmony_ci        raise ValueError('Family %s is not recognized.' % family)
897db96d56Sopenharmony_ci
907db96d56Sopenharmony_ci    if sys.platform == 'win32' and family == 'AF_UNIX':
917db96d56Sopenharmony_ci        # double check
927db96d56Sopenharmony_ci        if not hasattr(socket, family):
937db96d56Sopenharmony_ci            raise ValueError('Family %s is not recognized.' % family)
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_cidef address_type(address):
967db96d56Sopenharmony_ci    '''
977db96d56Sopenharmony_ci    Return the types of the address
987db96d56Sopenharmony_ci
997db96d56Sopenharmony_ci    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
1007db96d56Sopenharmony_ci    '''
1017db96d56Sopenharmony_ci    if type(address) == tuple:
1027db96d56Sopenharmony_ci        return 'AF_INET'
1037db96d56Sopenharmony_ci    elif type(address) is str and address.startswith('\\\\'):
1047db96d56Sopenharmony_ci        return 'AF_PIPE'
1057db96d56Sopenharmony_ci    elif type(address) is str or util.is_abstract_socket_namespace(address):
1067db96d56Sopenharmony_ci        return 'AF_UNIX'
1077db96d56Sopenharmony_ci    else:
1087db96d56Sopenharmony_ci        raise ValueError('address type of %r unrecognized' % address)
1097db96d56Sopenharmony_ci
1107db96d56Sopenharmony_ci#
1117db96d56Sopenharmony_ci# Connection classes
1127db96d56Sopenharmony_ci#
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_ciclass _ConnectionBase:
1157db96d56Sopenharmony_ci    _handle = None
1167db96d56Sopenharmony_ci
1177db96d56Sopenharmony_ci    def __init__(self, handle, readable=True, writable=True):
1187db96d56Sopenharmony_ci        handle = handle.__index__()
1197db96d56Sopenharmony_ci        if handle < 0:
1207db96d56Sopenharmony_ci            raise ValueError("invalid handle")
1217db96d56Sopenharmony_ci        if not readable and not writable:
1227db96d56Sopenharmony_ci            raise ValueError(
1237db96d56Sopenharmony_ci                "at least one of `readable` and `writable` must be True")
1247db96d56Sopenharmony_ci        self._handle = handle
1257db96d56Sopenharmony_ci        self._readable = readable
1267db96d56Sopenharmony_ci        self._writable = writable
1277db96d56Sopenharmony_ci
1287db96d56Sopenharmony_ci    # XXX should we use util.Finalize instead of a __del__?
1297db96d56Sopenharmony_ci
1307db96d56Sopenharmony_ci    def __del__(self):
1317db96d56Sopenharmony_ci        if self._handle is not None:
1327db96d56Sopenharmony_ci            self._close()
1337db96d56Sopenharmony_ci
1347db96d56Sopenharmony_ci    def _check_closed(self):
1357db96d56Sopenharmony_ci        if self._handle is None:
1367db96d56Sopenharmony_ci            raise OSError("handle is closed")
1377db96d56Sopenharmony_ci
1387db96d56Sopenharmony_ci    def _check_readable(self):
1397db96d56Sopenharmony_ci        if not self._readable:
1407db96d56Sopenharmony_ci            raise OSError("connection is write-only")
1417db96d56Sopenharmony_ci
1427db96d56Sopenharmony_ci    def _check_writable(self):
1437db96d56Sopenharmony_ci        if not self._writable:
1447db96d56Sopenharmony_ci            raise OSError("connection is read-only")
1457db96d56Sopenharmony_ci
1467db96d56Sopenharmony_ci    def _bad_message_length(self):
1477db96d56Sopenharmony_ci        if self._writable:
1487db96d56Sopenharmony_ci            self._readable = False
1497db96d56Sopenharmony_ci        else:
1507db96d56Sopenharmony_ci            self.close()
1517db96d56Sopenharmony_ci        raise OSError("bad message length")
1527db96d56Sopenharmony_ci
1537db96d56Sopenharmony_ci    @property
1547db96d56Sopenharmony_ci    def closed(self):
1557db96d56Sopenharmony_ci        """True if the connection is closed"""
1567db96d56Sopenharmony_ci        return self._handle is None
1577db96d56Sopenharmony_ci
1587db96d56Sopenharmony_ci    @property
1597db96d56Sopenharmony_ci    def readable(self):
1607db96d56Sopenharmony_ci        """True if the connection is readable"""
1617db96d56Sopenharmony_ci        return self._readable
1627db96d56Sopenharmony_ci
1637db96d56Sopenharmony_ci    @property
1647db96d56Sopenharmony_ci    def writable(self):
1657db96d56Sopenharmony_ci        """True if the connection is writable"""
1667db96d56Sopenharmony_ci        return self._writable
1677db96d56Sopenharmony_ci
1687db96d56Sopenharmony_ci    def fileno(self):
1697db96d56Sopenharmony_ci        """File descriptor or handle of the connection"""
1707db96d56Sopenharmony_ci        self._check_closed()
1717db96d56Sopenharmony_ci        return self._handle
1727db96d56Sopenharmony_ci
1737db96d56Sopenharmony_ci    def close(self):
1747db96d56Sopenharmony_ci        """Close the connection"""
1757db96d56Sopenharmony_ci        if self._handle is not None:
1767db96d56Sopenharmony_ci            try:
1777db96d56Sopenharmony_ci                self._close()
1787db96d56Sopenharmony_ci            finally:
1797db96d56Sopenharmony_ci                self._handle = None
1807db96d56Sopenharmony_ci
1817db96d56Sopenharmony_ci    def send_bytes(self, buf, offset=0, size=None):
1827db96d56Sopenharmony_ci        """Send the bytes data from a bytes-like object"""
1837db96d56Sopenharmony_ci        self._check_closed()
1847db96d56Sopenharmony_ci        self._check_writable()
1857db96d56Sopenharmony_ci        m = memoryview(buf)
1867db96d56Sopenharmony_ci        if m.itemsize > 1:
1877db96d56Sopenharmony_ci            m = m.cast('B')
1887db96d56Sopenharmony_ci        n = m.nbytes
1897db96d56Sopenharmony_ci        if offset < 0:
1907db96d56Sopenharmony_ci            raise ValueError("offset is negative")
1917db96d56Sopenharmony_ci        if n < offset:
1927db96d56Sopenharmony_ci            raise ValueError("buffer length < offset")
1937db96d56Sopenharmony_ci        if size is None:
1947db96d56Sopenharmony_ci            size = n - offset
1957db96d56Sopenharmony_ci        elif size < 0:
1967db96d56Sopenharmony_ci            raise ValueError("size is negative")
1977db96d56Sopenharmony_ci        elif offset + size > n:
1987db96d56Sopenharmony_ci            raise ValueError("buffer length < offset + size")
1997db96d56Sopenharmony_ci        self._send_bytes(m[offset:offset + size])
2007db96d56Sopenharmony_ci
2017db96d56Sopenharmony_ci    def send(self, obj):
2027db96d56Sopenharmony_ci        """Send a (picklable) object"""
2037db96d56Sopenharmony_ci        self._check_closed()
2047db96d56Sopenharmony_ci        self._check_writable()
2057db96d56Sopenharmony_ci        self._send_bytes(_ForkingPickler.dumps(obj))
2067db96d56Sopenharmony_ci
2077db96d56Sopenharmony_ci    def recv_bytes(self, maxlength=None):
2087db96d56Sopenharmony_ci        """
2097db96d56Sopenharmony_ci        Receive bytes data as a bytes object.
2107db96d56Sopenharmony_ci        """
2117db96d56Sopenharmony_ci        self._check_closed()
2127db96d56Sopenharmony_ci        self._check_readable()
2137db96d56Sopenharmony_ci        if maxlength is not None and maxlength < 0:
2147db96d56Sopenharmony_ci            raise ValueError("negative maxlength")
2157db96d56Sopenharmony_ci        buf = self._recv_bytes(maxlength)
2167db96d56Sopenharmony_ci        if buf is None:
2177db96d56Sopenharmony_ci            self._bad_message_length()
2187db96d56Sopenharmony_ci        return buf.getvalue()
2197db96d56Sopenharmony_ci
2207db96d56Sopenharmony_ci    def recv_bytes_into(self, buf, offset=0):
2217db96d56Sopenharmony_ci        """
2227db96d56Sopenharmony_ci        Receive bytes data into a writeable bytes-like object.
2237db96d56Sopenharmony_ci        Return the number of bytes read.
2247db96d56Sopenharmony_ci        """
2257db96d56Sopenharmony_ci        self._check_closed()
2267db96d56Sopenharmony_ci        self._check_readable()
2277db96d56Sopenharmony_ci        with memoryview(buf) as m:
2287db96d56Sopenharmony_ci            # Get bytesize of arbitrary buffer
2297db96d56Sopenharmony_ci            itemsize = m.itemsize
2307db96d56Sopenharmony_ci            bytesize = itemsize * len(m)
2317db96d56Sopenharmony_ci            if offset < 0:
2327db96d56Sopenharmony_ci                raise ValueError("negative offset")
2337db96d56Sopenharmony_ci            elif offset > bytesize:
2347db96d56Sopenharmony_ci                raise ValueError("offset too large")
2357db96d56Sopenharmony_ci            result = self._recv_bytes()
2367db96d56Sopenharmony_ci            size = result.tell()
2377db96d56Sopenharmony_ci            if bytesize < offset + size:
2387db96d56Sopenharmony_ci                raise BufferTooShort(result.getvalue())
2397db96d56Sopenharmony_ci            # Message can fit in dest
2407db96d56Sopenharmony_ci            result.seek(0)
2417db96d56Sopenharmony_ci            result.readinto(m[offset // itemsize :
2427db96d56Sopenharmony_ci                              (offset + size) // itemsize])
2437db96d56Sopenharmony_ci            return size
2447db96d56Sopenharmony_ci
2457db96d56Sopenharmony_ci    def recv(self):
2467db96d56Sopenharmony_ci        """Receive a (picklable) object"""
2477db96d56Sopenharmony_ci        self._check_closed()
2487db96d56Sopenharmony_ci        self._check_readable()
2497db96d56Sopenharmony_ci        buf = self._recv_bytes()
2507db96d56Sopenharmony_ci        return _ForkingPickler.loads(buf.getbuffer())
2517db96d56Sopenharmony_ci
2527db96d56Sopenharmony_ci    def poll(self, timeout=0.0):
2537db96d56Sopenharmony_ci        """Whether there is any input available to be read"""
2547db96d56Sopenharmony_ci        self._check_closed()
2557db96d56Sopenharmony_ci        self._check_readable()
2567db96d56Sopenharmony_ci        return self._poll(timeout)
2577db96d56Sopenharmony_ci
2587db96d56Sopenharmony_ci    def __enter__(self):
2597db96d56Sopenharmony_ci        return self
2607db96d56Sopenharmony_ci
2617db96d56Sopenharmony_ci    def __exit__(self, exc_type, exc_value, exc_tb):
2627db96d56Sopenharmony_ci        self.close()
2637db96d56Sopenharmony_ci
2647db96d56Sopenharmony_ci
2657db96d56Sopenharmony_ciif _winapi:
2667db96d56Sopenharmony_ci
2677db96d56Sopenharmony_ci    class PipeConnection(_ConnectionBase):
2687db96d56Sopenharmony_ci        """
2697db96d56Sopenharmony_ci        Connection class based on a Windows named pipe.
2707db96d56Sopenharmony_ci        Overlapped I/O is used, so the handles must have been created
2717db96d56Sopenharmony_ci        with FILE_FLAG_OVERLAPPED.
2727db96d56Sopenharmony_ci        """
2737db96d56Sopenharmony_ci        _got_empty_message = False
2747db96d56Sopenharmony_ci
2757db96d56Sopenharmony_ci        def _close(self, _CloseHandle=_winapi.CloseHandle):
2767db96d56Sopenharmony_ci            _CloseHandle(self._handle)
2777db96d56Sopenharmony_ci
2787db96d56Sopenharmony_ci        def _send_bytes(self, buf):
2797db96d56Sopenharmony_ci            ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
2807db96d56Sopenharmony_ci            try:
2817db96d56Sopenharmony_ci                if err == _winapi.ERROR_IO_PENDING:
2827db96d56Sopenharmony_ci                    waitres = _winapi.WaitForMultipleObjects(
2837db96d56Sopenharmony_ci                        [ov.event], False, INFINITE)
2847db96d56Sopenharmony_ci                    assert waitres == WAIT_OBJECT_0
2857db96d56Sopenharmony_ci            except:
2867db96d56Sopenharmony_ci                ov.cancel()
2877db96d56Sopenharmony_ci                raise
2887db96d56Sopenharmony_ci            finally:
2897db96d56Sopenharmony_ci                nwritten, err = ov.GetOverlappedResult(True)
2907db96d56Sopenharmony_ci            assert err == 0
2917db96d56Sopenharmony_ci            assert nwritten == len(buf)
2927db96d56Sopenharmony_ci
2937db96d56Sopenharmony_ci        def _recv_bytes(self, maxsize=None):
2947db96d56Sopenharmony_ci            if self._got_empty_message:
2957db96d56Sopenharmony_ci                self._got_empty_message = False
2967db96d56Sopenharmony_ci                return io.BytesIO()
2977db96d56Sopenharmony_ci            else:
2987db96d56Sopenharmony_ci                bsize = 128 if maxsize is None else min(maxsize, 128)
2997db96d56Sopenharmony_ci                try:
3007db96d56Sopenharmony_ci                    ov, err = _winapi.ReadFile(self._handle, bsize,
3017db96d56Sopenharmony_ci                                                overlapped=True)
3027db96d56Sopenharmony_ci                    try:
3037db96d56Sopenharmony_ci                        if err == _winapi.ERROR_IO_PENDING:
3047db96d56Sopenharmony_ci                            waitres = _winapi.WaitForMultipleObjects(
3057db96d56Sopenharmony_ci                                [ov.event], False, INFINITE)
3067db96d56Sopenharmony_ci                            assert waitres == WAIT_OBJECT_0
3077db96d56Sopenharmony_ci                    except:
3087db96d56Sopenharmony_ci                        ov.cancel()
3097db96d56Sopenharmony_ci                        raise
3107db96d56Sopenharmony_ci                    finally:
3117db96d56Sopenharmony_ci                        nread, err = ov.GetOverlappedResult(True)
3127db96d56Sopenharmony_ci                        if err == 0:
3137db96d56Sopenharmony_ci                            f = io.BytesIO()
3147db96d56Sopenharmony_ci                            f.write(ov.getbuffer())
3157db96d56Sopenharmony_ci                            return f
3167db96d56Sopenharmony_ci                        elif err == _winapi.ERROR_MORE_DATA:
3177db96d56Sopenharmony_ci                            return self._get_more_data(ov, maxsize)
3187db96d56Sopenharmony_ci                except OSError as e:
3197db96d56Sopenharmony_ci                    if e.winerror == _winapi.ERROR_BROKEN_PIPE:
3207db96d56Sopenharmony_ci                        raise EOFError
3217db96d56Sopenharmony_ci                    else:
3227db96d56Sopenharmony_ci                        raise
3237db96d56Sopenharmony_ci            raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")
3247db96d56Sopenharmony_ci
3257db96d56Sopenharmony_ci        def _poll(self, timeout):
3267db96d56Sopenharmony_ci            if (self._got_empty_message or
3277db96d56Sopenharmony_ci                        _winapi.PeekNamedPipe(self._handle)[0] != 0):
3287db96d56Sopenharmony_ci                return True
3297db96d56Sopenharmony_ci            return bool(wait([self], timeout))
3307db96d56Sopenharmony_ci
3317db96d56Sopenharmony_ci        def _get_more_data(self, ov, maxsize):
3327db96d56Sopenharmony_ci            buf = ov.getbuffer()
3337db96d56Sopenharmony_ci            f = io.BytesIO()
3347db96d56Sopenharmony_ci            f.write(buf)
3357db96d56Sopenharmony_ci            left = _winapi.PeekNamedPipe(self._handle)[1]
3367db96d56Sopenharmony_ci            assert left > 0
3377db96d56Sopenharmony_ci            if maxsize is not None and len(buf) + left > maxsize:
3387db96d56Sopenharmony_ci                self._bad_message_length()
3397db96d56Sopenharmony_ci            ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
3407db96d56Sopenharmony_ci            rbytes, err = ov.GetOverlappedResult(True)
3417db96d56Sopenharmony_ci            assert err == 0
3427db96d56Sopenharmony_ci            assert rbytes == left
3437db96d56Sopenharmony_ci            f.write(ov.getbuffer())
3447db96d56Sopenharmony_ci            return f
3457db96d56Sopenharmony_ci
3467db96d56Sopenharmony_ci
3477db96d56Sopenharmony_ciclass Connection(_ConnectionBase):
3487db96d56Sopenharmony_ci    """
3497db96d56Sopenharmony_ci    Connection class based on an arbitrary file descriptor (Unix only), or
3507db96d56Sopenharmony_ci    a socket handle (Windows).
3517db96d56Sopenharmony_ci    """
3527db96d56Sopenharmony_ci
3537db96d56Sopenharmony_ci    if _winapi:
3547db96d56Sopenharmony_ci        def _close(self, _close=_multiprocessing.closesocket):
3557db96d56Sopenharmony_ci            _close(self._handle)
3567db96d56Sopenharmony_ci        _write = _multiprocessing.send
3577db96d56Sopenharmony_ci        _read = _multiprocessing.recv
3587db96d56Sopenharmony_ci    else:
3597db96d56Sopenharmony_ci        def _close(self, _close=os.close):
3607db96d56Sopenharmony_ci            _close(self._handle)
3617db96d56Sopenharmony_ci        _write = os.write
3627db96d56Sopenharmony_ci        _read = os.read
3637db96d56Sopenharmony_ci
3647db96d56Sopenharmony_ci    def _send(self, buf, write=_write):
3657db96d56Sopenharmony_ci        remaining = len(buf)
3667db96d56Sopenharmony_ci        while True:
3677db96d56Sopenharmony_ci            n = write(self._handle, buf)
3687db96d56Sopenharmony_ci            remaining -= n
3697db96d56Sopenharmony_ci            if remaining == 0:
3707db96d56Sopenharmony_ci                break
3717db96d56Sopenharmony_ci            buf = buf[n:]
3727db96d56Sopenharmony_ci
3737db96d56Sopenharmony_ci    def _recv(self, size, read=_read):
3747db96d56Sopenharmony_ci        buf = io.BytesIO()
3757db96d56Sopenharmony_ci        handle = self._handle
3767db96d56Sopenharmony_ci        remaining = size
3777db96d56Sopenharmony_ci        while remaining > 0:
3787db96d56Sopenharmony_ci            chunk = read(handle, remaining)
3797db96d56Sopenharmony_ci            n = len(chunk)
3807db96d56Sopenharmony_ci            if n == 0:
3817db96d56Sopenharmony_ci                if remaining == size:
3827db96d56Sopenharmony_ci                    raise EOFError
3837db96d56Sopenharmony_ci                else:
3847db96d56Sopenharmony_ci                    raise OSError("got end of file during message")
3857db96d56Sopenharmony_ci            buf.write(chunk)
3867db96d56Sopenharmony_ci            remaining -= n
3877db96d56Sopenharmony_ci        return buf
3887db96d56Sopenharmony_ci
3897db96d56Sopenharmony_ci    def _send_bytes(self, buf):
3907db96d56Sopenharmony_ci        n = len(buf)
3917db96d56Sopenharmony_ci        if n > 0x7fffffff:
3927db96d56Sopenharmony_ci            pre_header = struct.pack("!i", -1)
3937db96d56Sopenharmony_ci            header = struct.pack("!Q", n)
3947db96d56Sopenharmony_ci            self._send(pre_header)
3957db96d56Sopenharmony_ci            self._send(header)
3967db96d56Sopenharmony_ci            self._send(buf)
3977db96d56Sopenharmony_ci        else:
3987db96d56Sopenharmony_ci            # For wire compatibility with 3.7 and lower
3997db96d56Sopenharmony_ci            header = struct.pack("!i", n)
4007db96d56Sopenharmony_ci            if n > 16384:
4017db96d56Sopenharmony_ci                # The payload is large so Nagle's algorithm won't be triggered
4027db96d56Sopenharmony_ci                # and we'd better avoid the cost of concatenation.
4037db96d56Sopenharmony_ci                self._send(header)
4047db96d56Sopenharmony_ci                self._send(buf)
4057db96d56Sopenharmony_ci            else:
4067db96d56Sopenharmony_ci                # Issue #20540: concatenate before sending, to avoid delays due
4077db96d56Sopenharmony_ci                # to Nagle's algorithm on a TCP socket.
4087db96d56Sopenharmony_ci                # Also note we want to avoid sending a 0-length buffer separately,
4097db96d56Sopenharmony_ci                # to avoid "broken pipe" errors if the other end closed the pipe.
4107db96d56Sopenharmony_ci                self._send(header + buf)
4117db96d56Sopenharmony_ci
4127db96d56Sopenharmony_ci    def _recv_bytes(self, maxsize=None):
4137db96d56Sopenharmony_ci        buf = self._recv(4)
4147db96d56Sopenharmony_ci        size, = struct.unpack("!i", buf.getvalue())
4157db96d56Sopenharmony_ci        if size == -1:
4167db96d56Sopenharmony_ci            buf = self._recv(8)
4177db96d56Sopenharmony_ci            size, = struct.unpack("!Q", buf.getvalue())
4187db96d56Sopenharmony_ci        if maxsize is not None and size > maxsize:
4197db96d56Sopenharmony_ci            return None
4207db96d56Sopenharmony_ci        return self._recv(size)
4217db96d56Sopenharmony_ci
4227db96d56Sopenharmony_ci    def _poll(self, timeout):
4237db96d56Sopenharmony_ci        r = wait([self], timeout)
4247db96d56Sopenharmony_ci        return bool(r)
4257db96d56Sopenharmony_ci
4267db96d56Sopenharmony_ci
4277db96d56Sopenharmony_ci#
4287db96d56Sopenharmony_ci# Public functions
4297db96d56Sopenharmony_ci#
4307db96d56Sopenharmony_ci
4317db96d56Sopenharmony_ciclass Listener(object):
4327db96d56Sopenharmony_ci    '''
4337db96d56Sopenharmony_ci    Returns a listener object.
4347db96d56Sopenharmony_ci
4357db96d56Sopenharmony_ci    This is a wrapper for a bound socket which is 'listening' for
4367db96d56Sopenharmony_ci    connections, or for a Windows named pipe.
4377db96d56Sopenharmony_ci    '''
4387db96d56Sopenharmony_ci    def __init__(self, address=None, family=None, backlog=1, authkey=None):
4397db96d56Sopenharmony_ci        family = family or (address and address_type(address)) \
4407db96d56Sopenharmony_ci                 or default_family
4417db96d56Sopenharmony_ci        address = address or arbitrary_address(family)
4427db96d56Sopenharmony_ci
4437db96d56Sopenharmony_ci        _validate_family(family)
4447db96d56Sopenharmony_ci        if family == 'AF_PIPE':
4457db96d56Sopenharmony_ci            self._listener = PipeListener(address, backlog)
4467db96d56Sopenharmony_ci        else:
4477db96d56Sopenharmony_ci            self._listener = SocketListener(address, family, backlog)
4487db96d56Sopenharmony_ci
4497db96d56Sopenharmony_ci        if authkey is not None and not isinstance(authkey, bytes):
4507db96d56Sopenharmony_ci            raise TypeError('authkey should be a byte string')
4517db96d56Sopenharmony_ci
4527db96d56Sopenharmony_ci        self._authkey = authkey
4537db96d56Sopenharmony_ci
4547db96d56Sopenharmony_ci    def accept(self):
4557db96d56Sopenharmony_ci        '''
4567db96d56Sopenharmony_ci        Accept a connection on the bound socket or named pipe of `self`.
4577db96d56Sopenharmony_ci
4587db96d56Sopenharmony_ci        Returns a `Connection` object.
4597db96d56Sopenharmony_ci        '''
4607db96d56Sopenharmony_ci        if self._listener is None:
4617db96d56Sopenharmony_ci            raise OSError('listener is closed')
4627db96d56Sopenharmony_ci        c = self._listener.accept()
4637db96d56Sopenharmony_ci        if self._authkey:
4647db96d56Sopenharmony_ci            deliver_challenge(c, self._authkey)
4657db96d56Sopenharmony_ci            answer_challenge(c, self._authkey)
4667db96d56Sopenharmony_ci        return c
4677db96d56Sopenharmony_ci
4687db96d56Sopenharmony_ci    def close(self):
4697db96d56Sopenharmony_ci        '''
4707db96d56Sopenharmony_ci        Close the bound socket or named pipe of `self`.
4717db96d56Sopenharmony_ci        '''
4727db96d56Sopenharmony_ci        listener = self._listener
4737db96d56Sopenharmony_ci        if listener is not None:
4747db96d56Sopenharmony_ci            self._listener = None
4757db96d56Sopenharmony_ci            listener.close()
4767db96d56Sopenharmony_ci
4777db96d56Sopenharmony_ci    @property
4787db96d56Sopenharmony_ci    def address(self):
4797db96d56Sopenharmony_ci        return self._listener._address
4807db96d56Sopenharmony_ci
4817db96d56Sopenharmony_ci    @property
4827db96d56Sopenharmony_ci    def last_accepted(self):
4837db96d56Sopenharmony_ci        return self._listener._last_accepted
4847db96d56Sopenharmony_ci
4857db96d56Sopenharmony_ci    def __enter__(self):
4867db96d56Sopenharmony_ci        return self
4877db96d56Sopenharmony_ci
4887db96d56Sopenharmony_ci    def __exit__(self, exc_type, exc_value, exc_tb):
4897db96d56Sopenharmony_ci        self.close()
4907db96d56Sopenharmony_ci
4917db96d56Sopenharmony_ci
4927db96d56Sopenharmony_cidef Client(address, family=None, authkey=None):
4937db96d56Sopenharmony_ci    '''
4947db96d56Sopenharmony_ci    Returns a connection to the address of a `Listener`
4957db96d56Sopenharmony_ci    '''
4967db96d56Sopenharmony_ci    family = family or address_type(address)
4977db96d56Sopenharmony_ci    _validate_family(family)
4987db96d56Sopenharmony_ci    if family == 'AF_PIPE':
4997db96d56Sopenharmony_ci        c = PipeClient(address)
5007db96d56Sopenharmony_ci    else:
5017db96d56Sopenharmony_ci        c = SocketClient(address)
5027db96d56Sopenharmony_ci
5037db96d56Sopenharmony_ci    if authkey is not None and not isinstance(authkey, bytes):
5047db96d56Sopenharmony_ci        raise TypeError('authkey should be a byte string')
5057db96d56Sopenharmony_ci
5067db96d56Sopenharmony_ci    if authkey is not None:
5077db96d56Sopenharmony_ci        answer_challenge(c, authkey)
5087db96d56Sopenharmony_ci        deliver_challenge(c, authkey)
5097db96d56Sopenharmony_ci
5107db96d56Sopenharmony_ci    return c
5117db96d56Sopenharmony_ci
5127db96d56Sopenharmony_ci
5137db96d56Sopenharmony_ciif sys.platform != 'win32':
5147db96d56Sopenharmony_ci
5157db96d56Sopenharmony_ci    def Pipe(duplex=True):
5167db96d56Sopenharmony_ci        '''
5177db96d56Sopenharmony_ci        Returns pair of connection objects at either end of a pipe
5187db96d56Sopenharmony_ci        '''
5197db96d56Sopenharmony_ci        if duplex:
5207db96d56Sopenharmony_ci            s1, s2 = socket.socketpair()
5217db96d56Sopenharmony_ci            s1.setblocking(True)
5227db96d56Sopenharmony_ci            s2.setblocking(True)
5237db96d56Sopenharmony_ci            c1 = Connection(s1.detach())
5247db96d56Sopenharmony_ci            c2 = Connection(s2.detach())
5257db96d56Sopenharmony_ci        else:
5267db96d56Sopenharmony_ci            fd1, fd2 = os.pipe()
5277db96d56Sopenharmony_ci            c1 = Connection(fd1, writable=False)
5287db96d56Sopenharmony_ci            c2 = Connection(fd2, readable=False)
5297db96d56Sopenharmony_ci
5307db96d56Sopenharmony_ci        return c1, c2
5317db96d56Sopenharmony_ci
5327db96d56Sopenharmony_cielse:
5337db96d56Sopenharmony_ci
5347db96d56Sopenharmony_ci    def Pipe(duplex=True):
5357db96d56Sopenharmony_ci        '''
5367db96d56Sopenharmony_ci        Returns pair of connection objects at either end of a pipe
5377db96d56Sopenharmony_ci        '''
5387db96d56Sopenharmony_ci        address = arbitrary_address('AF_PIPE')
5397db96d56Sopenharmony_ci        if duplex:
5407db96d56Sopenharmony_ci            openmode = _winapi.PIPE_ACCESS_DUPLEX
5417db96d56Sopenharmony_ci            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
5427db96d56Sopenharmony_ci            obsize, ibsize = BUFSIZE, BUFSIZE
5437db96d56Sopenharmony_ci        else:
5447db96d56Sopenharmony_ci            openmode = _winapi.PIPE_ACCESS_INBOUND
5457db96d56Sopenharmony_ci            access = _winapi.GENERIC_WRITE
5467db96d56Sopenharmony_ci            obsize, ibsize = 0, BUFSIZE
5477db96d56Sopenharmony_ci
5487db96d56Sopenharmony_ci        h1 = _winapi.CreateNamedPipe(
5497db96d56Sopenharmony_ci            address, openmode | _winapi.FILE_FLAG_OVERLAPPED |
5507db96d56Sopenharmony_ci            _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
5517db96d56Sopenharmony_ci            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
5527db96d56Sopenharmony_ci            _winapi.PIPE_WAIT,
5537db96d56Sopenharmony_ci            1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER,
5547db96d56Sopenharmony_ci            # default security descriptor: the handle cannot be inherited
5557db96d56Sopenharmony_ci            _winapi.NULL
5567db96d56Sopenharmony_ci            )
5577db96d56Sopenharmony_ci        h2 = _winapi.CreateFile(
5587db96d56Sopenharmony_ci            address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
5597db96d56Sopenharmony_ci            _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
5607db96d56Sopenharmony_ci            )
5617db96d56Sopenharmony_ci        _winapi.SetNamedPipeHandleState(
5627db96d56Sopenharmony_ci            h2, _winapi.PIPE_READMODE_MESSAGE, None, None
5637db96d56Sopenharmony_ci            )
5647db96d56Sopenharmony_ci
5657db96d56Sopenharmony_ci        overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True)
5667db96d56Sopenharmony_ci        _, err = overlapped.GetOverlappedResult(True)
5677db96d56Sopenharmony_ci        assert err == 0
5687db96d56Sopenharmony_ci
5697db96d56Sopenharmony_ci        c1 = PipeConnection(h1, writable=duplex)
5707db96d56Sopenharmony_ci        c2 = PipeConnection(h2, readable=duplex)
5717db96d56Sopenharmony_ci
5727db96d56Sopenharmony_ci        return c1, c2
5737db96d56Sopenharmony_ci
5747db96d56Sopenharmony_ci#
5757db96d56Sopenharmony_ci# Definitions for connections based on sockets
5767db96d56Sopenharmony_ci#
5777db96d56Sopenharmony_ci
5787db96d56Sopenharmony_ciclass SocketListener(object):
5797db96d56Sopenharmony_ci    '''
5807db96d56Sopenharmony_ci    Representation of a socket which is bound to an address and listening
5817db96d56Sopenharmony_ci    '''
5827db96d56Sopenharmony_ci    def __init__(self, address, family, backlog=1):
5837db96d56Sopenharmony_ci        self._socket = socket.socket(getattr(socket, family))
5847db96d56Sopenharmony_ci        try:
5857db96d56Sopenharmony_ci            # SO_REUSEADDR has different semantics on Windows (issue #2550).
5867db96d56Sopenharmony_ci            if os.name == 'posix':
5877db96d56Sopenharmony_ci                self._socket.setsockopt(socket.SOL_SOCKET,
5887db96d56Sopenharmony_ci                                        socket.SO_REUSEADDR, 1)
5897db96d56Sopenharmony_ci            self._socket.setblocking(True)
5907db96d56Sopenharmony_ci            self._socket.bind(address)
5917db96d56Sopenharmony_ci            self._socket.listen(backlog)
5927db96d56Sopenharmony_ci            self._address = self._socket.getsockname()
5937db96d56Sopenharmony_ci        except OSError:
5947db96d56Sopenharmony_ci            self._socket.close()
5957db96d56Sopenharmony_ci            raise
5967db96d56Sopenharmony_ci        self._family = family
5977db96d56Sopenharmony_ci        self._last_accepted = None
5987db96d56Sopenharmony_ci
5997db96d56Sopenharmony_ci        if family == 'AF_UNIX' and not util.is_abstract_socket_namespace(address):
6007db96d56Sopenharmony_ci            # Linux abstract socket namespaces do not need to be explicitly unlinked
6017db96d56Sopenharmony_ci            self._unlink = util.Finalize(
6027db96d56Sopenharmony_ci                self, os.unlink, args=(address,), exitpriority=0
6037db96d56Sopenharmony_ci                )
6047db96d56Sopenharmony_ci        else:
6057db96d56Sopenharmony_ci            self._unlink = None
6067db96d56Sopenharmony_ci
6077db96d56Sopenharmony_ci    def accept(self):
6087db96d56Sopenharmony_ci        s, self._last_accepted = self._socket.accept()
6097db96d56Sopenharmony_ci        s.setblocking(True)
6107db96d56Sopenharmony_ci        return Connection(s.detach())
6117db96d56Sopenharmony_ci
6127db96d56Sopenharmony_ci    def close(self):
6137db96d56Sopenharmony_ci        try:
6147db96d56Sopenharmony_ci            self._socket.close()
6157db96d56Sopenharmony_ci        finally:
6167db96d56Sopenharmony_ci            unlink = self._unlink
6177db96d56Sopenharmony_ci            if unlink is not None:
6187db96d56Sopenharmony_ci                self._unlink = None
6197db96d56Sopenharmony_ci                unlink()
6207db96d56Sopenharmony_ci
6217db96d56Sopenharmony_ci
6227db96d56Sopenharmony_cidef SocketClient(address):
6237db96d56Sopenharmony_ci    '''
6247db96d56Sopenharmony_ci    Return a connection object connected to the socket given by `address`
6257db96d56Sopenharmony_ci    '''
6267db96d56Sopenharmony_ci    family = address_type(address)
6277db96d56Sopenharmony_ci    with socket.socket( getattr(socket, family) ) as s:
6287db96d56Sopenharmony_ci        s.setblocking(True)
6297db96d56Sopenharmony_ci        s.connect(address)
6307db96d56Sopenharmony_ci        return Connection(s.detach())
6317db96d56Sopenharmony_ci
6327db96d56Sopenharmony_ci#
6337db96d56Sopenharmony_ci# Definitions for connections based on named pipes
6347db96d56Sopenharmony_ci#
6357db96d56Sopenharmony_ci
6367db96d56Sopenharmony_ciif sys.platform == 'win32':
6377db96d56Sopenharmony_ci
6387db96d56Sopenharmony_ci    class PipeListener(object):
6397db96d56Sopenharmony_ci        '''
6407db96d56Sopenharmony_ci        Representation of a named pipe
6417db96d56Sopenharmony_ci        '''
6427db96d56Sopenharmony_ci        def __init__(self, address, backlog=None):
6437db96d56Sopenharmony_ci            self._address = address
6447db96d56Sopenharmony_ci            self._handle_queue = [self._new_handle(first=True)]
6457db96d56Sopenharmony_ci
6467db96d56Sopenharmony_ci            self._last_accepted = None
6477db96d56Sopenharmony_ci            util.sub_debug('listener created with address=%r', self._address)
6487db96d56Sopenharmony_ci            self.close = util.Finalize(
6497db96d56Sopenharmony_ci                self, PipeListener._finalize_pipe_listener,
6507db96d56Sopenharmony_ci                args=(self._handle_queue, self._address), exitpriority=0
6517db96d56Sopenharmony_ci                )
6527db96d56Sopenharmony_ci
6537db96d56Sopenharmony_ci        def _new_handle(self, first=False):
6547db96d56Sopenharmony_ci            flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
6557db96d56Sopenharmony_ci            if first:
6567db96d56Sopenharmony_ci                flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
6577db96d56Sopenharmony_ci            return _winapi.CreateNamedPipe(
6587db96d56Sopenharmony_ci                self._address, flags,
6597db96d56Sopenharmony_ci                _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
6607db96d56Sopenharmony_ci                _winapi.PIPE_WAIT,
6617db96d56Sopenharmony_ci                _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
6627db96d56Sopenharmony_ci                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL
6637db96d56Sopenharmony_ci                )
6647db96d56Sopenharmony_ci
6657db96d56Sopenharmony_ci        def accept(self):
6667db96d56Sopenharmony_ci            self._handle_queue.append(self._new_handle())
6677db96d56Sopenharmony_ci            handle = self._handle_queue.pop(0)
6687db96d56Sopenharmony_ci            try:
6697db96d56Sopenharmony_ci                ov = _winapi.ConnectNamedPipe(handle, overlapped=True)
6707db96d56Sopenharmony_ci            except OSError as e:
6717db96d56Sopenharmony_ci                if e.winerror != _winapi.ERROR_NO_DATA:
6727db96d56Sopenharmony_ci                    raise
6737db96d56Sopenharmony_ci                # ERROR_NO_DATA can occur if a client has already connected,
6747db96d56Sopenharmony_ci                # written data and then disconnected -- see Issue 14725.
6757db96d56Sopenharmony_ci            else:
6767db96d56Sopenharmony_ci                try:
6777db96d56Sopenharmony_ci                    res = _winapi.WaitForMultipleObjects(
6787db96d56Sopenharmony_ci                        [ov.event], False, INFINITE)
6797db96d56Sopenharmony_ci                except:
6807db96d56Sopenharmony_ci                    ov.cancel()
6817db96d56Sopenharmony_ci                    _winapi.CloseHandle(handle)
6827db96d56Sopenharmony_ci                    raise
6837db96d56Sopenharmony_ci                finally:
6847db96d56Sopenharmony_ci                    _, err = ov.GetOverlappedResult(True)
6857db96d56Sopenharmony_ci                    assert err == 0
6867db96d56Sopenharmony_ci            return PipeConnection(handle)
6877db96d56Sopenharmony_ci
6887db96d56Sopenharmony_ci        @staticmethod
6897db96d56Sopenharmony_ci        def _finalize_pipe_listener(queue, address):
6907db96d56Sopenharmony_ci            util.sub_debug('closing listener with address=%r', address)
6917db96d56Sopenharmony_ci            for handle in queue:
6927db96d56Sopenharmony_ci                _winapi.CloseHandle(handle)
6937db96d56Sopenharmony_ci
6947db96d56Sopenharmony_ci    def PipeClient(address):
6957db96d56Sopenharmony_ci        '''
6967db96d56Sopenharmony_ci        Return a connection object connected to the pipe given by `address`
6977db96d56Sopenharmony_ci        '''
6987db96d56Sopenharmony_ci        t = _init_timeout()
6997db96d56Sopenharmony_ci        while 1:
7007db96d56Sopenharmony_ci            try:
7017db96d56Sopenharmony_ci                _winapi.WaitNamedPipe(address, 1000)
7027db96d56Sopenharmony_ci                h = _winapi.CreateFile(
7037db96d56Sopenharmony_ci                    address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
7047db96d56Sopenharmony_ci                    0, _winapi.NULL, _winapi.OPEN_EXISTING,
7057db96d56Sopenharmony_ci                    _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL
7067db96d56Sopenharmony_ci                    )
7077db96d56Sopenharmony_ci            except OSError as e:
7087db96d56Sopenharmony_ci                if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT,
7097db96d56Sopenharmony_ci                                      _winapi.ERROR_PIPE_BUSY) or _check_timeout(t):
7107db96d56Sopenharmony_ci                    raise
7117db96d56Sopenharmony_ci            else:
7127db96d56Sopenharmony_ci                break
7137db96d56Sopenharmony_ci        else:
7147db96d56Sopenharmony_ci            raise
7157db96d56Sopenharmony_ci
7167db96d56Sopenharmony_ci        _winapi.SetNamedPipeHandleState(
7177db96d56Sopenharmony_ci            h, _winapi.PIPE_READMODE_MESSAGE, None, None
7187db96d56Sopenharmony_ci            )
7197db96d56Sopenharmony_ci        return PipeConnection(h)
7207db96d56Sopenharmony_ci
7217db96d56Sopenharmony_ci#
7227db96d56Sopenharmony_ci# Authentication stuff
7237db96d56Sopenharmony_ci#
7247db96d56Sopenharmony_ci
7257db96d56Sopenharmony_ciMESSAGE_LENGTH = 20
7267db96d56Sopenharmony_ci
7277db96d56Sopenharmony_ciCHALLENGE = b'#CHALLENGE#'
7287db96d56Sopenharmony_ciWELCOME = b'#WELCOME#'
7297db96d56Sopenharmony_ciFAILURE = b'#FAILURE#'
7307db96d56Sopenharmony_ci
7317db96d56Sopenharmony_cidef deliver_challenge(connection, authkey):
7327db96d56Sopenharmony_ci    import hmac
7337db96d56Sopenharmony_ci    if not isinstance(authkey, bytes):
7347db96d56Sopenharmony_ci        raise ValueError(
7357db96d56Sopenharmony_ci            "Authkey must be bytes, not {0!s}".format(type(authkey)))
7367db96d56Sopenharmony_ci    message = os.urandom(MESSAGE_LENGTH)
7377db96d56Sopenharmony_ci    connection.send_bytes(CHALLENGE + message)
7387db96d56Sopenharmony_ci    digest = hmac.new(authkey, message, 'md5').digest()
7397db96d56Sopenharmony_ci    response = connection.recv_bytes(256)        # reject large message
7407db96d56Sopenharmony_ci    if response == digest:
7417db96d56Sopenharmony_ci        connection.send_bytes(WELCOME)
7427db96d56Sopenharmony_ci    else:
7437db96d56Sopenharmony_ci        connection.send_bytes(FAILURE)
7447db96d56Sopenharmony_ci        raise AuthenticationError('digest received was wrong')
7457db96d56Sopenharmony_ci
7467db96d56Sopenharmony_cidef answer_challenge(connection, authkey):
7477db96d56Sopenharmony_ci    import hmac
7487db96d56Sopenharmony_ci    if not isinstance(authkey, bytes):
7497db96d56Sopenharmony_ci        raise ValueError(
7507db96d56Sopenharmony_ci            "Authkey must be bytes, not {0!s}".format(type(authkey)))
7517db96d56Sopenharmony_ci    message = connection.recv_bytes(256)         # reject large message
7527db96d56Sopenharmony_ci    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
7537db96d56Sopenharmony_ci    message = message[len(CHALLENGE):]
7547db96d56Sopenharmony_ci    digest = hmac.new(authkey, message, 'md5').digest()
7557db96d56Sopenharmony_ci    connection.send_bytes(digest)
7567db96d56Sopenharmony_ci    response = connection.recv_bytes(256)        # reject large message
7577db96d56Sopenharmony_ci    if response != WELCOME:
7587db96d56Sopenharmony_ci        raise AuthenticationError('digest sent was rejected')
7597db96d56Sopenharmony_ci
7607db96d56Sopenharmony_ci#
7617db96d56Sopenharmony_ci# Support for using xmlrpclib for serialization
7627db96d56Sopenharmony_ci#
7637db96d56Sopenharmony_ci
7647db96d56Sopenharmony_ciclass ConnectionWrapper(object):
7657db96d56Sopenharmony_ci    def __init__(self, conn, dumps, loads):
7667db96d56Sopenharmony_ci        self._conn = conn
7677db96d56Sopenharmony_ci        self._dumps = dumps
7687db96d56Sopenharmony_ci        self._loads = loads
7697db96d56Sopenharmony_ci        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
7707db96d56Sopenharmony_ci            obj = getattr(conn, attr)
7717db96d56Sopenharmony_ci            setattr(self, attr, obj)
7727db96d56Sopenharmony_ci    def send(self, obj):
7737db96d56Sopenharmony_ci        s = self._dumps(obj)
7747db96d56Sopenharmony_ci        self._conn.send_bytes(s)
7757db96d56Sopenharmony_ci    def recv(self):
7767db96d56Sopenharmony_ci        s = self._conn.recv_bytes()
7777db96d56Sopenharmony_ci        return self._loads(s)
7787db96d56Sopenharmony_ci
7797db96d56Sopenharmony_cidef _xml_dumps(obj):
7807db96d56Sopenharmony_ci    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8')
7817db96d56Sopenharmony_ci
7827db96d56Sopenharmony_cidef _xml_loads(s):
7837db96d56Sopenharmony_ci    (obj,), method = xmlrpclib.loads(s.decode('utf-8'))
7847db96d56Sopenharmony_ci    return obj
7857db96d56Sopenharmony_ci
7867db96d56Sopenharmony_ciclass XmlListener(Listener):
7877db96d56Sopenharmony_ci    def accept(self):
7887db96d56Sopenharmony_ci        global xmlrpclib
7897db96d56Sopenharmony_ci        import xmlrpc.client as xmlrpclib
7907db96d56Sopenharmony_ci        obj = Listener.accept(self)
7917db96d56Sopenharmony_ci        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
7927db96d56Sopenharmony_ci
7937db96d56Sopenharmony_cidef XmlClient(*args, **kwds):
7947db96d56Sopenharmony_ci    global xmlrpclib
7957db96d56Sopenharmony_ci    import xmlrpc.client as xmlrpclib
7967db96d56Sopenharmony_ci    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
7977db96d56Sopenharmony_ci
7987db96d56Sopenharmony_ci#
7997db96d56Sopenharmony_ci# Wait
8007db96d56Sopenharmony_ci#
8017db96d56Sopenharmony_ci
8027db96d56Sopenharmony_ciif sys.platform == 'win32':
8037db96d56Sopenharmony_ci
8047db96d56Sopenharmony_ci    def _exhaustive_wait(handles, timeout):
8057db96d56Sopenharmony_ci        # Return ALL handles which are currently signalled.  (Only
8067db96d56Sopenharmony_ci        # returning the first signalled might create starvation issues.)
8077db96d56Sopenharmony_ci        L = list(handles)
8087db96d56Sopenharmony_ci        ready = []
8097db96d56Sopenharmony_ci        while L:
8107db96d56Sopenharmony_ci            res = _winapi.WaitForMultipleObjects(L, False, timeout)
8117db96d56Sopenharmony_ci            if res == WAIT_TIMEOUT:
8127db96d56Sopenharmony_ci                break
8137db96d56Sopenharmony_ci            elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
8147db96d56Sopenharmony_ci                res -= WAIT_OBJECT_0
8157db96d56Sopenharmony_ci            elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L):
8167db96d56Sopenharmony_ci                res -= WAIT_ABANDONED_0
8177db96d56Sopenharmony_ci            else:
8187db96d56Sopenharmony_ci                raise RuntimeError('Should not get here')
8197db96d56Sopenharmony_ci            ready.append(L[res])
8207db96d56Sopenharmony_ci            L = L[res+1:]
8217db96d56Sopenharmony_ci            timeout = 0
8227db96d56Sopenharmony_ci        return ready
8237db96d56Sopenharmony_ci
8247db96d56Sopenharmony_ci    _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED}
8257db96d56Sopenharmony_ci
8267db96d56Sopenharmony_ci    def wait(object_list, timeout=None):
8277db96d56Sopenharmony_ci        '''
8287db96d56Sopenharmony_ci        Wait till an object in object_list is ready/readable.
8297db96d56Sopenharmony_ci
8307db96d56Sopenharmony_ci        Returns list of those objects in object_list which are ready/readable.
8317db96d56Sopenharmony_ci        '''
8327db96d56Sopenharmony_ci        if timeout is None:
8337db96d56Sopenharmony_ci            timeout = INFINITE
8347db96d56Sopenharmony_ci        elif timeout < 0:
8357db96d56Sopenharmony_ci            timeout = 0
8367db96d56Sopenharmony_ci        else:
8377db96d56Sopenharmony_ci            timeout = int(timeout * 1000 + 0.5)
8387db96d56Sopenharmony_ci
8397db96d56Sopenharmony_ci        object_list = list(object_list)
8407db96d56Sopenharmony_ci        waithandle_to_obj = {}
8417db96d56Sopenharmony_ci        ov_list = []
8427db96d56Sopenharmony_ci        ready_objects = set()
8437db96d56Sopenharmony_ci        ready_handles = set()
8447db96d56Sopenharmony_ci
8457db96d56Sopenharmony_ci        try:
8467db96d56Sopenharmony_ci            for o in object_list:
8477db96d56Sopenharmony_ci                try:
8487db96d56Sopenharmony_ci                    fileno = getattr(o, 'fileno')
8497db96d56Sopenharmony_ci                except AttributeError:
8507db96d56Sopenharmony_ci                    waithandle_to_obj[o.__index__()] = o
8517db96d56Sopenharmony_ci                else:
8527db96d56Sopenharmony_ci                    # start an overlapped read of length zero
8537db96d56Sopenharmony_ci                    try:
8547db96d56Sopenharmony_ci                        ov, err = _winapi.ReadFile(fileno(), 0, True)
8557db96d56Sopenharmony_ci                    except OSError as e:
8567db96d56Sopenharmony_ci                        ov, err = None, e.winerror
8577db96d56Sopenharmony_ci                        if err not in _ready_errors:
8587db96d56Sopenharmony_ci                            raise
8597db96d56Sopenharmony_ci                    if err == _winapi.ERROR_IO_PENDING:
8607db96d56Sopenharmony_ci                        ov_list.append(ov)
8617db96d56Sopenharmony_ci                        waithandle_to_obj[ov.event] = o
8627db96d56Sopenharmony_ci                    else:
8637db96d56Sopenharmony_ci                        # If o.fileno() is an overlapped pipe handle and
8647db96d56Sopenharmony_ci                        # err == 0 then there is a zero length message
8657db96d56Sopenharmony_ci                        # in the pipe, but it HAS NOT been consumed...
8667db96d56Sopenharmony_ci                        if ov and sys.getwindowsversion()[:2] >= (6, 2):
8677db96d56Sopenharmony_ci                            # ... except on Windows 8 and later, where
8687db96d56Sopenharmony_ci                            # the message HAS been consumed.
8697db96d56Sopenharmony_ci                            try:
8707db96d56Sopenharmony_ci                                _, err = ov.GetOverlappedResult(False)
8717db96d56Sopenharmony_ci                            except OSError as e:
8727db96d56Sopenharmony_ci                                err = e.winerror
8737db96d56Sopenharmony_ci                            if not err and hasattr(o, '_got_empty_message'):
8747db96d56Sopenharmony_ci                                o._got_empty_message = True
8757db96d56Sopenharmony_ci                        ready_objects.add(o)
8767db96d56Sopenharmony_ci                        timeout = 0
8777db96d56Sopenharmony_ci
8787db96d56Sopenharmony_ci            ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
8797db96d56Sopenharmony_ci        finally:
8807db96d56Sopenharmony_ci            # request that overlapped reads stop
8817db96d56Sopenharmony_ci            for ov in ov_list:
8827db96d56Sopenharmony_ci                ov.cancel()
8837db96d56Sopenharmony_ci
8847db96d56Sopenharmony_ci            # wait for all overlapped reads to stop
8857db96d56Sopenharmony_ci            for ov in ov_list:
8867db96d56Sopenharmony_ci                try:
8877db96d56Sopenharmony_ci                    _, err = ov.GetOverlappedResult(True)
8887db96d56Sopenharmony_ci                except OSError as e:
8897db96d56Sopenharmony_ci                    err = e.winerror
8907db96d56Sopenharmony_ci                    if err not in _ready_errors:
8917db96d56Sopenharmony_ci                        raise
8927db96d56Sopenharmony_ci                if err != _winapi.ERROR_OPERATION_ABORTED:
8937db96d56Sopenharmony_ci                    o = waithandle_to_obj[ov.event]
8947db96d56Sopenharmony_ci                    ready_objects.add(o)
8957db96d56Sopenharmony_ci                    if err == 0:
8967db96d56Sopenharmony_ci                        # If o.fileno() is an overlapped pipe handle then
8977db96d56Sopenharmony_ci                        # a zero length message HAS been consumed.
8987db96d56Sopenharmony_ci                        if hasattr(o, '_got_empty_message'):
8997db96d56Sopenharmony_ci                            o._got_empty_message = True
9007db96d56Sopenharmony_ci
9017db96d56Sopenharmony_ci        ready_objects.update(waithandle_to_obj[h] for h in ready_handles)
9027db96d56Sopenharmony_ci        return [o for o in object_list if o in ready_objects]
9037db96d56Sopenharmony_ci
9047db96d56Sopenharmony_cielse:
9057db96d56Sopenharmony_ci
9067db96d56Sopenharmony_ci    import selectors
9077db96d56Sopenharmony_ci
9087db96d56Sopenharmony_ci    # poll/select have the advantage of not requiring any extra file
9097db96d56Sopenharmony_ci    # descriptor, contrarily to epoll/kqueue (also, they require a single
9107db96d56Sopenharmony_ci    # syscall).
9117db96d56Sopenharmony_ci    if hasattr(selectors, 'PollSelector'):
9127db96d56Sopenharmony_ci        _WaitSelector = selectors.PollSelector
9137db96d56Sopenharmony_ci    else:
9147db96d56Sopenharmony_ci        _WaitSelector = selectors.SelectSelector
9157db96d56Sopenharmony_ci
9167db96d56Sopenharmony_ci    def wait(object_list, timeout=None):
9177db96d56Sopenharmony_ci        '''
9187db96d56Sopenharmony_ci        Wait till an object in object_list is ready/readable.
9197db96d56Sopenharmony_ci
9207db96d56Sopenharmony_ci        Returns list of those objects in object_list which are ready/readable.
9217db96d56Sopenharmony_ci        '''
9227db96d56Sopenharmony_ci        with _WaitSelector() as selector:
9237db96d56Sopenharmony_ci            for obj in object_list:
9247db96d56Sopenharmony_ci                selector.register(obj, selectors.EVENT_READ)
9257db96d56Sopenharmony_ci
9267db96d56Sopenharmony_ci            if timeout is not None:
9277db96d56Sopenharmony_ci                deadline = time.monotonic() + timeout
9287db96d56Sopenharmony_ci
9297db96d56Sopenharmony_ci            while True:
9307db96d56Sopenharmony_ci                ready = selector.select(timeout)
9317db96d56Sopenharmony_ci                if ready:
9327db96d56Sopenharmony_ci                    return [key.fileobj for (key, events) in ready]
9337db96d56Sopenharmony_ci                else:
9347db96d56Sopenharmony_ci                    if timeout is not None:
9357db96d56Sopenharmony_ci                        timeout = deadline - time.monotonic()
9367db96d56Sopenharmony_ci                        if timeout < 0:
9377db96d56Sopenharmony_ci                            return ready
9387db96d56Sopenharmony_ci
9397db96d56Sopenharmony_ci#
9407db96d56Sopenharmony_ci# Make connection and socket objects shareable if possible
9417db96d56Sopenharmony_ci#
9427db96d56Sopenharmony_ci
9437db96d56Sopenharmony_ciif sys.platform == 'win32':
9447db96d56Sopenharmony_ci    def reduce_connection(conn):
9457db96d56Sopenharmony_ci        handle = conn.fileno()
9467db96d56Sopenharmony_ci        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
9477db96d56Sopenharmony_ci            from . import resource_sharer
9487db96d56Sopenharmony_ci            ds = resource_sharer.DupSocket(s)
9497db96d56Sopenharmony_ci            return rebuild_connection, (ds, conn.readable, conn.writable)
9507db96d56Sopenharmony_ci    def rebuild_connection(ds, readable, writable):
9517db96d56Sopenharmony_ci        sock = ds.detach()
9527db96d56Sopenharmony_ci        return Connection(sock.detach(), readable, writable)
9537db96d56Sopenharmony_ci    reduction.register(Connection, reduce_connection)
9547db96d56Sopenharmony_ci
9557db96d56Sopenharmony_ci    def reduce_pipe_connection(conn):
9567db96d56Sopenharmony_ci        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
9577db96d56Sopenharmony_ci                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
9587db96d56Sopenharmony_ci        dh = reduction.DupHandle(conn.fileno(), access)
9597db96d56Sopenharmony_ci        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
9607db96d56Sopenharmony_ci    def rebuild_pipe_connection(dh, readable, writable):
9617db96d56Sopenharmony_ci        handle = dh.detach()
9627db96d56Sopenharmony_ci        return PipeConnection(handle, readable, writable)
9637db96d56Sopenharmony_ci    reduction.register(PipeConnection, reduce_pipe_connection)
9647db96d56Sopenharmony_ci
9657db96d56Sopenharmony_cielse:
9667db96d56Sopenharmony_ci    def reduce_connection(conn):
9677db96d56Sopenharmony_ci        df = reduction.DupFd(conn.fileno())
9687db96d56Sopenharmony_ci        return rebuild_connection, (df, conn.readable, conn.writable)
9697db96d56Sopenharmony_ci    def rebuild_connection(df, readable, writable):
9707db96d56Sopenharmony_ci        fd = df.detach()
9717db96d56Sopenharmony_ci        return Connection(fd, readable, writable)
9727db96d56Sopenharmony_ci    reduction.register(Connection, reduce_connection)
973