17db96d56Sopenharmony_ci"""Event loop using a selector and related classes.
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciA selector is a "notify-when-ready" multiplexer.  For a subclass which
47db96d56Sopenharmony_cialso includes support for signal handling, see the unix_events sub-module.
57db96d56Sopenharmony_ci"""
67db96d56Sopenharmony_ci
77db96d56Sopenharmony_ci__all__ = 'BaseSelectorEventLoop',
87db96d56Sopenharmony_ci
97db96d56Sopenharmony_ciimport collections
107db96d56Sopenharmony_ciimport errno
117db96d56Sopenharmony_ciimport functools
127db96d56Sopenharmony_ciimport selectors
137db96d56Sopenharmony_ciimport socket
147db96d56Sopenharmony_ciimport warnings
157db96d56Sopenharmony_ciimport weakref
167db96d56Sopenharmony_citry:
177db96d56Sopenharmony_ci    import ssl
187db96d56Sopenharmony_ciexcept ImportError:  # pragma: no cover
197db96d56Sopenharmony_ci    ssl = None
207db96d56Sopenharmony_ci
217db96d56Sopenharmony_cifrom . import base_events
227db96d56Sopenharmony_cifrom . import constants
237db96d56Sopenharmony_cifrom . import events
247db96d56Sopenharmony_cifrom . import futures
257db96d56Sopenharmony_cifrom . import protocols
267db96d56Sopenharmony_cifrom . import sslproto
277db96d56Sopenharmony_cifrom . import transports
287db96d56Sopenharmony_cifrom . import trsock
297db96d56Sopenharmony_cifrom .log import logger
307db96d56Sopenharmony_ci
317db96d56Sopenharmony_ci
327db96d56Sopenharmony_cidef _test_selector_event(selector, fd, event):
337db96d56Sopenharmony_ci    # Test if the selector is monitoring 'event' events
347db96d56Sopenharmony_ci    # for the file descriptor 'fd'.
357db96d56Sopenharmony_ci    try:
367db96d56Sopenharmony_ci        key = selector.get_key(fd)
377db96d56Sopenharmony_ci    except KeyError:
387db96d56Sopenharmony_ci        return False
397db96d56Sopenharmony_ci    else:
407db96d56Sopenharmony_ci        return bool(key.events & event)
417db96d56Sopenharmony_ci
427db96d56Sopenharmony_ci
437db96d56Sopenharmony_ciclass BaseSelectorEventLoop(base_events.BaseEventLoop):
447db96d56Sopenharmony_ci    """Selector event loop.
457db96d56Sopenharmony_ci
467db96d56Sopenharmony_ci    See events.EventLoop for API specification.
477db96d56Sopenharmony_ci    """
487db96d56Sopenharmony_ci
497db96d56Sopenharmony_ci    def __init__(self, selector=None):
507db96d56Sopenharmony_ci        super().__init__()
517db96d56Sopenharmony_ci
527db96d56Sopenharmony_ci        if selector is None:
537db96d56Sopenharmony_ci            selector = selectors.DefaultSelector()
547db96d56Sopenharmony_ci        logger.debug('Using selector: %s', selector.__class__.__name__)
557db96d56Sopenharmony_ci        self._selector = selector
567db96d56Sopenharmony_ci        self._make_self_pipe()
577db96d56Sopenharmony_ci        self._transports = weakref.WeakValueDictionary()
587db96d56Sopenharmony_ci
597db96d56Sopenharmony_ci    def _make_socket_transport(self, sock, protocol, waiter=None, *,
607db96d56Sopenharmony_ci                               extra=None, server=None):
617db96d56Sopenharmony_ci        return _SelectorSocketTransport(self, sock, protocol, waiter,
627db96d56Sopenharmony_ci                                        extra, server)
637db96d56Sopenharmony_ci
647db96d56Sopenharmony_ci    def _make_ssl_transport(
657db96d56Sopenharmony_ci            self, rawsock, protocol, sslcontext, waiter=None,
667db96d56Sopenharmony_ci            *, server_side=False, server_hostname=None,
677db96d56Sopenharmony_ci            extra=None, server=None,
687db96d56Sopenharmony_ci            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
697db96d56Sopenharmony_ci            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
707db96d56Sopenharmony_ci    ):
717db96d56Sopenharmony_ci        ssl_protocol = sslproto.SSLProtocol(
727db96d56Sopenharmony_ci            self, protocol, sslcontext, waiter,
737db96d56Sopenharmony_ci            server_side, server_hostname,
747db96d56Sopenharmony_ci            ssl_handshake_timeout=ssl_handshake_timeout,
757db96d56Sopenharmony_ci            ssl_shutdown_timeout=ssl_shutdown_timeout
767db96d56Sopenharmony_ci        )
777db96d56Sopenharmony_ci        _SelectorSocketTransport(self, rawsock, ssl_protocol,
787db96d56Sopenharmony_ci                                 extra=extra, server=server)
797db96d56Sopenharmony_ci        return ssl_protocol._app_transport
807db96d56Sopenharmony_ci
817db96d56Sopenharmony_ci    def _make_datagram_transport(self, sock, protocol,
827db96d56Sopenharmony_ci                                 address=None, waiter=None, extra=None):
837db96d56Sopenharmony_ci        return _SelectorDatagramTransport(self, sock, protocol,
847db96d56Sopenharmony_ci                                          address, waiter, extra)
857db96d56Sopenharmony_ci
867db96d56Sopenharmony_ci    def close(self):
877db96d56Sopenharmony_ci        if self.is_running():
887db96d56Sopenharmony_ci            raise RuntimeError("Cannot close a running event loop")
897db96d56Sopenharmony_ci        if self.is_closed():
907db96d56Sopenharmony_ci            return
917db96d56Sopenharmony_ci        self._close_self_pipe()
927db96d56Sopenharmony_ci        super().close()
937db96d56Sopenharmony_ci        if self._selector is not None:
947db96d56Sopenharmony_ci            self._selector.close()
957db96d56Sopenharmony_ci            self._selector = None
967db96d56Sopenharmony_ci
977db96d56Sopenharmony_ci    def _close_self_pipe(self):
987db96d56Sopenharmony_ci        self._remove_reader(self._ssock.fileno())
997db96d56Sopenharmony_ci        self._ssock.close()
1007db96d56Sopenharmony_ci        self._ssock = None
1017db96d56Sopenharmony_ci        self._csock.close()
1027db96d56Sopenharmony_ci        self._csock = None
1037db96d56Sopenharmony_ci        self._internal_fds -= 1
1047db96d56Sopenharmony_ci
1057db96d56Sopenharmony_ci    def _make_self_pipe(self):
1067db96d56Sopenharmony_ci        # A self-socket, really. :-)
1077db96d56Sopenharmony_ci        self._ssock, self._csock = socket.socketpair()
1087db96d56Sopenharmony_ci        self._ssock.setblocking(False)
1097db96d56Sopenharmony_ci        self._csock.setblocking(False)
1107db96d56Sopenharmony_ci        self._internal_fds += 1
1117db96d56Sopenharmony_ci        self._add_reader(self._ssock.fileno(), self._read_from_self)
1127db96d56Sopenharmony_ci
1137db96d56Sopenharmony_ci    def _process_self_data(self, data):
1147db96d56Sopenharmony_ci        pass
1157db96d56Sopenharmony_ci
1167db96d56Sopenharmony_ci    def _read_from_self(self):
1177db96d56Sopenharmony_ci        while True:
1187db96d56Sopenharmony_ci            try:
1197db96d56Sopenharmony_ci                data = self._ssock.recv(4096)
1207db96d56Sopenharmony_ci                if not data:
1217db96d56Sopenharmony_ci                    break
1227db96d56Sopenharmony_ci                self._process_self_data(data)
1237db96d56Sopenharmony_ci            except InterruptedError:
1247db96d56Sopenharmony_ci                continue
1257db96d56Sopenharmony_ci            except BlockingIOError:
1267db96d56Sopenharmony_ci                break
1277db96d56Sopenharmony_ci
1287db96d56Sopenharmony_ci    def _write_to_self(self):
1297db96d56Sopenharmony_ci        # This may be called from a different thread, possibly after
1307db96d56Sopenharmony_ci        # _close_self_pipe() has been called or even while it is
1317db96d56Sopenharmony_ci        # running.  Guard for self._csock being None or closed.  When
1327db96d56Sopenharmony_ci        # a socket is closed, send() raises OSError (with errno set to
1337db96d56Sopenharmony_ci        # EBADF, but let's not rely on the exact error code).
1347db96d56Sopenharmony_ci        csock = self._csock
1357db96d56Sopenharmony_ci        if csock is None:
1367db96d56Sopenharmony_ci            return
1377db96d56Sopenharmony_ci
1387db96d56Sopenharmony_ci        try:
1397db96d56Sopenharmony_ci            csock.send(b'\0')
1407db96d56Sopenharmony_ci        except OSError:
1417db96d56Sopenharmony_ci            if self._debug:
1427db96d56Sopenharmony_ci                logger.debug("Fail to write a null byte into the "
1437db96d56Sopenharmony_ci                             "self-pipe socket",
1447db96d56Sopenharmony_ci                             exc_info=True)
1457db96d56Sopenharmony_ci
1467db96d56Sopenharmony_ci    def _start_serving(self, protocol_factory, sock,
1477db96d56Sopenharmony_ci                       sslcontext=None, server=None, backlog=100,
1487db96d56Sopenharmony_ci                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
1497db96d56Sopenharmony_ci                       ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
1507db96d56Sopenharmony_ci        self._add_reader(sock.fileno(), self._accept_connection,
1517db96d56Sopenharmony_ci                         protocol_factory, sock, sslcontext, server, backlog,
1527db96d56Sopenharmony_ci                         ssl_handshake_timeout, ssl_shutdown_timeout)
1537db96d56Sopenharmony_ci
1547db96d56Sopenharmony_ci    def _accept_connection(
1557db96d56Sopenharmony_ci            self, protocol_factory, sock,
1567db96d56Sopenharmony_ci            sslcontext=None, server=None, backlog=100,
1577db96d56Sopenharmony_ci            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
1587db96d56Sopenharmony_ci            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
1597db96d56Sopenharmony_ci        # This method is only called once for each event loop tick where the
1607db96d56Sopenharmony_ci        # listening socket has triggered an EVENT_READ. There may be multiple
1617db96d56Sopenharmony_ci        # connections waiting for an .accept() so it is called in a loop.
1627db96d56Sopenharmony_ci        # See https://bugs.python.org/issue27906 for more details.
1637db96d56Sopenharmony_ci        for _ in range(backlog):
1647db96d56Sopenharmony_ci            try:
1657db96d56Sopenharmony_ci                conn, addr = sock.accept()
1667db96d56Sopenharmony_ci                if self._debug:
1677db96d56Sopenharmony_ci                    logger.debug("%r got a new connection from %r: %r",
1687db96d56Sopenharmony_ci                                 server, addr, conn)
1697db96d56Sopenharmony_ci                conn.setblocking(False)
1707db96d56Sopenharmony_ci            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
1717db96d56Sopenharmony_ci                # Early exit because the socket accept buffer is empty.
1727db96d56Sopenharmony_ci                return None
1737db96d56Sopenharmony_ci            except OSError as exc:
1747db96d56Sopenharmony_ci                # There's nowhere to send the error, so just log it.
1757db96d56Sopenharmony_ci                if exc.errno in (errno.EMFILE, errno.ENFILE,
1767db96d56Sopenharmony_ci                                 errno.ENOBUFS, errno.ENOMEM):
1777db96d56Sopenharmony_ci                    # Some platforms (e.g. Linux keep reporting the FD as
1787db96d56Sopenharmony_ci                    # ready, so we remove the read handler temporarily.
1797db96d56Sopenharmony_ci                    # We'll try again in a while.
1807db96d56Sopenharmony_ci                    self.call_exception_handler({
1817db96d56Sopenharmony_ci                        'message': 'socket.accept() out of system resource',
1827db96d56Sopenharmony_ci                        'exception': exc,
1837db96d56Sopenharmony_ci                        'socket': trsock.TransportSocket(sock),
1847db96d56Sopenharmony_ci                    })
1857db96d56Sopenharmony_ci                    self._remove_reader(sock.fileno())
1867db96d56Sopenharmony_ci                    self.call_later(constants.ACCEPT_RETRY_DELAY,
1877db96d56Sopenharmony_ci                                    self._start_serving,
1887db96d56Sopenharmony_ci                                    protocol_factory, sock, sslcontext, server,
1897db96d56Sopenharmony_ci                                    backlog, ssl_handshake_timeout,
1907db96d56Sopenharmony_ci                                    ssl_shutdown_timeout)
1917db96d56Sopenharmony_ci                else:
1927db96d56Sopenharmony_ci                    raise  # The event loop will catch, log and ignore it.
1937db96d56Sopenharmony_ci            else:
1947db96d56Sopenharmony_ci                extra = {'peername': addr}
1957db96d56Sopenharmony_ci                accept = self._accept_connection2(
1967db96d56Sopenharmony_ci                    protocol_factory, conn, extra, sslcontext, server,
1977db96d56Sopenharmony_ci                    ssl_handshake_timeout, ssl_shutdown_timeout)
1987db96d56Sopenharmony_ci                self.create_task(accept)
1997db96d56Sopenharmony_ci
2007db96d56Sopenharmony_ci    async def _accept_connection2(
2017db96d56Sopenharmony_ci            self, protocol_factory, conn, extra,
2027db96d56Sopenharmony_ci            sslcontext=None, server=None,
2037db96d56Sopenharmony_ci            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
2047db96d56Sopenharmony_ci            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
2057db96d56Sopenharmony_ci        protocol = None
2067db96d56Sopenharmony_ci        transport = None
2077db96d56Sopenharmony_ci        try:
2087db96d56Sopenharmony_ci            protocol = protocol_factory()
2097db96d56Sopenharmony_ci            waiter = self.create_future()
2107db96d56Sopenharmony_ci            if sslcontext:
2117db96d56Sopenharmony_ci                transport = self._make_ssl_transport(
2127db96d56Sopenharmony_ci                    conn, protocol, sslcontext, waiter=waiter,
2137db96d56Sopenharmony_ci                    server_side=True, extra=extra, server=server,
2147db96d56Sopenharmony_ci                    ssl_handshake_timeout=ssl_handshake_timeout,
2157db96d56Sopenharmony_ci                    ssl_shutdown_timeout=ssl_shutdown_timeout)
2167db96d56Sopenharmony_ci            else:
2177db96d56Sopenharmony_ci                transport = self._make_socket_transport(
2187db96d56Sopenharmony_ci                    conn, protocol, waiter=waiter, extra=extra,
2197db96d56Sopenharmony_ci                    server=server)
2207db96d56Sopenharmony_ci
2217db96d56Sopenharmony_ci            try:
2227db96d56Sopenharmony_ci                await waiter
2237db96d56Sopenharmony_ci            except BaseException:
2247db96d56Sopenharmony_ci                transport.close()
2257db96d56Sopenharmony_ci                raise
2267db96d56Sopenharmony_ci                # It's now up to the protocol to handle the connection.
2277db96d56Sopenharmony_ci
2287db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
2297db96d56Sopenharmony_ci            raise
2307db96d56Sopenharmony_ci        except BaseException as exc:
2317db96d56Sopenharmony_ci            if self._debug:
2327db96d56Sopenharmony_ci                context = {
2337db96d56Sopenharmony_ci                    'message':
2347db96d56Sopenharmony_ci                        'Error on transport creation for incoming connection',
2357db96d56Sopenharmony_ci                    'exception': exc,
2367db96d56Sopenharmony_ci                }
2377db96d56Sopenharmony_ci                if protocol is not None:
2387db96d56Sopenharmony_ci                    context['protocol'] = protocol
2397db96d56Sopenharmony_ci                if transport is not None:
2407db96d56Sopenharmony_ci                    context['transport'] = transport
2417db96d56Sopenharmony_ci                self.call_exception_handler(context)
2427db96d56Sopenharmony_ci
2437db96d56Sopenharmony_ci    def _ensure_fd_no_transport(self, fd):
2447db96d56Sopenharmony_ci        fileno = fd
2457db96d56Sopenharmony_ci        if not isinstance(fileno, int):
2467db96d56Sopenharmony_ci            try:
2477db96d56Sopenharmony_ci                fileno = int(fileno.fileno())
2487db96d56Sopenharmony_ci            except (AttributeError, TypeError, ValueError):
2497db96d56Sopenharmony_ci                # This code matches selectors._fileobj_to_fd function.
2507db96d56Sopenharmony_ci                raise ValueError(f"Invalid file object: {fd!r}") from None
2517db96d56Sopenharmony_ci        try:
2527db96d56Sopenharmony_ci            transport = self._transports[fileno]
2537db96d56Sopenharmony_ci        except KeyError:
2547db96d56Sopenharmony_ci            pass
2557db96d56Sopenharmony_ci        else:
2567db96d56Sopenharmony_ci            if not transport.is_closing():
2577db96d56Sopenharmony_ci                raise RuntimeError(
2587db96d56Sopenharmony_ci                    f'File descriptor {fd!r} is used by transport '
2597db96d56Sopenharmony_ci                    f'{transport!r}')
2607db96d56Sopenharmony_ci
2617db96d56Sopenharmony_ci    def _add_reader(self, fd, callback, *args):
2627db96d56Sopenharmony_ci        self._check_closed()
2637db96d56Sopenharmony_ci        handle = events.Handle(callback, args, self, None)
2647db96d56Sopenharmony_ci        try:
2657db96d56Sopenharmony_ci            key = self._selector.get_key(fd)
2667db96d56Sopenharmony_ci        except KeyError:
2677db96d56Sopenharmony_ci            self._selector.register(fd, selectors.EVENT_READ,
2687db96d56Sopenharmony_ci                                    (handle, None))
2697db96d56Sopenharmony_ci        else:
2707db96d56Sopenharmony_ci            mask, (reader, writer) = key.events, key.data
2717db96d56Sopenharmony_ci            self._selector.modify(fd, mask | selectors.EVENT_READ,
2727db96d56Sopenharmony_ci                                  (handle, writer))
2737db96d56Sopenharmony_ci            if reader is not None:
2747db96d56Sopenharmony_ci                reader.cancel()
2757db96d56Sopenharmony_ci        return handle
2767db96d56Sopenharmony_ci
2777db96d56Sopenharmony_ci    def _remove_reader(self, fd):
2787db96d56Sopenharmony_ci        if self.is_closed():
2797db96d56Sopenharmony_ci            return False
2807db96d56Sopenharmony_ci        try:
2817db96d56Sopenharmony_ci            key = self._selector.get_key(fd)
2827db96d56Sopenharmony_ci        except KeyError:
2837db96d56Sopenharmony_ci            return False
2847db96d56Sopenharmony_ci        else:
2857db96d56Sopenharmony_ci            mask, (reader, writer) = key.events, key.data
2867db96d56Sopenharmony_ci            mask &= ~selectors.EVENT_READ
2877db96d56Sopenharmony_ci            if not mask:
2887db96d56Sopenharmony_ci                self._selector.unregister(fd)
2897db96d56Sopenharmony_ci            else:
2907db96d56Sopenharmony_ci                self._selector.modify(fd, mask, (None, writer))
2917db96d56Sopenharmony_ci
2927db96d56Sopenharmony_ci            if reader is not None:
2937db96d56Sopenharmony_ci                reader.cancel()
2947db96d56Sopenharmony_ci                return True
2957db96d56Sopenharmony_ci            else:
2967db96d56Sopenharmony_ci                return False
2977db96d56Sopenharmony_ci
2987db96d56Sopenharmony_ci    def _add_writer(self, fd, callback, *args):
2997db96d56Sopenharmony_ci        self._check_closed()
3007db96d56Sopenharmony_ci        handle = events.Handle(callback, args, self, None)
3017db96d56Sopenharmony_ci        try:
3027db96d56Sopenharmony_ci            key = self._selector.get_key(fd)
3037db96d56Sopenharmony_ci        except KeyError:
3047db96d56Sopenharmony_ci            self._selector.register(fd, selectors.EVENT_WRITE,
3057db96d56Sopenharmony_ci                                    (None, handle))
3067db96d56Sopenharmony_ci        else:
3077db96d56Sopenharmony_ci            mask, (reader, writer) = key.events, key.data
3087db96d56Sopenharmony_ci            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
3097db96d56Sopenharmony_ci                                  (reader, handle))
3107db96d56Sopenharmony_ci            if writer is not None:
3117db96d56Sopenharmony_ci                writer.cancel()
3127db96d56Sopenharmony_ci        return handle
3137db96d56Sopenharmony_ci
3147db96d56Sopenharmony_ci    def _remove_writer(self, fd):
3157db96d56Sopenharmony_ci        """Remove a writer callback."""
3167db96d56Sopenharmony_ci        if self.is_closed():
3177db96d56Sopenharmony_ci            return False
3187db96d56Sopenharmony_ci        try:
3197db96d56Sopenharmony_ci            key = self._selector.get_key(fd)
3207db96d56Sopenharmony_ci        except KeyError:
3217db96d56Sopenharmony_ci            return False
3227db96d56Sopenharmony_ci        else:
3237db96d56Sopenharmony_ci            mask, (reader, writer) = key.events, key.data
3247db96d56Sopenharmony_ci            # Remove both writer and connector.
3257db96d56Sopenharmony_ci            mask &= ~selectors.EVENT_WRITE
3267db96d56Sopenharmony_ci            if not mask:
3277db96d56Sopenharmony_ci                self._selector.unregister(fd)
3287db96d56Sopenharmony_ci            else:
3297db96d56Sopenharmony_ci                self._selector.modify(fd, mask, (reader, None))
3307db96d56Sopenharmony_ci
3317db96d56Sopenharmony_ci            if writer is not None:
3327db96d56Sopenharmony_ci                writer.cancel()
3337db96d56Sopenharmony_ci                return True
3347db96d56Sopenharmony_ci            else:
3357db96d56Sopenharmony_ci                return False
3367db96d56Sopenharmony_ci
3377db96d56Sopenharmony_ci    def add_reader(self, fd, callback, *args):
3387db96d56Sopenharmony_ci        """Add a reader callback."""
3397db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
3407db96d56Sopenharmony_ci        self._add_reader(fd, callback, *args)
3417db96d56Sopenharmony_ci
3427db96d56Sopenharmony_ci    def remove_reader(self, fd):
3437db96d56Sopenharmony_ci        """Remove a reader callback."""
3447db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
3457db96d56Sopenharmony_ci        return self._remove_reader(fd)
3467db96d56Sopenharmony_ci
3477db96d56Sopenharmony_ci    def add_writer(self, fd, callback, *args):
3487db96d56Sopenharmony_ci        """Add a writer callback.."""
3497db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
3507db96d56Sopenharmony_ci        self._add_writer(fd, callback, *args)
3517db96d56Sopenharmony_ci
3527db96d56Sopenharmony_ci    def remove_writer(self, fd):
3537db96d56Sopenharmony_ci        """Remove a writer callback."""
3547db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
3557db96d56Sopenharmony_ci        return self._remove_writer(fd)
3567db96d56Sopenharmony_ci
3577db96d56Sopenharmony_ci    async def sock_recv(self, sock, n):
3587db96d56Sopenharmony_ci        """Receive data from the socket.
3597db96d56Sopenharmony_ci
3607db96d56Sopenharmony_ci        The return value is a bytes object representing the data received.
3617db96d56Sopenharmony_ci        The maximum amount of data to be received at once is specified by
3627db96d56Sopenharmony_ci        nbytes.
3637db96d56Sopenharmony_ci        """
3647db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
3657db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
3667db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
3677db96d56Sopenharmony_ci        try:
3687db96d56Sopenharmony_ci            return sock.recv(n)
3697db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
3707db96d56Sopenharmony_ci            pass
3717db96d56Sopenharmony_ci        fut = self.create_future()
3727db96d56Sopenharmony_ci        fd = sock.fileno()
3737db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
3747db96d56Sopenharmony_ci        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
3757db96d56Sopenharmony_ci        fut.add_done_callback(
3767db96d56Sopenharmony_ci            functools.partial(self._sock_read_done, fd, handle=handle))
3777db96d56Sopenharmony_ci        return await fut
3787db96d56Sopenharmony_ci
3797db96d56Sopenharmony_ci    def _sock_read_done(self, fd, fut, handle=None):
3807db96d56Sopenharmony_ci        if handle is None or not handle.cancelled():
3817db96d56Sopenharmony_ci            self.remove_reader(fd)
3827db96d56Sopenharmony_ci
3837db96d56Sopenharmony_ci    def _sock_recv(self, fut, sock, n):
3847db96d56Sopenharmony_ci        # _sock_recv() can add itself as an I/O callback if the operation can't
3857db96d56Sopenharmony_ci        # be done immediately. Don't use it directly, call sock_recv().
3867db96d56Sopenharmony_ci        if fut.done():
3877db96d56Sopenharmony_ci            return
3887db96d56Sopenharmony_ci        try:
3897db96d56Sopenharmony_ci            data = sock.recv(n)
3907db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
3917db96d56Sopenharmony_ci            return  # try again next time
3927db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
3937db96d56Sopenharmony_ci            raise
3947db96d56Sopenharmony_ci        except BaseException as exc:
3957db96d56Sopenharmony_ci            fut.set_exception(exc)
3967db96d56Sopenharmony_ci        else:
3977db96d56Sopenharmony_ci            fut.set_result(data)
3987db96d56Sopenharmony_ci
3997db96d56Sopenharmony_ci    async def sock_recv_into(self, sock, buf):
4007db96d56Sopenharmony_ci        """Receive data from the socket.
4017db96d56Sopenharmony_ci
4027db96d56Sopenharmony_ci        The received data is written into *buf* (a writable buffer).
4037db96d56Sopenharmony_ci        The return value is the number of bytes written.
4047db96d56Sopenharmony_ci        """
4057db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
4067db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
4077db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
4087db96d56Sopenharmony_ci        try:
4097db96d56Sopenharmony_ci            return sock.recv_into(buf)
4107db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
4117db96d56Sopenharmony_ci            pass
4127db96d56Sopenharmony_ci        fut = self.create_future()
4137db96d56Sopenharmony_ci        fd = sock.fileno()
4147db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
4157db96d56Sopenharmony_ci        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
4167db96d56Sopenharmony_ci        fut.add_done_callback(
4177db96d56Sopenharmony_ci            functools.partial(self._sock_read_done, fd, handle=handle))
4187db96d56Sopenharmony_ci        return await fut
4197db96d56Sopenharmony_ci
4207db96d56Sopenharmony_ci    def _sock_recv_into(self, fut, sock, buf):
4217db96d56Sopenharmony_ci        # _sock_recv_into() can add itself as an I/O callback if the operation
4227db96d56Sopenharmony_ci        # can't be done immediately. Don't use it directly, call
4237db96d56Sopenharmony_ci        # sock_recv_into().
4247db96d56Sopenharmony_ci        if fut.done():
4257db96d56Sopenharmony_ci            return
4267db96d56Sopenharmony_ci        try:
4277db96d56Sopenharmony_ci            nbytes = sock.recv_into(buf)
4287db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
4297db96d56Sopenharmony_ci            return  # try again next time
4307db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
4317db96d56Sopenharmony_ci            raise
4327db96d56Sopenharmony_ci        except BaseException as exc:
4337db96d56Sopenharmony_ci            fut.set_exception(exc)
4347db96d56Sopenharmony_ci        else:
4357db96d56Sopenharmony_ci            fut.set_result(nbytes)
4367db96d56Sopenharmony_ci
4377db96d56Sopenharmony_ci    async def sock_recvfrom(self, sock, bufsize):
4387db96d56Sopenharmony_ci        """Receive a datagram from a datagram socket.
4397db96d56Sopenharmony_ci
4407db96d56Sopenharmony_ci        The return value is a tuple of (bytes, address) representing the
4417db96d56Sopenharmony_ci        datagram received and the address it came from.
4427db96d56Sopenharmony_ci        The maximum amount of data to be received at once is specified by
4437db96d56Sopenharmony_ci        nbytes.
4447db96d56Sopenharmony_ci        """
4457db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
4467db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
4477db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
4487db96d56Sopenharmony_ci        try:
4497db96d56Sopenharmony_ci            return sock.recvfrom(bufsize)
4507db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
4517db96d56Sopenharmony_ci            pass
4527db96d56Sopenharmony_ci        fut = self.create_future()
4537db96d56Sopenharmony_ci        fd = sock.fileno()
4547db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
4557db96d56Sopenharmony_ci        handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
4567db96d56Sopenharmony_ci        fut.add_done_callback(
4577db96d56Sopenharmony_ci            functools.partial(self._sock_read_done, fd, handle=handle))
4587db96d56Sopenharmony_ci        return await fut
4597db96d56Sopenharmony_ci
4607db96d56Sopenharmony_ci    def _sock_recvfrom(self, fut, sock, bufsize):
4617db96d56Sopenharmony_ci        # _sock_recvfrom() can add itself as an I/O callback if the operation
4627db96d56Sopenharmony_ci        # can't be done immediately. Don't use it directly, call
4637db96d56Sopenharmony_ci        # sock_recvfrom().
4647db96d56Sopenharmony_ci        if fut.done():
4657db96d56Sopenharmony_ci            return
4667db96d56Sopenharmony_ci        try:
4677db96d56Sopenharmony_ci            result = sock.recvfrom(bufsize)
4687db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
4697db96d56Sopenharmony_ci            return  # try again next time
4707db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
4717db96d56Sopenharmony_ci            raise
4727db96d56Sopenharmony_ci        except BaseException as exc:
4737db96d56Sopenharmony_ci            fut.set_exception(exc)
4747db96d56Sopenharmony_ci        else:
4757db96d56Sopenharmony_ci            fut.set_result(result)
4767db96d56Sopenharmony_ci
4777db96d56Sopenharmony_ci    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
4787db96d56Sopenharmony_ci        """Receive data from the socket.
4797db96d56Sopenharmony_ci
4807db96d56Sopenharmony_ci        The received data is written into *buf* (a writable buffer).
4817db96d56Sopenharmony_ci        The return value is a tuple of (number of bytes written, address).
4827db96d56Sopenharmony_ci        """
4837db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
4847db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
4857db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
4867db96d56Sopenharmony_ci        if not nbytes:
4877db96d56Sopenharmony_ci            nbytes = len(buf)
4887db96d56Sopenharmony_ci
4897db96d56Sopenharmony_ci        try:
4907db96d56Sopenharmony_ci            return sock.recvfrom_into(buf, nbytes)
4917db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
4927db96d56Sopenharmony_ci            pass
4937db96d56Sopenharmony_ci        fut = self.create_future()
4947db96d56Sopenharmony_ci        fd = sock.fileno()
4957db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
4967db96d56Sopenharmony_ci        handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
4977db96d56Sopenharmony_ci                                  nbytes)
4987db96d56Sopenharmony_ci        fut.add_done_callback(
4997db96d56Sopenharmony_ci            functools.partial(self._sock_read_done, fd, handle=handle))
5007db96d56Sopenharmony_ci        return await fut
5017db96d56Sopenharmony_ci
5027db96d56Sopenharmony_ci    def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
5037db96d56Sopenharmony_ci        # _sock_recv_into() can add itself as an I/O callback if the operation
5047db96d56Sopenharmony_ci        # can't be done immediately. Don't use it directly, call
5057db96d56Sopenharmony_ci        # sock_recv_into().
5067db96d56Sopenharmony_ci        if fut.done():
5077db96d56Sopenharmony_ci            return
5087db96d56Sopenharmony_ci        try:
5097db96d56Sopenharmony_ci            result = sock.recvfrom_into(buf, bufsize)
5107db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
5117db96d56Sopenharmony_ci            return  # try again next time
5127db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
5137db96d56Sopenharmony_ci            raise
5147db96d56Sopenharmony_ci        except BaseException as exc:
5157db96d56Sopenharmony_ci            fut.set_exception(exc)
5167db96d56Sopenharmony_ci        else:
5177db96d56Sopenharmony_ci            fut.set_result(result)
5187db96d56Sopenharmony_ci
5197db96d56Sopenharmony_ci    async def sock_sendall(self, sock, data):
5207db96d56Sopenharmony_ci        """Send data to the socket.
5217db96d56Sopenharmony_ci
5227db96d56Sopenharmony_ci        The socket must be connected to a remote socket. This method continues
5237db96d56Sopenharmony_ci        to send data from data until either all data has been sent or an
5247db96d56Sopenharmony_ci        error occurs. None is returned on success. On error, an exception is
5257db96d56Sopenharmony_ci        raised, and there is no way to determine how much data, if any, was
5267db96d56Sopenharmony_ci        successfully processed by the receiving end of the connection.
5277db96d56Sopenharmony_ci        """
5287db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
5297db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
5307db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
5317db96d56Sopenharmony_ci        try:
5327db96d56Sopenharmony_ci            n = sock.send(data)
5337db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
5347db96d56Sopenharmony_ci            n = 0
5357db96d56Sopenharmony_ci
5367db96d56Sopenharmony_ci        if n == len(data):
5377db96d56Sopenharmony_ci            # all data sent
5387db96d56Sopenharmony_ci            return
5397db96d56Sopenharmony_ci
5407db96d56Sopenharmony_ci        fut = self.create_future()
5417db96d56Sopenharmony_ci        fd = sock.fileno()
5427db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
5437db96d56Sopenharmony_ci        # use a trick with a list in closure to store a mutable state
5447db96d56Sopenharmony_ci        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
5457db96d56Sopenharmony_ci                                  memoryview(data), [n])
5467db96d56Sopenharmony_ci        fut.add_done_callback(
5477db96d56Sopenharmony_ci            functools.partial(self._sock_write_done, fd, handle=handle))
5487db96d56Sopenharmony_ci        return await fut
5497db96d56Sopenharmony_ci
5507db96d56Sopenharmony_ci    def _sock_sendall(self, fut, sock, view, pos):
5517db96d56Sopenharmony_ci        if fut.done():
5527db96d56Sopenharmony_ci            # Future cancellation can be scheduled on previous loop iteration
5537db96d56Sopenharmony_ci            return
5547db96d56Sopenharmony_ci        start = pos[0]
5557db96d56Sopenharmony_ci        try:
5567db96d56Sopenharmony_ci            n = sock.send(view[start:])
5577db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
5587db96d56Sopenharmony_ci            return
5597db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
5607db96d56Sopenharmony_ci            raise
5617db96d56Sopenharmony_ci        except BaseException as exc:
5627db96d56Sopenharmony_ci            fut.set_exception(exc)
5637db96d56Sopenharmony_ci            return
5647db96d56Sopenharmony_ci
5657db96d56Sopenharmony_ci        start += n
5667db96d56Sopenharmony_ci
5677db96d56Sopenharmony_ci        if start == len(view):
5687db96d56Sopenharmony_ci            fut.set_result(None)
5697db96d56Sopenharmony_ci        else:
5707db96d56Sopenharmony_ci            pos[0] = start
5717db96d56Sopenharmony_ci
5727db96d56Sopenharmony_ci    async def sock_sendto(self, sock, data, address):
5737db96d56Sopenharmony_ci        """Send data to the socket.
5747db96d56Sopenharmony_ci
5757db96d56Sopenharmony_ci        The socket must be connected to a remote socket. This method continues
5767db96d56Sopenharmony_ci        to send data from data until either all data has been sent or an
5777db96d56Sopenharmony_ci        error occurs. None is returned on success. On error, an exception is
5787db96d56Sopenharmony_ci        raised, and there is no way to determine how much data, if any, was
5797db96d56Sopenharmony_ci        successfully processed by the receiving end of the connection.
5807db96d56Sopenharmony_ci        """
5817db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
5827db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
5837db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
5847db96d56Sopenharmony_ci        try:
5857db96d56Sopenharmony_ci            return sock.sendto(data, address)
5867db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
5877db96d56Sopenharmony_ci            pass
5887db96d56Sopenharmony_ci
5897db96d56Sopenharmony_ci        fut = self.create_future()
5907db96d56Sopenharmony_ci        fd = sock.fileno()
5917db96d56Sopenharmony_ci        self._ensure_fd_no_transport(fd)
5927db96d56Sopenharmony_ci        # use a trick with a list in closure to store a mutable state
5937db96d56Sopenharmony_ci        handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
5947db96d56Sopenharmony_ci                                  address)
5957db96d56Sopenharmony_ci        fut.add_done_callback(
5967db96d56Sopenharmony_ci            functools.partial(self._sock_write_done, fd, handle=handle))
5977db96d56Sopenharmony_ci        return await fut
5987db96d56Sopenharmony_ci
5997db96d56Sopenharmony_ci    def _sock_sendto(self, fut, sock, data, address):
6007db96d56Sopenharmony_ci        if fut.done():
6017db96d56Sopenharmony_ci            # Future cancellation can be scheduled on previous loop iteration
6027db96d56Sopenharmony_ci            return
6037db96d56Sopenharmony_ci        try:
6047db96d56Sopenharmony_ci            n = sock.sendto(data, 0, address)
6057db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
6067db96d56Sopenharmony_ci            return
6077db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
6087db96d56Sopenharmony_ci            raise
6097db96d56Sopenharmony_ci        except BaseException as exc:
6107db96d56Sopenharmony_ci            fut.set_exception(exc)
6117db96d56Sopenharmony_ci        else:
6127db96d56Sopenharmony_ci            fut.set_result(n)
6137db96d56Sopenharmony_ci
6147db96d56Sopenharmony_ci    async def sock_connect(self, sock, address):
6157db96d56Sopenharmony_ci        """Connect to a remote socket at address.
6167db96d56Sopenharmony_ci
6177db96d56Sopenharmony_ci        This method is a coroutine.
6187db96d56Sopenharmony_ci        """
6197db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
6207db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
6217db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
6227db96d56Sopenharmony_ci
6237db96d56Sopenharmony_ci        if sock.family == socket.AF_INET or (
6247db96d56Sopenharmony_ci                base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
6257db96d56Sopenharmony_ci            resolved = await self._ensure_resolved(
6267db96d56Sopenharmony_ci                address, family=sock.family, type=sock.type, proto=sock.proto,
6277db96d56Sopenharmony_ci                loop=self,
6287db96d56Sopenharmony_ci            )
6297db96d56Sopenharmony_ci            _, _, _, _, address = resolved[0]
6307db96d56Sopenharmony_ci
6317db96d56Sopenharmony_ci        fut = self.create_future()
6327db96d56Sopenharmony_ci        self._sock_connect(fut, sock, address)
6337db96d56Sopenharmony_ci        try:
6347db96d56Sopenharmony_ci            return await fut
6357db96d56Sopenharmony_ci        finally:
6367db96d56Sopenharmony_ci            # Needed to break cycles when an exception occurs.
6377db96d56Sopenharmony_ci            fut = None
6387db96d56Sopenharmony_ci
6397db96d56Sopenharmony_ci    def _sock_connect(self, fut, sock, address):
6407db96d56Sopenharmony_ci        fd = sock.fileno()
6417db96d56Sopenharmony_ci        try:
6427db96d56Sopenharmony_ci            sock.connect(address)
6437db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
6447db96d56Sopenharmony_ci            # Issue #23618: When the C function connect() fails with EINTR, the
6457db96d56Sopenharmony_ci            # connection runs in background. We have to wait until the socket
6467db96d56Sopenharmony_ci            # becomes writable to be notified when the connection succeed or
6477db96d56Sopenharmony_ci            # fails.
6487db96d56Sopenharmony_ci            self._ensure_fd_no_transport(fd)
6497db96d56Sopenharmony_ci            handle = self._add_writer(
6507db96d56Sopenharmony_ci                fd, self._sock_connect_cb, fut, sock, address)
6517db96d56Sopenharmony_ci            fut.add_done_callback(
6527db96d56Sopenharmony_ci                functools.partial(self._sock_write_done, fd, handle=handle))
6537db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
6547db96d56Sopenharmony_ci            raise
6557db96d56Sopenharmony_ci        except BaseException as exc:
6567db96d56Sopenharmony_ci            fut.set_exception(exc)
6577db96d56Sopenharmony_ci        else:
6587db96d56Sopenharmony_ci            fut.set_result(None)
6597db96d56Sopenharmony_ci        finally:
6607db96d56Sopenharmony_ci            fut = None
6617db96d56Sopenharmony_ci
6627db96d56Sopenharmony_ci    def _sock_write_done(self, fd, fut, handle=None):
6637db96d56Sopenharmony_ci        if handle is None or not handle.cancelled():
6647db96d56Sopenharmony_ci            self.remove_writer(fd)
6657db96d56Sopenharmony_ci
6667db96d56Sopenharmony_ci    def _sock_connect_cb(self, fut, sock, address):
6677db96d56Sopenharmony_ci        if fut.done():
6687db96d56Sopenharmony_ci            return
6697db96d56Sopenharmony_ci
6707db96d56Sopenharmony_ci        try:
6717db96d56Sopenharmony_ci            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
6727db96d56Sopenharmony_ci            if err != 0:
6737db96d56Sopenharmony_ci                # Jump to any except clause below.
6747db96d56Sopenharmony_ci                raise OSError(err, f'Connect call failed {address}')
6757db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
6767db96d56Sopenharmony_ci            # socket is still registered, the callback will be retried later
6777db96d56Sopenharmony_ci            pass
6787db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
6797db96d56Sopenharmony_ci            raise
6807db96d56Sopenharmony_ci        except BaseException as exc:
6817db96d56Sopenharmony_ci            fut.set_exception(exc)
6827db96d56Sopenharmony_ci        else:
6837db96d56Sopenharmony_ci            fut.set_result(None)
6847db96d56Sopenharmony_ci        finally:
6857db96d56Sopenharmony_ci            fut = None
6867db96d56Sopenharmony_ci
6877db96d56Sopenharmony_ci    async def sock_accept(self, sock):
6887db96d56Sopenharmony_ci        """Accept a connection.
6897db96d56Sopenharmony_ci
6907db96d56Sopenharmony_ci        The socket must be bound to an address and listening for connections.
6917db96d56Sopenharmony_ci        The return value is a pair (conn, address) where conn is a new socket
6927db96d56Sopenharmony_ci        object usable to send and receive data on the connection, and address
6937db96d56Sopenharmony_ci        is the address bound to the socket on the other end of the connection.
6947db96d56Sopenharmony_ci        """
6957db96d56Sopenharmony_ci        base_events._check_ssl_socket(sock)
6967db96d56Sopenharmony_ci        if self._debug and sock.gettimeout() != 0:
6977db96d56Sopenharmony_ci            raise ValueError("the socket must be non-blocking")
6987db96d56Sopenharmony_ci        fut = self.create_future()
6997db96d56Sopenharmony_ci        self._sock_accept(fut, sock)
7007db96d56Sopenharmony_ci        return await fut
7017db96d56Sopenharmony_ci
7027db96d56Sopenharmony_ci    def _sock_accept(self, fut, sock):
7037db96d56Sopenharmony_ci        fd = sock.fileno()
7047db96d56Sopenharmony_ci        try:
7057db96d56Sopenharmony_ci            conn, address = sock.accept()
7067db96d56Sopenharmony_ci            conn.setblocking(False)
7077db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
7087db96d56Sopenharmony_ci            self._ensure_fd_no_transport(fd)
7097db96d56Sopenharmony_ci            handle = self._add_reader(fd, self._sock_accept, fut, sock)
7107db96d56Sopenharmony_ci            fut.add_done_callback(
7117db96d56Sopenharmony_ci                functools.partial(self._sock_read_done, fd, handle=handle))
7127db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
7137db96d56Sopenharmony_ci            raise
7147db96d56Sopenharmony_ci        except BaseException as exc:
7157db96d56Sopenharmony_ci            fut.set_exception(exc)
7167db96d56Sopenharmony_ci        else:
7177db96d56Sopenharmony_ci            fut.set_result((conn, address))
7187db96d56Sopenharmony_ci
7197db96d56Sopenharmony_ci    async def _sendfile_native(self, transp, file, offset, count):
7207db96d56Sopenharmony_ci        del self._transports[transp._sock_fd]
7217db96d56Sopenharmony_ci        resume_reading = transp.is_reading()
7227db96d56Sopenharmony_ci        transp.pause_reading()
7237db96d56Sopenharmony_ci        await transp._make_empty_waiter()
7247db96d56Sopenharmony_ci        try:
7257db96d56Sopenharmony_ci            return await self.sock_sendfile(transp._sock, file, offset, count,
7267db96d56Sopenharmony_ci                                            fallback=False)
7277db96d56Sopenharmony_ci        finally:
7287db96d56Sopenharmony_ci            transp._reset_empty_waiter()
7297db96d56Sopenharmony_ci            if resume_reading:
7307db96d56Sopenharmony_ci                transp.resume_reading()
7317db96d56Sopenharmony_ci            self._transports[transp._sock_fd] = transp
7327db96d56Sopenharmony_ci
7337db96d56Sopenharmony_ci    def _process_events(self, event_list):
7347db96d56Sopenharmony_ci        for key, mask in event_list:
7357db96d56Sopenharmony_ci            fileobj, (reader, writer) = key.fileobj, key.data
7367db96d56Sopenharmony_ci            if mask & selectors.EVENT_READ and reader is not None:
7377db96d56Sopenharmony_ci                if reader._cancelled:
7387db96d56Sopenharmony_ci                    self._remove_reader(fileobj)
7397db96d56Sopenharmony_ci                else:
7407db96d56Sopenharmony_ci                    self._add_callback(reader)
7417db96d56Sopenharmony_ci            if mask & selectors.EVENT_WRITE and writer is not None:
7427db96d56Sopenharmony_ci                if writer._cancelled:
7437db96d56Sopenharmony_ci                    self._remove_writer(fileobj)
7447db96d56Sopenharmony_ci                else:
7457db96d56Sopenharmony_ci                    self._add_callback(writer)
7467db96d56Sopenharmony_ci
7477db96d56Sopenharmony_ci    def _stop_serving(self, sock):
7487db96d56Sopenharmony_ci        self._remove_reader(sock.fileno())
7497db96d56Sopenharmony_ci        sock.close()
7507db96d56Sopenharmony_ci
7517db96d56Sopenharmony_ci
7527db96d56Sopenharmony_ciclass _SelectorTransport(transports._FlowControlMixin,
7537db96d56Sopenharmony_ci                         transports.Transport):
7547db96d56Sopenharmony_ci
7557db96d56Sopenharmony_ci    max_size = 256 * 1024  # Buffer size passed to recv().
7567db96d56Sopenharmony_ci
7577db96d56Sopenharmony_ci    _buffer_factory = bytearray  # Constructs initial value for self._buffer.
7587db96d56Sopenharmony_ci
7597db96d56Sopenharmony_ci    # Attribute used in the destructor: it must be set even if the constructor
7607db96d56Sopenharmony_ci    # is not called (see _SelectorSslTransport which may start by raising an
7617db96d56Sopenharmony_ci    # exception)
7627db96d56Sopenharmony_ci    _sock = None
7637db96d56Sopenharmony_ci
7647db96d56Sopenharmony_ci    def __init__(self, loop, sock, protocol, extra=None, server=None):
7657db96d56Sopenharmony_ci        super().__init__(extra, loop)
7667db96d56Sopenharmony_ci        self._extra['socket'] = trsock.TransportSocket(sock)
7677db96d56Sopenharmony_ci        try:
7687db96d56Sopenharmony_ci            self._extra['sockname'] = sock.getsockname()
7697db96d56Sopenharmony_ci        except OSError:
7707db96d56Sopenharmony_ci            self._extra['sockname'] = None
7717db96d56Sopenharmony_ci        if 'peername' not in self._extra:
7727db96d56Sopenharmony_ci            try:
7737db96d56Sopenharmony_ci                self._extra['peername'] = sock.getpeername()
7747db96d56Sopenharmony_ci            except socket.error:
7757db96d56Sopenharmony_ci                self._extra['peername'] = None
7767db96d56Sopenharmony_ci        self._sock = sock
7777db96d56Sopenharmony_ci        self._sock_fd = sock.fileno()
7787db96d56Sopenharmony_ci
7797db96d56Sopenharmony_ci        self._protocol_connected = False
7807db96d56Sopenharmony_ci        self.set_protocol(protocol)
7817db96d56Sopenharmony_ci
7827db96d56Sopenharmony_ci        self._server = server
7837db96d56Sopenharmony_ci        self._buffer = self._buffer_factory()
7847db96d56Sopenharmony_ci        self._conn_lost = 0  # Set when call to connection_lost scheduled.
7857db96d56Sopenharmony_ci        self._closing = False  # Set when close() called.
7867db96d56Sopenharmony_ci        self._paused = False  # Set when pause_reading() called
7877db96d56Sopenharmony_ci
7887db96d56Sopenharmony_ci        if self._server is not None:
7897db96d56Sopenharmony_ci            self._server._attach()
7907db96d56Sopenharmony_ci        loop._transports[self._sock_fd] = self
7917db96d56Sopenharmony_ci
7927db96d56Sopenharmony_ci    def __repr__(self):
7937db96d56Sopenharmony_ci        info = [self.__class__.__name__]
7947db96d56Sopenharmony_ci        if self._sock is None:
7957db96d56Sopenharmony_ci            info.append('closed')
7967db96d56Sopenharmony_ci        elif self._closing:
7977db96d56Sopenharmony_ci            info.append('closing')
7987db96d56Sopenharmony_ci        info.append(f'fd={self._sock_fd}')
7997db96d56Sopenharmony_ci        # test if the transport was closed
8007db96d56Sopenharmony_ci        if self._loop is not None and not self._loop.is_closed():
8017db96d56Sopenharmony_ci            polling = _test_selector_event(self._loop._selector,
8027db96d56Sopenharmony_ci                                           self._sock_fd, selectors.EVENT_READ)
8037db96d56Sopenharmony_ci            if polling:
8047db96d56Sopenharmony_ci                info.append('read=polling')
8057db96d56Sopenharmony_ci            else:
8067db96d56Sopenharmony_ci                info.append('read=idle')
8077db96d56Sopenharmony_ci
8087db96d56Sopenharmony_ci            polling = _test_selector_event(self._loop._selector,
8097db96d56Sopenharmony_ci                                           self._sock_fd,
8107db96d56Sopenharmony_ci                                           selectors.EVENT_WRITE)
8117db96d56Sopenharmony_ci            if polling:
8127db96d56Sopenharmony_ci                state = 'polling'
8137db96d56Sopenharmony_ci            else:
8147db96d56Sopenharmony_ci                state = 'idle'
8157db96d56Sopenharmony_ci
8167db96d56Sopenharmony_ci            bufsize = self.get_write_buffer_size()
8177db96d56Sopenharmony_ci            info.append(f'write=<{state}, bufsize={bufsize}>')
8187db96d56Sopenharmony_ci        return '<{}>'.format(' '.join(info))
8197db96d56Sopenharmony_ci
8207db96d56Sopenharmony_ci    def abort(self):
8217db96d56Sopenharmony_ci        self._force_close(None)
8227db96d56Sopenharmony_ci
8237db96d56Sopenharmony_ci    def set_protocol(self, protocol):
8247db96d56Sopenharmony_ci        self._protocol = protocol
8257db96d56Sopenharmony_ci        self._protocol_connected = True
8267db96d56Sopenharmony_ci
8277db96d56Sopenharmony_ci    def get_protocol(self):
8287db96d56Sopenharmony_ci        return self._protocol
8297db96d56Sopenharmony_ci
8307db96d56Sopenharmony_ci    def is_closing(self):
8317db96d56Sopenharmony_ci        return self._closing
8327db96d56Sopenharmony_ci
8337db96d56Sopenharmony_ci    def is_reading(self):
8347db96d56Sopenharmony_ci        return not self.is_closing() and not self._paused
8357db96d56Sopenharmony_ci
8367db96d56Sopenharmony_ci    def pause_reading(self):
8377db96d56Sopenharmony_ci        if not self.is_reading():
8387db96d56Sopenharmony_ci            return
8397db96d56Sopenharmony_ci        self._paused = True
8407db96d56Sopenharmony_ci        self._loop._remove_reader(self._sock_fd)
8417db96d56Sopenharmony_ci        if self._loop.get_debug():
8427db96d56Sopenharmony_ci            logger.debug("%r pauses reading", self)
8437db96d56Sopenharmony_ci
8447db96d56Sopenharmony_ci    def resume_reading(self):
8457db96d56Sopenharmony_ci        if self._closing or not self._paused:
8467db96d56Sopenharmony_ci            return
8477db96d56Sopenharmony_ci        self._paused = False
8487db96d56Sopenharmony_ci        self._add_reader(self._sock_fd, self._read_ready)
8497db96d56Sopenharmony_ci        if self._loop.get_debug():
8507db96d56Sopenharmony_ci            logger.debug("%r resumes reading", self)
8517db96d56Sopenharmony_ci
8527db96d56Sopenharmony_ci    def close(self):
8537db96d56Sopenharmony_ci        if self._closing:
8547db96d56Sopenharmony_ci            return
8557db96d56Sopenharmony_ci        self._closing = True
8567db96d56Sopenharmony_ci        self._loop._remove_reader(self._sock_fd)
8577db96d56Sopenharmony_ci        if not self._buffer:
8587db96d56Sopenharmony_ci            self._conn_lost += 1
8597db96d56Sopenharmony_ci            self._loop._remove_writer(self._sock_fd)
8607db96d56Sopenharmony_ci            self._loop.call_soon(self._call_connection_lost, None)
8617db96d56Sopenharmony_ci
8627db96d56Sopenharmony_ci    def __del__(self, _warn=warnings.warn):
8637db96d56Sopenharmony_ci        if self._sock is not None:
8647db96d56Sopenharmony_ci            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
8657db96d56Sopenharmony_ci            self._sock.close()
8667db96d56Sopenharmony_ci
8677db96d56Sopenharmony_ci    def _fatal_error(self, exc, message='Fatal error on transport'):
8687db96d56Sopenharmony_ci        # Should be called from exception handler only.
8697db96d56Sopenharmony_ci        if isinstance(exc, OSError):
8707db96d56Sopenharmony_ci            if self._loop.get_debug():
8717db96d56Sopenharmony_ci                logger.debug("%r: %s", self, message, exc_info=True)
8727db96d56Sopenharmony_ci        else:
8737db96d56Sopenharmony_ci            self._loop.call_exception_handler({
8747db96d56Sopenharmony_ci                'message': message,
8757db96d56Sopenharmony_ci                'exception': exc,
8767db96d56Sopenharmony_ci                'transport': self,
8777db96d56Sopenharmony_ci                'protocol': self._protocol,
8787db96d56Sopenharmony_ci            })
8797db96d56Sopenharmony_ci        self._force_close(exc)
8807db96d56Sopenharmony_ci
8817db96d56Sopenharmony_ci    def _force_close(self, exc):
8827db96d56Sopenharmony_ci        if self._conn_lost:
8837db96d56Sopenharmony_ci            return
8847db96d56Sopenharmony_ci        if self._buffer:
8857db96d56Sopenharmony_ci            self._buffer.clear()
8867db96d56Sopenharmony_ci            self._loop._remove_writer(self._sock_fd)
8877db96d56Sopenharmony_ci        if not self._closing:
8887db96d56Sopenharmony_ci            self._closing = True
8897db96d56Sopenharmony_ci            self._loop._remove_reader(self._sock_fd)
8907db96d56Sopenharmony_ci        self._conn_lost += 1
8917db96d56Sopenharmony_ci        self._loop.call_soon(self._call_connection_lost, exc)
8927db96d56Sopenharmony_ci
8937db96d56Sopenharmony_ci    def _call_connection_lost(self, exc):
8947db96d56Sopenharmony_ci        try:
8957db96d56Sopenharmony_ci            if self._protocol_connected:
8967db96d56Sopenharmony_ci                self._protocol.connection_lost(exc)
8977db96d56Sopenharmony_ci        finally:
8987db96d56Sopenharmony_ci            self._sock.close()
8997db96d56Sopenharmony_ci            self._sock = None
9007db96d56Sopenharmony_ci            self._protocol = None
9017db96d56Sopenharmony_ci            self._loop = None
9027db96d56Sopenharmony_ci            server = self._server
9037db96d56Sopenharmony_ci            if server is not None:
9047db96d56Sopenharmony_ci                server._detach()
9057db96d56Sopenharmony_ci                self._server = None
9067db96d56Sopenharmony_ci
9077db96d56Sopenharmony_ci    def get_write_buffer_size(self):
9087db96d56Sopenharmony_ci        return len(self._buffer)
9097db96d56Sopenharmony_ci
9107db96d56Sopenharmony_ci    def _add_reader(self, fd, callback, *args):
9117db96d56Sopenharmony_ci        if not self.is_reading():
9127db96d56Sopenharmony_ci            return
9137db96d56Sopenharmony_ci        self._loop._add_reader(fd, callback, *args)
9147db96d56Sopenharmony_ci
9157db96d56Sopenharmony_ci
9167db96d56Sopenharmony_ciclass _SelectorSocketTransport(_SelectorTransport):
9177db96d56Sopenharmony_ci
9187db96d56Sopenharmony_ci    _start_tls_compatible = True
9197db96d56Sopenharmony_ci    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
9207db96d56Sopenharmony_ci
9217db96d56Sopenharmony_ci    def __init__(self, loop, sock, protocol, waiter=None,
9227db96d56Sopenharmony_ci                 extra=None, server=None):
9237db96d56Sopenharmony_ci
9247db96d56Sopenharmony_ci        self._read_ready_cb = None
9257db96d56Sopenharmony_ci        super().__init__(loop, sock, protocol, extra, server)
9267db96d56Sopenharmony_ci        self._eof = False
9277db96d56Sopenharmony_ci        self._empty_waiter = None
9287db96d56Sopenharmony_ci
9297db96d56Sopenharmony_ci        # Disable the Nagle algorithm -- small writes will be
9307db96d56Sopenharmony_ci        # sent without waiting for the TCP ACK.  This generally
9317db96d56Sopenharmony_ci        # decreases the latency (in some cases significantly.)
9327db96d56Sopenharmony_ci        base_events._set_nodelay(self._sock)
9337db96d56Sopenharmony_ci
9347db96d56Sopenharmony_ci        self._loop.call_soon(self._protocol.connection_made, self)
9357db96d56Sopenharmony_ci        # only start reading when connection_made() has been called
9367db96d56Sopenharmony_ci        self._loop.call_soon(self._add_reader,
9377db96d56Sopenharmony_ci                             self._sock_fd, self._read_ready)
9387db96d56Sopenharmony_ci        if waiter is not None:
9397db96d56Sopenharmony_ci            # only wake up the waiter when connection_made() has been called
9407db96d56Sopenharmony_ci            self._loop.call_soon(futures._set_result_unless_cancelled,
9417db96d56Sopenharmony_ci                                 waiter, None)
9427db96d56Sopenharmony_ci
9437db96d56Sopenharmony_ci    def set_protocol(self, protocol):
9447db96d56Sopenharmony_ci        if isinstance(protocol, protocols.BufferedProtocol):
9457db96d56Sopenharmony_ci            self._read_ready_cb = self._read_ready__get_buffer
9467db96d56Sopenharmony_ci        else:
9477db96d56Sopenharmony_ci            self._read_ready_cb = self._read_ready__data_received
9487db96d56Sopenharmony_ci
9497db96d56Sopenharmony_ci        super().set_protocol(protocol)
9507db96d56Sopenharmony_ci
9517db96d56Sopenharmony_ci    def _read_ready(self):
9527db96d56Sopenharmony_ci        self._read_ready_cb()
9537db96d56Sopenharmony_ci
9547db96d56Sopenharmony_ci    def _read_ready__get_buffer(self):
9557db96d56Sopenharmony_ci        if self._conn_lost:
9567db96d56Sopenharmony_ci            return
9577db96d56Sopenharmony_ci
9587db96d56Sopenharmony_ci        try:
9597db96d56Sopenharmony_ci            buf = self._protocol.get_buffer(-1)
9607db96d56Sopenharmony_ci            if not len(buf):
9617db96d56Sopenharmony_ci                raise RuntimeError('get_buffer() returned an empty buffer')
9627db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
9637db96d56Sopenharmony_ci            raise
9647db96d56Sopenharmony_ci        except BaseException as exc:
9657db96d56Sopenharmony_ci            self._fatal_error(
9667db96d56Sopenharmony_ci                exc, 'Fatal error: protocol.get_buffer() call failed.')
9677db96d56Sopenharmony_ci            return
9687db96d56Sopenharmony_ci
9697db96d56Sopenharmony_ci        try:
9707db96d56Sopenharmony_ci            nbytes = self._sock.recv_into(buf)
9717db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
9727db96d56Sopenharmony_ci            return
9737db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
9747db96d56Sopenharmony_ci            raise
9757db96d56Sopenharmony_ci        except BaseException as exc:
9767db96d56Sopenharmony_ci            self._fatal_error(exc, 'Fatal read error on socket transport')
9777db96d56Sopenharmony_ci            return
9787db96d56Sopenharmony_ci
9797db96d56Sopenharmony_ci        if not nbytes:
9807db96d56Sopenharmony_ci            self._read_ready__on_eof()
9817db96d56Sopenharmony_ci            return
9827db96d56Sopenharmony_ci
9837db96d56Sopenharmony_ci        try:
9847db96d56Sopenharmony_ci            self._protocol.buffer_updated(nbytes)
9857db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
9867db96d56Sopenharmony_ci            raise
9877db96d56Sopenharmony_ci        except BaseException as exc:
9887db96d56Sopenharmony_ci            self._fatal_error(
9897db96d56Sopenharmony_ci                exc, 'Fatal error: protocol.buffer_updated() call failed.')
9907db96d56Sopenharmony_ci
9917db96d56Sopenharmony_ci    def _read_ready__data_received(self):
9927db96d56Sopenharmony_ci        if self._conn_lost:
9937db96d56Sopenharmony_ci            return
9947db96d56Sopenharmony_ci        try:
9957db96d56Sopenharmony_ci            data = self._sock.recv(self.max_size)
9967db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
9977db96d56Sopenharmony_ci            return
9987db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
9997db96d56Sopenharmony_ci            raise
10007db96d56Sopenharmony_ci        except BaseException as exc:
10017db96d56Sopenharmony_ci            self._fatal_error(exc, 'Fatal read error on socket transport')
10027db96d56Sopenharmony_ci            return
10037db96d56Sopenharmony_ci
10047db96d56Sopenharmony_ci        if not data:
10057db96d56Sopenharmony_ci            self._read_ready__on_eof()
10067db96d56Sopenharmony_ci            return
10077db96d56Sopenharmony_ci
10087db96d56Sopenharmony_ci        try:
10097db96d56Sopenharmony_ci            self._protocol.data_received(data)
10107db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
10117db96d56Sopenharmony_ci            raise
10127db96d56Sopenharmony_ci        except BaseException as exc:
10137db96d56Sopenharmony_ci            self._fatal_error(
10147db96d56Sopenharmony_ci                exc, 'Fatal error: protocol.data_received() call failed.')
10157db96d56Sopenharmony_ci
10167db96d56Sopenharmony_ci    def _read_ready__on_eof(self):
10177db96d56Sopenharmony_ci        if self._loop.get_debug():
10187db96d56Sopenharmony_ci            logger.debug("%r received EOF", self)
10197db96d56Sopenharmony_ci
10207db96d56Sopenharmony_ci        try:
10217db96d56Sopenharmony_ci            keep_open = self._protocol.eof_received()
10227db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
10237db96d56Sopenharmony_ci            raise
10247db96d56Sopenharmony_ci        except BaseException as exc:
10257db96d56Sopenharmony_ci            self._fatal_error(
10267db96d56Sopenharmony_ci                exc, 'Fatal error: protocol.eof_received() call failed.')
10277db96d56Sopenharmony_ci            return
10287db96d56Sopenharmony_ci
10297db96d56Sopenharmony_ci        if keep_open:
10307db96d56Sopenharmony_ci            # We're keeping the connection open so the
10317db96d56Sopenharmony_ci            # protocol can write more, but we still can't
10327db96d56Sopenharmony_ci            # receive more, so remove the reader callback.
10337db96d56Sopenharmony_ci            self._loop._remove_reader(self._sock_fd)
10347db96d56Sopenharmony_ci        else:
10357db96d56Sopenharmony_ci            self.close()
10367db96d56Sopenharmony_ci
10377db96d56Sopenharmony_ci    def write(self, data):
10387db96d56Sopenharmony_ci        if not isinstance(data, (bytes, bytearray, memoryview)):
10397db96d56Sopenharmony_ci            raise TypeError(f'data argument must be a bytes-like object, '
10407db96d56Sopenharmony_ci                            f'not {type(data).__name__!r}')
10417db96d56Sopenharmony_ci        if self._eof:
10427db96d56Sopenharmony_ci            raise RuntimeError('Cannot call write() after write_eof()')
10437db96d56Sopenharmony_ci        if self._empty_waiter is not None:
10447db96d56Sopenharmony_ci            raise RuntimeError('unable to write; sendfile is in progress')
10457db96d56Sopenharmony_ci        if not data:
10467db96d56Sopenharmony_ci            return
10477db96d56Sopenharmony_ci
10487db96d56Sopenharmony_ci        if self._conn_lost:
10497db96d56Sopenharmony_ci            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
10507db96d56Sopenharmony_ci                logger.warning('socket.send() raised exception.')
10517db96d56Sopenharmony_ci            self._conn_lost += 1
10527db96d56Sopenharmony_ci            return
10537db96d56Sopenharmony_ci
10547db96d56Sopenharmony_ci        if not self._buffer:
10557db96d56Sopenharmony_ci            # Optimization: try to send now.
10567db96d56Sopenharmony_ci            try:
10577db96d56Sopenharmony_ci                n = self._sock.send(data)
10587db96d56Sopenharmony_ci            except (BlockingIOError, InterruptedError):
10597db96d56Sopenharmony_ci                pass
10607db96d56Sopenharmony_ci            except (SystemExit, KeyboardInterrupt):
10617db96d56Sopenharmony_ci                raise
10627db96d56Sopenharmony_ci            except BaseException as exc:
10637db96d56Sopenharmony_ci                self._fatal_error(exc, 'Fatal write error on socket transport')
10647db96d56Sopenharmony_ci                return
10657db96d56Sopenharmony_ci            else:
10667db96d56Sopenharmony_ci                data = data[n:]
10677db96d56Sopenharmony_ci                if not data:
10687db96d56Sopenharmony_ci                    return
10697db96d56Sopenharmony_ci            # Not all was written; register write handler.
10707db96d56Sopenharmony_ci            self._loop._add_writer(self._sock_fd, self._write_ready)
10717db96d56Sopenharmony_ci
10727db96d56Sopenharmony_ci        # Add it to the buffer.
10737db96d56Sopenharmony_ci        self._buffer.extend(data)
10747db96d56Sopenharmony_ci        self._maybe_pause_protocol()
10757db96d56Sopenharmony_ci
10767db96d56Sopenharmony_ci    def _write_ready(self):
10777db96d56Sopenharmony_ci        assert self._buffer, 'Data should not be empty'
10787db96d56Sopenharmony_ci
10797db96d56Sopenharmony_ci        if self._conn_lost:
10807db96d56Sopenharmony_ci            return
10817db96d56Sopenharmony_ci        try:
10827db96d56Sopenharmony_ci            n = self._sock.send(self._buffer)
10837db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
10847db96d56Sopenharmony_ci            pass
10857db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
10867db96d56Sopenharmony_ci            raise
10877db96d56Sopenharmony_ci        except BaseException as exc:
10887db96d56Sopenharmony_ci            self._loop._remove_writer(self._sock_fd)
10897db96d56Sopenharmony_ci            self._buffer.clear()
10907db96d56Sopenharmony_ci            self._fatal_error(exc, 'Fatal write error on socket transport')
10917db96d56Sopenharmony_ci            if self._empty_waiter is not None:
10927db96d56Sopenharmony_ci                self._empty_waiter.set_exception(exc)
10937db96d56Sopenharmony_ci        else:
10947db96d56Sopenharmony_ci            if n:
10957db96d56Sopenharmony_ci                del self._buffer[:n]
10967db96d56Sopenharmony_ci            self._maybe_resume_protocol()  # May append to buffer.
10977db96d56Sopenharmony_ci            if not self._buffer:
10987db96d56Sopenharmony_ci                self._loop._remove_writer(self._sock_fd)
10997db96d56Sopenharmony_ci                if self._empty_waiter is not None:
11007db96d56Sopenharmony_ci                    self._empty_waiter.set_result(None)
11017db96d56Sopenharmony_ci                if self._closing:
11027db96d56Sopenharmony_ci                    self._call_connection_lost(None)
11037db96d56Sopenharmony_ci                elif self._eof:
11047db96d56Sopenharmony_ci                    self._sock.shutdown(socket.SHUT_WR)
11057db96d56Sopenharmony_ci
11067db96d56Sopenharmony_ci    def write_eof(self):
11077db96d56Sopenharmony_ci        if self._closing or self._eof:
11087db96d56Sopenharmony_ci            return
11097db96d56Sopenharmony_ci        self._eof = True
11107db96d56Sopenharmony_ci        if not self._buffer:
11117db96d56Sopenharmony_ci            self._sock.shutdown(socket.SHUT_WR)
11127db96d56Sopenharmony_ci
11137db96d56Sopenharmony_ci    def can_write_eof(self):
11147db96d56Sopenharmony_ci        return True
11157db96d56Sopenharmony_ci
11167db96d56Sopenharmony_ci    def _call_connection_lost(self, exc):
11177db96d56Sopenharmony_ci        super()._call_connection_lost(exc)
11187db96d56Sopenharmony_ci        if self._empty_waiter is not None:
11197db96d56Sopenharmony_ci            self._empty_waiter.set_exception(
11207db96d56Sopenharmony_ci                ConnectionError("Connection is closed by peer"))
11217db96d56Sopenharmony_ci
11227db96d56Sopenharmony_ci    def _make_empty_waiter(self):
11237db96d56Sopenharmony_ci        if self._empty_waiter is not None:
11247db96d56Sopenharmony_ci            raise RuntimeError("Empty waiter is already set")
11257db96d56Sopenharmony_ci        self._empty_waiter = self._loop.create_future()
11267db96d56Sopenharmony_ci        if not self._buffer:
11277db96d56Sopenharmony_ci            self._empty_waiter.set_result(None)
11287db96d56Sopenharmony_ci        return self._empty_waiter
11297db96d56Sopenharmony_ci
11307db96d56Sopenharmony_ci    def _reset_empty_waiter(self):
11317db96d56Sopenharmony_ci        self._empty_waiter = None
11327db96d56Sopenharmony_ci
11337db96d56Sopenharmony_ci
11347db96d56Sopenharmony_ciclass _SelectorDatagramTransport(_SelectorTransport):
11357db96d56Sopenharmony_ci
11367db96d56Sopenharmony_ci    _buffer_factory = collections.deque
11377db96d56Sopenharmony_ci
11387db96d56Sopenharmony_ci    def __init__(self, loop, sock, protocol, address=None,
11397db96d56Sopenharmony_ci                 waiter=None, extra=None):
11407db96d56Sopenharmony_ci        super().__init__(loop, sock, protocol, extra)
11417db96d56Sopenharmony_ci        self._address = address
11427db96d56Sopenharmony_ci        self._buffer_size = 0
11437db96d56Sopenharmony_ci        self._loop.call_soon(self._protocol.connection_made, self)
11447db96d56Sopenharmony_ci        # only start reading when connection_made() has been called
11457db96d56Sopenharmony_ci        self._loop.call_soon(self._add_reader,
11467db96d56Sopenharmony_ci                             self._sock_fd, self._read_ready)
11477db96d56Sopenharmony_ci        if waiter is not None:
11487db96d56Sopenharmony_ci            # only wake up the waiter when connection_made() has been called
11497db96d56Sopenharmony_ci            self._loop.call_soon(futures._set_result_unless_cancelled,
11507db96d56Sopenharmony_ci                                 waiter, None)
11517db96d56Sopenharmony_ci
11527db96d56Sopenharmony_ci    def get_write_buffer_size(self):
11537db96d56Sopenharmony_ci        return self._buffer_size
11547db96d56Sopenharmony_ci
11557db96d56Sopenharmony_ci    def _read_ready(self):
11567db96d56Sopenharmony_ci        if self._conn_lost:
11577db96d56Sopenharmony_ci            return
11587db96d56Sopenharmony_ci        try:
11597db96d56Sopenharmony_ci            data, addr = self._sock.recvfrom(self.max_size)
11607db96d56Sopenharmony_ci        except (BlockingIOError, InterruptedError):
11617db96d56Sopenharmony_ci            pass
11627db96d56Sopenharmony_ci        except OSError as exc:
11637db96d56Sopenharmony_ci            self._protocol.error_received(exc)
11647db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
11657db96d56Sopenharmony_ci            raise
11667db96d56Sopenharmony_ci        except BaseException as exc:
11677db96d56Sopenharmony_ci            self._fatal_error(exc, 'Fatal read error on datagram transport')
11687db96d56Sopenharmony_ci        else:
11697db96d56Sopenharmony_ci            self._protocol.datagram_received(data, addr)
11707db96d56Sopenharmony_ci
11717db96d56Sopenharmony_ci    def sendto(self, data, addr=None):
11727db96d56Sopenharmony_ci        if not isinstance(data, (bytes, bytearray, memoryview)):
11737db96d56Sopenharmony_ci            raise TypeError(f'data argument must be a bytes-like object, '
11747db96d56Sopenharmony_ci                            f'not {type(data).__name__!r}')
11757db96d56Sopenharmony_ci        if not data:
11767db96d56Sopenharmony_ci            return
11777db96d56Sopenharmony_ci
11787db96d56Sopenharmony_ci        if self._address:
11797db96d56Sopenharmony_ci            if addr not in (None, self._address):
11807db96d56Sopenharmony_ci                raise ValueError(
11817db96d56Sopenharmony_ci                    f'Invalid address: must be None or {self._address}')
11827db96d56Sopenharmony_ci            addr = self._address
11837db96d56Sopenharmony_ci
11847db96d56Sopenharmony_ci        if self._conn_lost and self._address:
11857db96d56Sopenharmony_ci            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
11867db96d56Sopenharmony_ci                logger.warning('socket.send() raised exception.')
11877db96d56Sopenharmony_ci            self._conn_lost += 1
11887db96d56Sopenharmony_ci            return
11897db96d56Sopenharmony_ci
11907db96d56Sopenharmony_ci        if not self._buffer:
11917db96d56Sopenharmony_ci            # Attempt to send it right away first.
11927db96d56Sopenharmony_ci            try:
11937db96d56Sopenharmony_ci                if self._extra['peername']:
11947db96d56Sopenharmony_ci                    self._sock.send(data)
11957db96d56Sopenharmony_ci                else:
11967db96d56Sopenharmony_ci                    self._sock.sendto(data, addr)
11977db96d56Sopenharmony_ci                return
11987db96d56Sopenharmony_ci            except (BlockingIOError, InterruptedError):
11997db96d56Sopenharmony_ci                self._loop._add_writer(self._sock_fd, self._sendto_ready)
12007db96d56Sopenharmony_ci            except OSError as exc:
12017db96d56Sopenharmony_ci                self._protocol.error_received(exc)
12027db96d56Sopenharmony_ci                return
12037db96d56Sopenharmony_ci            except (SystemExit, KeyboardInterrupt):
12047db96d56Sopenharmony_ci                raise
12057db96d56Sopenharmony_ci            except BaseException as exc:
12067db96d56Sopenharmony_ci                self._fatal_error(
12077db96d56Sopenharmony_ci                    exc, 'Fatal write error on datagram transport')
12087db96d56Sopenharmony_ci                return
12097db96d56Sopenharmony_ci
12107db96d56Sopenharmony_ci        # Ensure that what we buffer is immutable.
12117db96d56Sopenharmony_ci        self._buffer.append((bytes(data), addr))
12127db96d56Sopenharmony_ci        self._buffer_size += len(data)
12137db96d56Sopenharmony_ci        self._maybe_pause_protocol()
12147db96d56Sopenharmony_ci
12157db96d56Sopenharmony_ci    def _sendto_ready(self):
12167db96d56Sopenharmony_ci        while self._buffer:
12177db96d56Sopenharmony_ci            data, addr = self._buffer.popleft()
12187db96d56Sopenharmony_ci            self._buffer_size -= len(data)
12197db96d56Sopenharmony_ci            try:
12207db96d56Sopenharmony_ci                if self._extra['peername']:
12217db96d56Sopenharmony_ci                    self._sock.send(data)
12227db96d56Sopenharmony_ci                else:
12237db96d56Sopenharmony_ci                    self._sock.sendto(data, addr)
12247db96d56Sopenharmony_ci            except (BlockingIOError, InterruptedError):
12257db96d56Sopenharmony_ci                self._buffer.appendleft((data, addr))  # Try again later.
12267db96d56Sopenharmony_ci                self._buffer_size += len(data)
12277db96d56Sopenharmony_ci                break
12287db96d56Sopenharmony_ci            except OSError as exc:
12297db96d56Sopenharmony_ci                self._protocol.error_received(exc)
12307db96d56Sopenharmony_ci                return
12317db96d56Sopenharmony_ci            except (SystemExit, KeyboardInterrupt):
12327db96d56Sopenharmony_ci                raise
12337db96d56Sopenharmony_ci            except BaseException as exc:
12347db96d56Sopenharmony_ci                self._fatal_error(
12357db96d56Sopenharmony_ci                    exc, 'Fatal write error on datagram transport')
12367db96d56Sopenharmony_ci                return
12377db96d56Sopenharmony_ci
12387db96d56Sopenharmony_ci        self._maybe_resume_protocol()  # May append to buffer.
12397db96d56Sopenharmony_ci        if not self._buffer:
12407db96d56Sopenharmony_ci            self._loop._remove_writer(self._sock_fd)
12417db96d56Sopenharmony_ci            if self._closing:
12427db96d56Sopenharmony_ci                self._call_connection_lost(None)
1243