17db96d56Sopenharmony_ci"""Event loop using a selector and related classes. 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciA selector is a "notify-when-ready" multiplexer. For a subclass which 47db96d56Sopenharmony_cialso includes support for signal handling, see the unix_events sub-module. 57db96d56Sopenharmony_ci""" 67db96d56Sopenharmony_ci 77db96d56Sopenharmony_ci__all__ = 'BaseSelectorEventLoop', 87db96d56Sopenharmony_ci 97db96d56Sopenharmony_ciimport collections 107db96d56Sopenharmony_ciimport errno 117db96d56Sopenharmony_ciimport functools 127db96d56Sopenharmony_ciimport selectors 137db96d56Sopenharmony_ciimport socket 147db96d56Sopenharmony_ciimport warnings 157db96d56Sopenharmony_ciimport weakref 167db96d56Sopenharmony_citry: 177db96d56Sopenharmony_ci import ssl 187db96d56Sopenharmony_ciexcept ImportError: # pragma: no cover 197db96d56Sopenharmony_ci ssl = None 207db96d56Sopenharmony_ci 217db96d56Sopenharmony_cifrom . import base_events 227db96d56Sopenharmony_cifrom . import constants 237db96d56Sopenharmony_cifrom . import events 247db96d56Sopenharmony_cifrom . import futures 257db96d56Sopenharmony_cifrom . import protocols 267db96d56Sopenharmony_cifrom . import sslproto 277db96d56Sopenharmony_cifrom . import transports 287db96d56Sopenharmony_cifrom . import trsock 297db96d56Sopenharmony_cifrom .log import logger 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_cidef _test_selector_event(selector, fd, event): 337db96d56Sopenharmony_ci # Test if the selector is monitoring 'event' events 347db96d56Sopenharmony_ci # for the file descriptor 'fd'. 
357db96d56Sopenharmony_ci try: 367db96d56Sopenharmony_ci key = selector.get_key(fd) 377db96d56Sopenharmony_ci except KeyError: 387db96d56Sopenharmony_ci return False 397db96d56Sopenharmony_ci else: 407db96d56Sopenharmony_ci return bool(key.events & event) 417db96d56Sopenharmony_ci 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ciclass BaseSelectorEventLoop(base_events.BaseEventLoop): 447db96d56Sopenharmony_ci """Selector event loop. 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ci See events.EventLoop for API specification. 477db96d56Sopenharmony_ci """ 487db96d56Sopenharmony_ci 497db96d56Sopenharmony_ci def __init__(self, selector=None): 507db96d56Sopenharmony_ci super().__init__() 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_ci if selector is None: 537db96d56Sopenharmony_ci selector = selectors.DefaultSelector() 547db96d56Sopenharmony_ci logger.debug('Using selector: %s', selector.__class__.__name__) 557db96d56Sopenharmony_ci self._selector = selector 567db96d56Sopenharmony_ci self._make_self_pipe() 577db96d56Sopenharmony_ci self._transports = weakref.WeakValueDictionary() 587db96d56Sopenharmony_ci 597db96d56Sopenharmony_ci def _make_socket_transport(self, sock, protocol, waiter=None, *, 607db96d56Sopenharmony_ci extra=None, server=None): 617db96d56Sopenharmony_ci return _SelectorSocketTransport(self, sock, protocol, waiter, 627db96d56Sopenharmony_ci extra, server) 637db96d56Sopenharmony_ci 647db96d56Sopenharmony_ci def _make_ssl_transport( 657db96d56Sopenharmony_ci self, rawsock, protocol, sslcontext, waiter=None, 667db96d56Sopenharmony_ci *, server_side=False, server_hostname=None, 677db96d56Sopenharmony_ci extra=None, server=None, 687db96d56Sopenharmony_ci ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 697db96d56Sopenharmony_ci ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT, 707db96d56Sopenharmony_ci ): 717db96d56Sopenharmony_ci ssl_protocol = sslproto.SSLProtocol( 727db96d56Sopenharmony_ci self, protocol, sslcontext, waiter, 
737db96d56Sopenharmony_ci server_side, server_hostname, 747db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 757db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout 767db96d56Sopenharmony_ci ) 777db96d56Sopenharmony_ci _SelectorSocketTransport(self, rawsock, ssl_protocol, 787db96d56Sopenharmony_ci extra=extra, server=server) 797db96d56Sopenharmony_ci return ssl_protocol._app_transport 807db96d56Sopenharmony_ci 817db96d56Sopenharmony_ci def _make_datagram_transport(self, sock, protocol, 827db96d56Sopenharmony_ci address=None, waiter=None, extra=None): 837db96d56Sopenharmony_ci return _SelectorDatagramTransport(self, sock, protocol, 847db96d56Sopenharmony_ci address, waiter, extra) 857db96d56Sopenharmony_ci 867db96d56Sopenharmony_ci def close(self): 877db96d56Sopenharmony_ci if self.is_running(): 887db96d56Sopenharmony_ci raise RuntimeError("Cannot close a running event loop") 897db96d56Sopenharmony_ci if self.is_closed(): 907db96d56Sopenharmony_ci return 917db96d56Sopenharmony_ci self._close_self_pipe() 927db96d56Sopenharmony_ci super().close() 937db96d56Sopenharmony_ci if self._selector is not None: 947db96d56Sopenharmony_ci self._selector.close() 957db96d56Sopenharmony_ci self._selector = None 967db96d56Sopenharmony_ci 977db96d56Sopenharmony_ci def _close_self_pipe(self): 987db96d56Sopenharmony_ci self._remove_reader(self._ssock.fileno()) 997db96d56Sopenharmony_ci self._ssock.close() 1007db96d56Sopenharmony_ci self._ssock = None 1017db96d56Sopenharmony_ci self._csock.close() 1027db96d56Sopenharmony_ci self._csock = None 1037db96d56Sopenharmony_ci self._internal_fds -= 1 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_ci def _make_self_pipe(self): 1067db96d56Sopenharmony_ci # A self-socket, really. 
:-) 1077db96d56Sopenharmony_ci self._ssock, self._csock = socket.socketpair() 1087db96d56Sopenharmony_ci self._ssock.setblocking(False) 1097db96d56Sopenharmony_ci self._csock.setblocking(False) 1107db96d56Sopenharmony_ci self._internal_fds += 1 1117db96d56Sopenharmony_ci self._add_reader(self._ssock.fileno(), self._read_from_self) 1127db96d56Sopenharmony_ci 1137db96d56Sopenharmony_ci def _process_self_data(self, data): 1147db96d56Sopenharmony_ci pass 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci def _read_from_self(self): 1177db96d56Sopenharmony_ci while True: 1187db96d56Sopenharmony_ci try: 1197db96d56Sopenharmony_ci data = self._ssock.recv(4096) 1207db96d56Sopenharmony_ci if not data: 1217db96d56Sopenharmony_ci break 1227db96d56Sopenharmony_ci self._process_self_data(data) 1237db96d56Sopenharmony_ci except InterruptedError: 1247db96d56Sopenharmony_ci continue 1257db96d56Sopenharmony_ci except BlockingIOError: 1267db96d56Sopenharmony_ci break 1277db96d56Sopenharmony_ci 1287db96d56Sopenharmony_ci def _write_to_self(self): 1297db96d56Sopenharmony_ci # This may be called from a different thread, possibly after 1307db96d56Sopenharmony_ci # _close_self_pipe() has been called or even while it is 1317db96d56Sopenharmony_ci # running. Guard for self._csock being None or closed. When 1327db96d56Sopenharmony_ci # a socket is closed, send() raises OSError (with errno set to 1337db96d56Sopenharmony_ci # EBADF, but let's not rely on the exact error code). 
1347db96d56Sopenharmony_ci csock = self._csock 1357db96d56Sopenharmony_ci if csock is None: 1367db96d56Sopenharmony_ci return 1377db96d56Sopenharmony_ci 1387db96d56Sopenharmony_ci try: 1397db96d56Sopenharmony_ci csock.send(b'\0') 1407db96d56Sopenharmony_ci except OSError: 1417db96d56Sopenharmony_ci if self._debug: 1427db96d56Sopenharmony_ci logger.debug("Fail to write a null byte into the " 1437db96d56Sopenharmony_ci "self-pipe socket", 1447db96d56Sopenharmony_ci exc_info=True) 1457db96d56Sopenharmony_ci 1467db96d56Sopenharmony_ci def _start_serving(self, protocol_factory, sock, 1477db96d56Sopenharmony_ci sslcontext=None, server=None, backlog=100, 1487db96d56Sopenharmony_ci ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 1497db96d56Sopenharmony_ci ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT): 1507db96d56Sopenharmony_ci self._add_reader(sock.fileno(), self._accept_connection, 1517db96d56Sopenharmony_ci protocol_factory, sock, sslcontext, server, backlog, 1527db96d56Sopenharmony_ci ssl_handshake_timeout, ssl_shutdown_timeout) 1537db96d56Sopenharmony_ci 1547db96d56Sopenharmony_ci def _accept_connection( 1557db96d56Sopenharmony_ci self, protocol_factory, sock, 1567db96d56Sopenharmony_ci sslcontext=None, server=None, backlog=100, 1577db96d56Sopenharmony_ci ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 1587db96d56Sopenharmony_ci ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT): 1597db96d56Sopenharmony_ci # This method is only called once for each event loop tick where the 1607db96d56Sopenharmony_ci # listening socket has triggered an EVENT_READ. There may be multiple 1617db96d56Sopenharmony_ci # connections waiting for an .accept() so it is called in a loop. 1627db96d56Sopenharmony_ci # See https://bugs.python.org/issue27906 for more details. 
1637db96d56Sopenharmony_ci for _ in range(backlog): 1647db96d56Sopenharmony_ci try: 1657db96d56Sopenharmony_ci conn, addr = sock.accept() 1667db96d56Sopenharmony_ci if self._debug: 1677db96d56Sopenharmony_ci logger.debug("%r got a new connection from %r: %r", 1687db96d56Sopenharmony_ci server, addr, conn) 1697db96d56Sopenharmony_ci conn.setblocking(False) 1707db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError, ConnectionAbortedError): 1717db96d56Sopenharmony_ci # Early exit because the socket accept buffer is empty. 1727db96d56Sopenharmony_ci return None 1737db96d56Sopenharmony_ci except OSError as exc: 1747db96d56Sopenharmony_ci # There's nowhere to send the error, so just log it. 1757db96d56Sopenharmony_ci if exc.errno in (errno.EMFILE, errno.ENFILE, 1767db96d56Sopenharmony_ci errno.ENOBUFS, errno.ENOMEM): 1777db96d56Sopenharmony_ci # Some platforms (e.g. Linux keep reporting the FD as 1787db96d56Sopenharmony_ci # ready, so we remove the read handler temporarily. 1797db96d56Sopenharmony_ci # We'll try again in a while. 1807db96d56Sopenharmony_ci self.call_exception_handler({ 1817db96d56Sopenharmony_ci 'message': 'socket.accept() out of system resource', 1827db96d56Sopenharmony_ci 'exception': exc, 1837db96d56Sopenharmony_ci 'socket': trsock.TransportSocket(sock), 1847db96d56Sopenharmony_ci }) 1857db96d56Sopenharmony_ci self._remove_reader(sock.fileno()) 1867db96d56Sopenharmony_ci self.call_later(constants.ACCEPT_RETRY_DELAY, 1877db96d56Sopenharmony_ci self._start_serving, 1887db96d56Sopenharmony_ci protocol_factory, sock, sslcontext, server, 1897db96d56Sopenharmony_ci backlog, ssl_handshake_timeout, 1907db96d56Sopenharmony_ci ssl_shutdown_timeout) 1917db96d56Sopenharmony_ci else: 1927db96d56Sopenharmony_ci raise # The event loop will catch, log and ignore it. 
1937db96d56Sopenharmony_ci else: 1947db96d56Sopenharmony_ci extra = {'peername': addr} 1957db96d56Sopenharmony_ci accept = self._accept_connection2( 1967db96d56Sopenharmony_ci protocol_factory, conn, extra, sslcontext, server, 1977db96d56Sopenharmony_ci ssl_handshake_timeout, ssl_shutdown_timeout) 1987db96d56Sopenharmony_ci self.create_task(accept) 1997db96d56Sopenharmony_ci 2007db96d56Sopenharmony_ci async def _accept_connection2( 2017db96d56Sopenharmony_ci self, protocol_factory, conn, extra, 2027db96d56Sopenharmony_ci sslcontext=None, server=None, 2037db96d56Sopenharmony_ci ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT, 2047db96d56Sopenharmony_ci ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT): 2057db96d56Sopenharmony_ci protocol = None 2067db96d56Sopenharmony_ci transport = None 2077db96d56Sopenharmony_ci try: 2087db96d56Sopenharmony_ci protocol = protocol_factory() 2097db96d56Sopenharmony_ci waiter = self.create_future() 2107db96d56Sopenharmony_ci if sslcontext: 2117db96d56Sopenharmony_ci transport = self._make_ssl_transport( 2127db96d56Sopenharmony_ci conn, protocol, sslcontext, waiter=waiter, 2137db96d56Sopenharmony_ci server_side=True, extra=extra, server=server, 2147db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 2157db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout) 2167db96d56Sopenharmony_ci else: 2177db96d56Sopenharmony_ci transport = self._make_socket_transport( 2187db96d56Sopenharmony_ci conn, protocol, waiter=waiter, extra=extra, 2197db96d56Sopenharmony_ci server=server) 2207db96d56Sopenharmony_ci 2217db96d56Sopenharmony_ci try: 2227db96d56Sopenharmony_ci await waiter 2237db96d56Sopenharmony_ci except BaseException: 2247db96d56Sopenharmony_ci transport.close() 2257db96d56Sopenharmony_ci raise 2267db96d56Sopenharmony_ci # It's now up to the protocol to handle the connection. 
2277db96d56Sopenharmony_ci 2287db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 2297db96d56Sopenharmony_ci raise 2307db96d56Sopenharmony_ci except BaseException as exc: 2317db96d56Sopenharmony_ci if self._debug: 2327db96d56Sopenharmony_ci context = { 2337db96d56Sopenharmony_ci 'message': 2347db96d56Sopenharmony_ci 'Error on transport creation for incoming connection', 2357db96d56Sopenharmony_ci 'exception': exc, 2367db96d56Sopenharmony_ci } 2377db96d56Sopenharmony_ci if protocol is not None: 2387db96d56Sopenharmony_ci context['protocol'] = protocol 2397db96d56Sopenharmony_ci if transport is not None: 2407db96d56Sopenharmony_ci context['transport'] = transport 2417db96d56Sopenharmony_ci self.call_exception_handler(context) 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ci def _ensure_fd_no_transport(self, fd): 2447db96d56Sopenharmony_ci fileno = fd 2457db96d56Sopenharmony_ci if not isinstance(fileno, int): 2467db96d56Sopenharmony_ci try: 2477db96d56Sopenharmony_ci fileno = int(fileno.fileno()) 2487db96d56Sopenharmony_ci except (AttributeError, TypeError, ValueError): 2497db96d56Sopenharmony_ci # This code matches selectors._fileobj_to_fd function. 
2507db96d56Sopenharmony_ci raise ValueError(f"Invalid file object: {fd!r}") from None 2517db96d56Sopenharmony_ci try: 2527db96d56Sopenharmony_ci transport = self._transports[fileno] 2537db96d56Sopenharmony_ci except KeyError: 2547db96d56Sopenharmony_ci pass 2557db96d56Sopenharmony_ci else: 2567db96d56Sopenharmony_ci if not transport.is_closing(): 2577db96d56Sopenharmony_ci raise RuntimeError( 2587db96d56Sopenharmony_ci f'File descriptor {fd!r} is used by transport ' 2597db96d56Sopenharmony_ci f'{transport!r}') 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ci def _add_reader(self, fd, callback, *args): 2627db96d56Sopenharmony_ci self._check_closed() 2637db96d56Sopenharmony_ci handle = events.Handle(callback, args, self, None) 2647db96d56Sopenharmony_ci try: 2657db96d56Sopenharmony_ci key = self._selector.get_key(fd) 2667db96d56Sopenharmony_ci except KeyError: 2677db96d56Sopenharmony_ci self._selector.register(fd, selectors.EVENT_READ, 2687db96d56Sopenharmony_ci (handle, None)) 2697db96d56Sopenharmony_ci else: 2707db96d56Sopenharmony_ci mask, (reader, writer) = key.events, key.data 2717db96d56Sopenharmony_ci self._selector.modify(fd, mask | selectors.EVENT_READ, 2727db96d56Sopenharmony_ci (handle, writer)) 2737db96d56Sopenharmony_ci if reader is not None: 2747db96d56Sopenharmony_ci reader.cancel() 2757db96d56Sopenharmony_ci return handle 2767db96d56Sopenharmony_ci 2777db96d56Sopenharmony_ci def _remove_reader(self, fd): 2787db96d56Sopenharmony_ci if self.is_closed(): 2797db96d56Sopenharmony_ci return False 2807db96d56Sopenharmony_ci try: 2817db96d56Sopenharmony_ci key = self._selector.get_key(fd) 2827db96d56Sopenharmony_ci except KeyError: 2837db96d56Sopenharmony_ci return False 2847db96d56Sopenharmony_ci else: 2857db96d56Sopenharmony_ci mask, (reader, writer) = key.events, key.data 2867db96d56Sopenharmony_ci mask &= ~selectors.EVENT_READ 2877db96d56Sopenharmony_ci if not mask: 2887db96d56Sopenharmony_ci self._selector.unregister(fd) 2897db96d56Sopenharmony_ci 
else: 2907db96d56Sopenharmony_ci self._selector.modify(fd, mask, (None, writer)) 2917db96d56Sopenharmony_ci 2927db96d56Sopenharmony_ci if reader is not None: 2937db96d56Sopenharmony_ci reader.cancel() 2947db96d56Sopenharmony_ci return True 2957db96d56Sopenharmony_ci else: 2967db96d56Sopenharmony_ci return False 2977db96d56Sopenharmony_ci 2987db96d56Sopenharmony_ci def _add_writer(self, fd, callback, *args): 2997db96d56Sopenharmony_ci self._check_closed() 3007db96d56Sopenharmony_ci handle = events.Handle(callback, args, self, None) 3017db96d56Sopenharmony_ci try: 3027db96d56Sopenharmony_ci key = self._selector.get_key(fd) 3037db96d56Sopenharmony_ci except KeyError: 3047db96d56Sopenharmony_ci self._selector.register(fd, selectors.EVENT_WRITE, 3057db96d56Sopenharmony_ci (None, handle)) 3067db96d56Sopenharmony_ci else: 3077db96d56Sopenharmony_ci mask, (reader, writer) = key.events, key.data 3087db96d56Sopenharmony_ci self._selector.modify(fd, mask | selectors.EVENT_WRITE, 3097db96d56Sopenharmony_ci (reader, handle)) 3107db96d56Sopenharmony_ci if writer is not None: 3117db96d56Sopenharmony_ci writer.cancel() 3127db96d56Sopenharmony_ci return handle 3137db96d56Sopenharmony_ci 3147db96d56Sopenharmony_ci def _remove_writer(self, fd): 3157db96d56Sopenharmony_ci """Remove a writer callback.""" 3167db96d56Sopenharmony_ci if self.is_closed(): 3177db96d56Sopenharmony_ci return False 3187db96d56Sopenharmony_ci try: 3197db96d56Sopenharmony_ci key = self._selector.get_key(fd) 3207db96d56Sopenharmony_ci except KeyError: 3217db96d56Sopenharmony_ci return False 3227db96d56Sopenharmony_ci else: 3237db96d56Sopenharmony_ci mask, (reader, writer) = key.events, key.data 3247db96d56Sopenharmony_ci # Remove both writer and connector. 
3257db96d56Sopenharmony_ci mask &= ~selectors.EVENT_WRITE 3267db96d56Sopenharmony_ci if not mask: 3277db96d56Sopenharmony_ci self._selector.unregister(fd) 3287db96d56Sopenharmony_ci else: 3297db96d56Sopenharmony_ci self._selector.modify(fd, mask, (reader, None)) 3307db96d56Sopenharmony_ci 3317db96d56Sopenharmony_ci if writer is not None: 3327db96d56Sopenharmony_ci writer.cancel() 3337db96d56Sopenharmony_ci return True 3347db96d56Sopenharmony_ci else: 3357db96d56Sopenharmony_ci return False 3367db96d56Sopenharmony_ci 3377db96d56Sopenharmony_ci def add_reader(self, fd, callback, *args): 3387db96d56Sopenharmony_ci """Add a reader callback.""" 3397db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 3407db96d56Sopenharmony_ci self._add_reader(fd, callback, *args) 3417db96d56Sopenharmony_ci 3427db96d56Sopenharmony_ci def remove_reader(self, fd): 3437db96d56Sopenharmony_ci """Remove a reader callback.""" 3447db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 3457db96d56Sopenharmony_ci return self._remove_reader(fd) 3467db96d56Sopenharmony_ci 3477db96d56Sopenharmony_ci def add_writer(self, fd, callback, *args): 3487db96d56Sopenharmony_ci """Add a writer callback..""" 3497db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 3507db96d56Sopenharmony_ci self._add_writer(fd, callback, *args) 3517db96d56Sopenharmony_ci 3527db96d56Sopenharmony_ci def remove_writer(self, fd): 3537db96d56Sopenharmony_ci """Remove a writer callback.""" 3547db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 3557db96d56Sopenharmony_ci return self._remove_writer(fd) 3567db96d56Sopenharmony_ci 3577db96d56Sopenharmony_ci async def sock_recv(self, sock, n): 3587db96d56Sopenharmony_ci """Receive data from the socket. 3597db96d56Sopenharmony_ci 3607db96d56Sopenharmony_ci The return value is a bytes object representing the data received. 3617db96d56Sopenharmony_ci The maximum amount of data to be received at once is specified by 3627db96d56Sopenharmony_ci nbytes. 
3637db96d56Sopenharmony_ci """ 3647db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 3657db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 3667db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 3677db96d56Sopenharmony_ci try: 3687db96d56Sopenharmony_ci return sock.recv(n) 3697db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 3707db96d56Sopenharmony_ci pass 3717db96d56Sopenharmony_ci fut = self.create_future() 3727db96d56Sopenharmony_ci fd = sock.fileno() 3737db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 3747db96d56Sopenharmony_ci handle = self._add_reader(fd, self._sock_recv, fut, sock, n) 3757db96d56Sopenharmony_ci fut.add_done_callback( 3767db96d56Sopenharmony_ci functools.partial(self._sock_read_done, fd, handle=handle)) 3777db96d56Sopenharmony_ci return await fut 3787db96d56Sopenharmony_ci 3797db96d56Sopenharmony_ci def _sock_read_done(self, fd, fut, handle=None): 3807db96d56Sopenharmony_ci if handle is None or not handle.cancelled(): 3817db96d56Sopenharmony_ci self.remove_reader(fd) 3827db96d56Sopenharmony_ci 3837db96d56Sopenharmony_ci def _sock_recv(self, fut, sock, n): 3847db96d56Sopenharmony_ci # _sock_recv() can add itself as an I/O callback if the operation can't 3857db96d56Sopenharmony_ci # be done immediately. Don't use it directly, call sock_recv(). 
3867db96d56Sopenharmony_ci if fut.done(): 3877db96d56Sopenharmony_ci return 3887db96d56Sopenharmony_ci try: 3897db96d56Sopenharmony_ci data = sock.recv(n) 3907db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 3917db96d56Sopenharmony_ci return # try again next time 3927db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 3937db96d56Sopenharmony_ci raise 3947db96d56Sopenharmony_ci except BaseException as exc: 3957db96d56Sopenharmony_ci fut.set_exception(exc) 3967db96d56Sopenharmony_ci else: 3977db96d56Sopenharmony_ci fut.set_result(data) 3987db96d56Sopenharmony_ci 3997db96d56Sopenharmony_ci async def sock_recv_into(self, sock, buf): 4007db96d56Sopenharmony_ci """Receive data from the socket. 4017db96d56Sopenharmony_ci 4027db96d56Sopenharmony_ci The received data is written into *buf* (a writable buffer). 4037db96d56Sopenharmony_ci The return value is the number of bytes written. 4047db96d56Sopenharmony_ci """ 4057db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 4067db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 4077db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 4087db96d56Sopenharmony_ci try: 4097db96d56Sopenharmony_ci return sock.recv_into(buf) 4107db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 4117db96d56Sopenharmony_ci pass 4127db96d56Sopenharmony_ci fut = self.create_future() 4137db96d56Sopenharmony_ci fd = sock.fileno() 4147db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 4157db96d56Sopenharmony_ci handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) 4167db96d56Sopenharmony_ci fut.add_done_callback( 4177db96d56Sopenharmony_ci functools.partial(self._sock_read_done, fd, handle=handle)) 4187db96d56Sopenharmony_ci return await fut 4197db96d56Sopenharmony_ci 4207db96d56Sopenharmony_ci def _sock_recv_into(self, fut, sock, buf): 4217db96d56Sopenharmony_ci # _sock_recv_into() can add itself as an I/O callback if the operation 
4227db96d56Sopenharmony_ci # can't be done immediately. Don't use it directly, call 4237db96d56Sopenharmony_ci # sock_recv_into(). 4247db96d56Sopenharmony_ci if fut.done(): 4257db96d56Sopenharmony_ci return 4267db96d56Sopenharmony_ci try: 4277db96d56Sopenharmony_ci nbytes = sock.recv_into(buf) 4287db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 4297db96d56Sopenharmony_ci return # try again next time 4307db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 4317db96d56Sopenharmony_ci raise 4327db96d56Sopenharmony_ci except BaseException as exc: 4337db96d56Sopenharmony_ci fut.set_exception(exc) 4347db96d56Sopenharmony_ci else: 4357db96d56Sopenharmony_ci fut.set_result(nbytes) 4367db96d56Sopenharmony_ci 4377db96d56Sopenharmony_ci async def sock_recvfrom(self, sock, bufsize): 4387db96d56Sopenharmony_ci """Receive a datagram from a datagram socket. 4397db96d56Sopenharmony_ci 4407db96d56Sopenharmony_ci The return value is a tuple of (bytes, address) representing the 4417db96d56Sopenharmony_ci datagram received and the address it came from. 4427db96d56Sopenharmony_ci The maximum amount of data to be received at once is specified by 4437db96d56Sopenharmony_ci nbytes. 
4447db96d56Sopenharmony_ci """ 4457db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 4467db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 4477db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 4487db96d56Sopenharmony_ci try: 4497db96d56Sopenharmony_ci return sock.recvfrom(bufsize) 4507db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 4517db96d56Sopenharmony_ci pass 4527db96d56Sopenharmony_ci fut = self.create_future() 4537db96d56Sopenharmony_ci fd = sock.fileno() 4547db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 4557db96d56Sopenharmony_ci handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize) 4567db96d56Sopenharmony_ci fut.add_done_callback( 4577db96d56Sopenharmony_ci functools.partial(self._sock_read_done, fd, handle=handle)) 4587db96d56Sopenharmony_ci return await fut 4597db96d56Sopenharmony_ci 4607db96d56Sopenharmony_ci def _sock_recvfrom(self, fut, sock, bufsize): 4617db96d56Sopenharmony_ci # _sock_recvfrom() can add itself as an I/O callback if the operation 4627db96d56Sopenharmony_ci # can't be done immediately. Don't use it directly, call 4637db96d56Sopenharmony_ci # sock_recvfrom(). 4647db96d56Sopenharmony_ci if fut.done(): 4657db96d56Sopenharmony_ci return 4667db96d56Sopenharmony_ci try: 4677db96d56Sopenharmony_ci result = sock.recvfrom(bufsize) 4687db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 4697db96d56Sopenharmony_ci return # try again next time 4707db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 4717db96d56Sopenharmony_ci raise 4727db96d56Sopenharmony_ci except BaseException as exc: 4737db96d56Sopenharmony_ci fut.set_exception(exc) 4747db96d56Sopenharmony_ci else: 4757db96d56Sopenharmony_ci fut.set_result(result) 4767db96d56Sopenharmony_ci 4777db96d56Sopenharmony_ci async def sock_recvfrom_into(self, sock, buf, nbytes=0): 4787db96d56Sopenharmony_ci """Receive data from the socket. 
4797db96d56Sopenharmony_ci 4807db96d56Sopenharmony_ci The received data is written into *buf* (a writable buffer). 4817db96d56Sopenharmony_ci The return value is a tuple of (number of bytes written, address). 4827db96d56Sopenharmony_ci """ 4837db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 4847db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 4857db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 4867db96d56Sopenharmony_ci if not nbytes: 4877db96d56Sopenharmony_ci nbytes = len(buf) 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci try: 4907db96d56Sopenharmony_ci return sock.recvfrom_into(buf, nbytes) 4917db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 4927db96d56Sopenharmony_ci pass 4937db96d56Sopenharmony_ci fut = self.create_future() 4947db96d56Sopenharmony_ci fd = sock.fileno() 4957db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 4967db96d56Sopenharmony_ci handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf, 4977db96d56Sopenharmony_ci nbytes) 4987db96d56Sopenharmony_ci fut.add_done_callback( 4997db96d56Sopenharmony_ci functools.partial(self._sock_read_done, fd, handle=handle)) 5007db96d56Sopenharmony_ci return await fut 5017db96d56Sopenharmony_ci 5027db96d56Sopenharmony_ci def _sock_recvfrom_into(self, fut, sock, buf, bufsize): 5037db96d56Sopenharmony_ci # _sock_recv_into() can add itself as an I/O callback if the operation 5047db96d56Sopenharmony_ci # can't be done immediately. Don't use it directly, call 5057db96d56Sopenharmony_ci # sock_recv_into(). 
5067db96d56Sopenharmony_ci if fut.done(): 5077db96d56Sopenharmony_ci return 5087db96d56Sopenharmony_ci try: 5097db96d56Sopenharmony_ci result = sock.recvfrom_into(buf, bufsize) 5107db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 5117db96d56Sopenharmony_ci return # try again next time 5127db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 5137db96d56Sopenharmony_ci raise 5147db96d56Sopenharmony_ci except BaseException as exc: 5157db96d56Sopenharmony_ci fut.set_exception(exc) 5167db96d56Sopenharmony_ci else: 5177db96d56Sopenharmony_ci fut.set_result(result) 5187db96d56Sopenharmony_ci 5197db96d56Sopenharmony_ci async def sock_sendall(self, sock, data): 5207db96d56Sopenharmony_ci """Send data to the socket. 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci The socket must be connected to a remote socket. This method continues 5237db96d56Sopenharmony_ci to send data from data until either all data has been sent or an 5247db96d56Sopenharmony_ci error occurs. None is returned on success. On error, an exception is 5257db96d56Sopenharmony_ci raised, and there is no way to determine how much data, if any, was 5267db96d56Sopenharmony_ci successfully processed by the receiving end of the connection. 
5277db96d56Sopenharmony_ci """ 5287db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 5297db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 5307db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 5317db96d56Sopenharmony_ci try: 5327db96d56Sopenharmony_ci n = sock.send(data) 5337db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 5347db96d56Sopenharmony_ci n = 0 5357db96d56Sopenharmony_ci 5367db96d56Sopenharmony_ci if n == len(data): 5377db96d56Sopenharmony_ci # all data sent 5387db96d56Sopenharmony_ci return 5397db96d56Sopenharmony_ci 5407db96d56Sopenharmony_ci fut = self.create_future() 5417db96d56Sopenharmony_ci fd = sock.fileno() 5427db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 5437db96d56Sopenharmony_ci # use a trick with a list in closure to store a mutable state 5447db96d56Sopenharmony_ci handle = self._add_writer(fd, self._sock_sendall, fut, sock, 5457db96d56Sopenharmony_ci memoryview(data), [n]) 5467db96d56Sopenharmony_ci fut.add_done_callback( 5477db96d56Sopenharmony_ci functools.partial(self._sock_write_done, fd, handle=handle)) 5487db96d56Sopenharmony_ci return await fut 5497db96d56Sopenharmony_ci 5507db96d56Sopenharmony_ci def _sock_sendall(self, fut, sock, view, pos): 5517db96d56Sopenharmony_ci if fut.done(): 5527db96d56Sopenharmony_ci # Future cancellation can be scheduled on previous loop iteration 5537db96d56Sopenharmony_ci return 5547db96d56Sopenharmony_ci start = pos[0] 5557db96d56Sopenharmony_ci try: 5567db96d56Sopenharmony_ci n = sock.send(view[start:]) 5577db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 5587db96d56Sopenharmony_ci return 5597db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 5607db96d56Sopenharmony_ci raise 5617db96d56Sopenharmony_ci except BaseException as exc: 5627db96d56Sopenharmony_ci fut.set_exception(exc) 5637db96d56Sopenharmony_ci return 5647db96d56Sopenharmony_ci 5657db96d56Sopenharmony_ci start += n 
5667db96d56Sopenharmony_ci 5677db96d56Sopenharmony_ci if start == len(view): 5687db96d56Sopenharmony_ci fut.set_result(None) 5697db96d56Sopenharmony_ci else: 5707db96d56Sopenharmony_ci pos[0] = start 5717db96d56Sopenharmony_ci 5727db96d56Sopenharmony_ci async def sock_sendto(self, sock, data, address): 5737db96d56Sopenharmony_ci """Send data to the socket. 5747db96d56Sopenharmony_ci 5757db96d56Sopenharmony_ci The socket must be connected to a remote socket. This method continues 5767db96d56Sopenharmony_ci to send data from data until either all data has been sent or an 5777db96d56Sopenharmony_ci error occurs. None is returned on success. On error, an exception is 5787db96d56Sopenharmony_ci raised, and there is no way to determine how much data, if any, was 5797db96d56Sopenharmony_ci successfully processed by the receiving end of the connection. 5807db96d56Sopenharmony_ci """ 5817db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 5827db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 5837db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 5847db96d56Sopenharmony_ci try: 5857db96d56Sopenharmony_ci return sock.sendto(data, address) 5867db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 5877db96d56Sopenharmony_ci pass 5887db96d56Sopenharmony_ci 5897db96d56Sopenharmony_ci fut = self.create_future() 5907db96d56Sopenharmony_ci fd = sock.fileno() 5917db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 5927db96d56Sopenharmony_ci # use a trick with a list in closure to store a mutable state 5937db96d56Sopenharmony_ci handle = self._add_writer(fd, self._sock_sendto, fut, sock, data, 5947db96d56Sopenharmony_ci address) 5957db96d56Sopenharmony_ci fut.add_done_callback( 5967db96d56Sopenharmony_ci functools.partial(self._sock_write_done, fd, handle=handle)) 5977db96d56Sopenharmony_ci return await fut 5987db96d56Sopenharmony_ci 5997db96d56Sopenharmony_ci def _sock_sendto(self, fut, sock, data, address): 
6007db96d56Sopenharmony_ci if fut.done(): 6017db96d56Sopenharmony_ci # Future cancellation can be scheduled on previous loop iteration 6027db96d56Sopenharmony_ci return 6037db96d56Sopenharmony_ci try: 6047db96d56Sopenharmony_ci n = sock.sendto(data, 0, address) 6057db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 6067db96d56Sopenharmony_ci return 6077db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 6087db96d56Sopenharmony_ci raise 6097db96d56Sopenharmony_ci except BaseException as exc: 6107db96d56Sopenharmony_ci fut.set_exception(exc) 6117db96d56Sopenharmony_ci else: 6127db96d56Sopenharmony_ci fut.set_result(n) 6137db96d56Sopenharmony_ci 6147db96d56Sopenharmony_ci async def sock_connect(self, sock, address): 6157db96d56Sopenharmony_ci """Connect to a remote socket at address. 6167db96d56Sopenharmony_ci 6177db96d56Sopenharmony_ci This method is a coroutine. 6187db96d56Sopenharmony_ci """ 6197db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 6207db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 6217db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 6227db96d56Sopenharmony_ci 6237db96d56Sopenharmony_ci if sock.family == socket.AF_INET or ( 6247db96d56Sopenharmony_ci base_events._HAS_IPv6 and sock.family == socket.AF_INET6): 6257db96d56Sopenharmony_ci resolved = await self._ensure_resolved( 6267db96d56Sopenharmony_ci address, family=sock.family, type=sock.type, proto=sock.proto, 6277db96d56Sopenharmony_ci loop=self, 6287db96d56Sopenharmony_ci ) 6297db96d56Sopenharmony_ci _, _, _, _, address = resolved[0] 6307db96d56Sopenharmony_ci 6317db96d56Sopenharmony_ci fut = self.create_future() 6327db96d56Sopenharmony_ci self._sock_connect(fut, sock, address) 6337db96d56Sopenharmony_ci try: 6347db96d56Sopenharmony_ci return await fut 6357db96d56Sopenharmony_ci finally: 6367db96d56Sopenharmony_ci # Needed to break cycles when an exception occurs. 
6377db96d56Sopenharmony_ci fut = None 6387db96d56Sopenharmony_ci 6397db96d56Sopenharmony_ci def _sock_connect(self, fut, sock, address): 6407db96d56Sopenharmony_ci fd = sock.fileno() 6417db96d56Sopenharmony_ci try: 6427db96d56Sopenharmony_ci sock.connect(address) 6437db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 6447db96d56Sopenharmony_ci # Issue #23618: When the C function connect() fails with EINTR, the 6457db96d56Sopenharmony_ci # connection runs in background. We have to wait until the socket 6467db96d56Sopenharmony_ci # becomes writable to be notified when the connection succeed or 6477db96d56Sopenharmony_ci # fails. 6487db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 6497db96d56Sopenharmony_ci handle = self._add_writer( 6507db96d56Sopenharmony_ci fd, self._sock_connect_cb, fut, sock, address) 6517db96d56Sopenharmony_ci fut.add_done_callback( 6527db96d56Sopenharmony_ci functools.partial(self._sock_write_done, fd, handle=handle)) 6537db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 6547db96d56Sopenharmony_ci raise 6557db96d56Sopenharmony_ci except BaseException as exc: 6567db96d56Sopenharmony_ci fut.set_exception(exc) 6577db96d56Sopenharmony_ci else: 6587db96d56Sopenharmony_ci fut.set_result(None) 6597db96d56Sopenharmony_ci finally: 6607db96d56Sopenharmony_ci fut = None 6617db96d56Sopenharmony_ci 6627db96d56Sopenharmony_ci def _sock_write_done(self, fd, fut, handle=None): 6637db96d56Sopenharmony_ci if handle is None or not handle.cancelled(): 6647db96d56Sopenharmony_ci self.remove_writer(fd) 6657db96d56Sopenharmony_ci 6667db96d56Sopenharmony_ci def _sock_connect_cb(self, fut, sock, address): 6677db96d56Sopenharmony_ci if fut.done(): 6687db96d56Sopenharmony_ci return 6697db96d56Sopenharmony_ci 6707db96d56Sopenharmony_ci try: 6717db96d56Sopenharmony_ci err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 6727db96d56Sopenharmony_ci if err != 0: 6737db96d56Sopenharmony_ci # Jump to any except clause below. 
6747db96d56Sopenharmony_ci raise OSError(err, f'Connect call failed {address}') 6757db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 6767db96d56Sopenharmony_ci # socket is still registered, the callback will be retried later 6777db96d56Sopenharmony_ci pass 6787db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 6797db96d56Sopenharmony_ci raise 6807db96d56Sopenharmony_ci except BaseException as exc: 6817db96d56Sopenharmony_ci fut.set_exception(exc) 6827db96d56Sopenharmony_ci else: 6837db96d56Sopenharmony_ci fut.set_result(None) 6847db96d56Sopenharmony_ci finally: 6857db96d56Sopenharmony_ci fut = None 6867db96d56Sopenharmony_ci 6877db96d56Sopenharmony_ci async def sock_accept(self, sock): 6887db96d56Sopenharmony_ci """Accept a connection. 6897db96d56Sopenharmony_ci 6907db96d56Sopenharmony_ci The socket must be bound to an address and listening for connections. 6917db96d56Sopenharmony_ci The return value is a pair (conn, address) where conn is a new socket 6927db96d56Sopenharmony_ci object usable to send and receive data on the connection, and address 6937db96d56Sopenharmony_ci is the address bound to the socket on the other end of the connection. 
6947db96d56Sopenharmony_ci """ 6957db96d56Sopenharmony_ci base_events._check_ssl_socket(sock) 6967db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 6977db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 6987db96d56Sopenharmony_ci fut = self.create_future() 6997db96d56Sopenharmony_ci self._sock_accept(fut, sock) 7007db96d56Sopenharmony_ci return await fut 7017db96d56Sopenharmony_ci 7027db96d56Sopenharmony_ci def _sock_accept(self, fut, sock): 7037db96d56Sopenharmony_ci fd = sock.fileno() 7047db96d56Sopenharmony_ci try: 7057db96d56Sopenharmony_ci conn, address = sock.accept() 7067db96d56Sopenharmony_ci conn.setblocking(False) 7077db96d56Sopenharmony_ci except (BlockingIOError, InterruptedError): 7087db96d56Sopenharmony_ci self._ensure_fd_no_transport(fd) 7097db96d56Sopenharmony_ci handle = self._add_reader(fd, self._sock_accept, fut, sock) 7107db96d56Sopenharmony_ci fut.add_done_callback( 7117db96d56Sopenharmony_ci functools.partial(self._sock_read_done, fd, handle=handle)) 7127db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 7137db96d56Sopenharmony_ci raise 7147db96d56Sopenharmony_ci except BaseException as exc: 7157db96d56Sopenharmony_ci fut.set_exception(exc) 7167db96d56Sopenharmony_ci else: 7177db96d56Sopenharmony_ci fut.set_result((conn, address)) 7187db96d56Sopenharmony_ci 7197db96d56Sopenharmony_ci async def _sendfile_native(self, transp, file, offset, count): 7207db96d56Sopenharmony_ci del self._transports[transp._sock_fd] 7217db96d56Sopenharmony_ci resume_reading = transp.is_reading() 7227db96d56Sopenharmony_ci transp.pause_reading() 7237db96d56Sopenharmony_ci await transp._make_empty_waiter() 7247db96d56Sopenharmony_ci try: 7257db96d56Sopenharmony_ci return await self.sock_sendfile(transp._sock, file, offset, count, 7267db96d56Sopenharmony_ci fallback=False) 7277db96d56Sopenharmony_ci finally: 7287db96d56Sopenharmony_ci transp._reset_empty_waiter() 7297db96d56Sopenharmony_ci if resume_reading: 
7307db96d56Sopenharmony_ci transp.resume_reading() 7317db96d56Sopenharmony_ci self._transports[transp._sock_fd] = transp 7327db96d56Sopenharmony_ci 7337db96d56Sopenharmony_ci def _process_events(self, event_list): 7347db96d56Sopenharmony_ci for key, mask in event_list: 7357db96d56Sopenharmony_ci fileobj, (reader, writer) = key.fileobj, key.data 7367db96d56Sopenharmony_ci if mask & selectors.EVENT_READ and reader is not None: 7377db96d56Sopenharmony_ci if reader._cancelled: 7387db96d56Sopenharmony_ci self._remove_reader(fileobj) 7397db96d56Sopenharmony_ci else: 7407db96d56Sopenharmony_ci self._add_callback(reader) 7417db96d56Sopenharmony_ci if mask & selectors.EVENT_WRITE and writer is not None: 7427db96d56Sopenharmony_ci if writer._cancelled: 7437db96d56Sopenharmony_ci self._remove_writer(fileobj) 7447db96d56Sopenharmony_ci else: 7457db96d56Sopenharmony_ci self._add_callback(writer) 7467db96d56Sopenharmony_ci 7477db96d56Sopenharmony_ci def _stop_serving(self, sock): 7487db96d56Sopenharmony_ci self._remove_reader(sock.fileno()) 7497db96d56Sopenharmony_ci sock.close() 7507db96d56Sopenharmony_ci 7517db96d56Sopenharmony_ci 7527db96d56Sopenharmony_ciclass _SelectorTransport(transports._FlowControlMixin, 7537db96d56Sopenharmony_ci transports.Transport): 7547db96d56Sopenharmony_ci 7557db96d56Sopenharmony_ci max_size = 256 * 1024 # Buffer size passed to recv(). 7567db96d56Sopenharmony_ci 7577db96d56Sopenharmony_ci _buffer_factory = bytearray # Constructs initial value for self._buffer. 
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Common base of the selector-based transports.

    Holds the socket, the write buffer and the closing state shared by the
    stream and datagram transports defined in this module.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Record socket details in *extra* and register with *loop*.

        The transport is stored in ``loop._transports`` under the socket's
        file descriptor and, when *server* is given, attached to it.
        """
        super().__init__(extra, loop)
        self._extra['socket'] = trsock.TransportSocket(sock)
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except OSError:
                # Same exception as for sockname above (socket.error is an
                # alias of OSError).
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        self._paused = False  # Set when pause_reading() called

        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        """Describe the transport state, fd and selector registration."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(info))

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Install *protocol* and mark it as connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has started."""
        return self._closing

    def is_reading(self):
        """Return True unless the transport is closing or reading is paused."""
        return not self.is_closing() and not self._paused

    def pause_reading(self):
        """Stop watching the socket for reads; no-op if not reading."""
        if not self.is_reading():
            return
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the socket for reads again; no-op if closing or not paused."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def close(self):
        """Start closing the transport.

        Reading stops at once.  If the write buffer is empty,
        connection_lost() is scheduled immediately; otherwise it is left to
        the code that drains the buffer.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
        """Warn about, and close, a socket that was never closed."""
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport.

        OSError instances are only logged (and only in debug mode); any
        other exception goes to the loop's exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data, stop watching the fd and schedule
        connection_lost(exc); no-op if the connection is already lost."""
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the length of the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop unless reading is paused/closing."""
        if not self.is_reading():
            return
        self._loop._add_reader(fd, callback, *args)
class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport on top of a non-blocking socket."""

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Schedule connection_made(), start reading, then wake *waiter*."""

        # Must exist before the base __init__ calls set_protocol().
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        # Future used by sendfile to wait for the write buffer to drain.
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def set_protocol(self, protocol):
        """Install *protocol* and pick the matching read strategy."""
        if isinstance(protocol, protocols.BufferedProtocol):
            self._read_ready_cb = self._read_ready__get_buffer
        else:
            self._read_ready_cb = self._read_ready__data_received

        super().set_protocol(protocol)

    def _read_ready(self):
        """Reader callback: delegate to the strategy chosen in set_protocol()."""
        self._read_ready_cb()

    def _read_ready__get_buffer(self):
        """Read into a buffer supplied by a BufferedProtocol."""
        if self._conn_lost:
            return

        try:
            buf = self._protocol.get_buffer(-1)
            if not len(buf):
                raise RuntimeError('get_buffer() returned an empty buffer')
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.get_buffer() call failed.')
            return

        try:
            nbytes = self._sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not nbytes:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.buffer_updated(nbytes)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.buffer_updated() call failed.')

    def _read_ready__data_received(self):
        """Read up to max_size bytes and hand them to data_received()."""
        if self._conn_lost:
            return
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not data:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.data_received(data)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.data_received() call failed.')

    def _read_ready__on_eof(self):
        """Handle an empty read: call eof_received() and act on its result."""
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)

        try:
            keep_open = self._protocol.eof_received()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.eof_received() call failed.')
            return

        if keep_open:
            # We're keeping the connection open so the
            # protocol can write more, but we still can't
            # receive more, so remove the reader callback.
            self._loop._remove_reader(self._sock_fd)
        else:
            self.close()

    def write(self, data):
        """Send *data*, buffering whatever cannot be written immediately.

        Raises TypeError for non bytes-like data and RuntimeError after
        write_eof() or while a sendfile is in progress.  Writes after the
        connection is lost are dropped (and eventually logged).
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as the socket takes."""
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
            self._settle_empty_waiter(exc)
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                self._settle_empty_waiter(None)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent."""
        if self._closing or self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def _call_connection_lost(self, exc):
        """Run the base cleanup, then fail a still-pending empty-waiter."""
        super()._call_connection_lost(exc)
        self._settle_empty_waiter(
            ConnectionError("Connection is closed by peer"))

    def _settle_empty_waiter(self, exc):
        """Complete the empty-waiter, if any, unless it is already done.

        With *exc* set the waiter fails with it, otherwise it gets a None
        result.  The done() check matters because a write error completes
        the waiter in _write_ready() and the connection-lost callback
        scheduled by that same error would otherwise complete it a second
        time and raise InvalidStateError.
        """
        waiter = self._empty_waiter
        if waiter is None or waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    def _make_empty_waiter(self):
        """Create and return a future completed when the buffer is empty.

        Raises RuntimeError if such a waiter is already set.
        """
        if self._empty_waiter is not None:
            raise RuntimeError("Empty waiter is already set")
        self._empty_waiter = self._loop.create_future()
        if not self._buffer:
            self._empty_waiter.set_result(None)
        return self._empty_waiter

    def _reset_empty_waiter(self):
        """Forget the empty-waiter so that write() is allowed again."""
        self._empty_waiter = None
class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport on top of a non-blocking socket."""

    # Queue of (payload, address) pairs waiting to be sent.
    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Schedule connection_made(), start reading, then wake *waiter*.

        *address*, when given, is the only destination sendto() accepts.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        # Total number of payload bytes currently queued in self._buffer.
        self._buffer_size = 0
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def get_write_buffer_size(self):
        """Return the number of queued payload bytes."""
        return self._buffer_size

    def _read_ready(self):
        """Reader callback: receive one datagram and pass it to the protocol.

        OSError is reported through error_received(); any other exception is
        fatal for the transport.
        """
        if self._conn_lost:
            return
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, queueing it if the socket would block.

        Raises TypeError for non bytes-like data and ValueError when the
        transport has a fixed address and *addr* is neither None nor that
        address.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        payload = bytes(data)
        self._buffer.append((payload, addr))
        # Count the copied payload, not the caller's object: for a
        # memoryview with itemsize > 1, len(data) is an item count, while
        # _sendto_ready() subtracts the byte length of the copy.
        self._buffer_size += len(payload)
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: send queued datagrams until the socket blocks."""
        while self._buffer:
            data, addr = self._buffer.popleft()
            self._buffer_size -= len(data)
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                self._buffer_size += len(data)
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)