17db96d56Sopenharmony_ci"""Event loop using a proactor and related classes. 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciA proactor is a "notify-on-completion" multiplexer. Currently a 47db96d56Sopenharmony_ciproactor is only implemented on Windows with IOCP. 57db96d56Sopenharmony_ci""" 67db96d56Sopenharmony_ci 77db96d56Sopenharmony_ci__all__ = 'BaseProactorEventLoop', 87db96d56Sopenharmony_ci 97db96d56Sopenharmony_ciimport io 107db96d56Sopenharmony_ciimport os 117db96d56Sopenharmony_ciimport socket 127db96d56Sopenharmony_ciimport warnings 137db96d56Sopenharmony_ciimport signal 147db96d56Sopenharmony_ciimport threading 157db96d56Sopenharmony_ciimport collections 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_cifrom . import base_events 187db96d56Sopenharmony_cifrom . import constants 197db96d56Sopenharmony_cifrom . import futures 207db96d56Sopenharmony_cifrom . import exceptions 217db96d56Sopenharmony_cifrom . import protocols 227db96d56Sopenharmony_cifrom . import sslproto 237db96d56Sopenharmony_cifrom . import transports 247db96d56Sopenharmony_cifrom . import trsock 257db96d56Sopenharmony_cifrom .log import logger 267db96d56Sopenharmony_ci 277db96d56Sopenharmony_ci 287db96d56Sopenharmony_cidef _set_socket_extra(transport, sock): 297db96d56Sopenharmony_ci transport._extra['socket'] = trsock.TransportSocket(sock) 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci try: 327db96d56Sopenharmony_ci transport._extra['sockname'] = sock.getsockname() 337db96d56Sopenharmony_ci except socket.error: 347db96d56Sopenharmony_ci if transport._loop.get_debug(): 357db96d56Sopenharmony_ci logger.warning( 367db96d56Sopenharmony_ci "getsockname() failed on %r", sock, exc_info=True) 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_ci if 'peername' not in transport._extra: 397db96d56Sopenharmony_ci try: 407db96d56Sopenharmony_ci transport._extra['peername'] = sock.getpeername() 417db96d56Sopenharmony_ci except socket.error: 427db96d56Sopenharmony_ci # UDP sockets may not have a peer name 437db96d56Sopenharmony_ci transport._extra['peername'] = None 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ciclass _ProactorBasePipeTransport(transports._FlowControlMixin, 477db96d56Sopenharmony_ci transports.BaseTransport): 487db96d56Sopenharmony_ci """Base class for pipe and socket transports.""" 497db96d56Sopenharmony_ci 507db96d56Sopenharmony_ci def __init__(self, loop, sock, protocol, waiter=None, 517db96d56Sopenharmony_ci extra=None, server=None): 527db96d56Sopenharmony_ci super().__init__(extra, loop) 537db96d56Sopenharmony_ci self._set_extra(sock) 547db96d56Sopenharmony_ci self._sock = sock 557db96d56Sopenharmony_ci self.set_protocol(protocol) 567db96d56Sopenharmony_ci self._server = server 577db96d56Sopenharmony_ci self._buffer = None # None or bytearray. 587db96d56Sopenharmony_ci self._read_fut = None 597db96d56Sopenharmony_ci self._write_fut = None 607db96d56Sopenharmony_ci self._pending_write = 0 617db96d56Sopenharmony_ci self._conn_lost = 0 627db96d56Sopenharmony_ci self._closing = False # Set when close() called. 637db96d56Sopenharmony_ci self._called_connection_lost = False 647db96d56Sopenharmony_ci self._eof_written = False 657db96d56Sopenharmony_ci if self._server is not None: 667db96d56Sopenharmony_ci self._server._attach() 677db96d56Sopenharmony_ci self._loop.call_soon(self._protocol.connection_made, self) 687db96d56Sopenharmony_ci if waiter is not None: 697db96d56Sopenharmony_ci # only wake up the waiter when connection_made() has been called 707db96d56Sopenharmony_ci self._loop.call_soon(futures._set_result_unless_cancelled, 717db96d56Sopenharmony_ci waiter, None) 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_ci def __repr__(self): 747db96d56Sopenharmony_ci info = [self.__class__.__name__] 757db96d56Sopenharmony_ci if self._sock is None: 767db96d56Sopenharmony_ci info.append('closed') 777db96d56Sopenharmony_ci elif self._closing: 787db96d56Sopenharmony_ci info.append('closing') 797db96d56Sopenharmony_ci if self._sock is not None: 807db96d56Sopenharmony_ci info.append(f'fd={self._sock.fileno()}') 817db96d56Sopenharmony_ci if self._read_fut is not None: 827db96d56Sopenharmony_ci info.append(f'read={self._read_fut!r}') 837db96d56Sopenharmony_ci if self._write_fut is not None: 847db96d56Sopenharmony_ci info.append(f'write={self._write_fut!r}') 857db96d56Sopenharmony_ci if self._buffer: 867db96d56Sopenharmony_ci info.append(f'write_bufsize={len(self._buffer)}') 877db96d56Sopenharmony_ci if self._eof_written: 887db96d56Sopenharmony_ci info.append('EOF written') 897db96d56Sopenharmony_ci return '<{}>'.format(' '.join(info)) 907db96d56Sopenharmony_ci 917db96d56Sopenharmony_ci def _set_extra(self, sock): 927db96d56Sopenharmony_ci self._extra['pipe'] = sock 937db96d56Sopenharmony_ci 947db96d56Sopenharmony_ci def set_protocol(self, protocol): 957db96d56Sopenharmony_ci self._protocol = protocol 967db96d56Sopenharmony_ci 977db96d56Sopenharmony_ci def get_protocol(self): 987db96d56Sopenharmony_ci return self._protocol 997db96d56Sopenharmony_ci 1007db96d56Sopenharmony_ci def is_closing(self): 1017db96d56Sopenharmony_ci return self._closing 1027db96d56Sopenharmony_ci 1037db96d56Sopenharmony_ci def close(self): 1047db96d56Sopenharmony_ci if self._closing: 1057db96d56Sopenharmony_ci return 1067db96d56Sopenharmony_ci self._closing = True 1077db96d56Sopenharmony_ci self._conn_lost += 1 1087db96d56Sopenharmony_ci if not self._buffer and self._write_fut is None: 1097db96d56Sopenharmony_ci self._loop.call_soon(self._call_connection_lost, None) 1107db96d56Sopenharmony_ci if self._read_fut is not None: 1117db96d56Sopenharmony_ci self._read_fut.cancel() 1127db96d56Sopenharmony_ci self._read_fut = None 1137db96d56Sopenharmony_ci 1147db96d56Sopenharmony_ci def __del__(self, _warn=warnings.warn): 1157db96d56Sopenharmony_ci if self._sock is not None: 1167db96d56Sopenharmony_ci _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 1177db96d56Sopenharmony_ci self._sock.close() 1187db96d56Sopenharmony_ci 1197db96d56Sopenharmony_ci def _fatal_error(self, exc, message='Fatal error on pipe transport'): 1207db96d56Sopenharmony_ci try: 1217db96d56Sopenharmony_ci if isinstance(exc, OSError): 1227db96d56Sopenharmony_ci if self._loop.get_debug(): 1237db96d56Sopenharmony_ci logger.debug("%r: %s", self, message, exc_info=True) 1247db96d56Sopenharmony_ci else: 1257db96d56Sopenharmony_ci self._loop.call_exception_handler({ 1267db96d56Sopenharmony_ci 'message': message, 1277db96d56Sopenharmony_ci 'exception': exc, 1287db96d56Sopenharmony_ci 'transport': self, 1297db96d56Sopenharmony_ci 'protocol': self._protocol, 1307db96d56Sopenharmony_ci }) 1317db96d56Sopenharmony_ci finally: 1327db96d56Sopenharmony_ci self._force_close(exc) 1337db96d56Sopenharmony_ci 1347db96d56Sopenharmony_ci def _force_close(self, exc): 1357db96d56Sopenharmony_ci if self._empty_waiter is not None and not self._empty_waiter.done(): 1367db96d56Sopenharmony_ci if exc is None: 1377db96d56Sopenharmony_ci self._empty_waiter.set_result(None) 1387db96d56Sopenharmony_ci else: 1397db96d56Sopenharmony_ci self._empty_waiter.set_exception(exc) 1407db96d56Sopenharmony_ci if self._closing and self._called_connection_lost: 1417db96d56Sopenharmony_ci return 1427db96d56Sopenharmony_ci self._closing = True 1437db96d56Sopenharmony_ci self._conn_lost += 1 1447db96d56Sopenharmony_ci if self._write_fut: 1457db96d56Sopenharmony_ci self._write_fut.cancel() 1467db96d56Sopenharmony_ci self._write_fut = None 1477db96d56Sopenharmony_ci if self._read_fut: 1487db96d56Sopenharmony_ci self._read_fut.cancel() 1497db96d56Sopenharmony_ci self._read_fut = None 1507db96d56Sopenharmony_ci self._pending_write = 0 1517db96d56Sopenharmony_ci self._buffer = None 1527db96d56Sopenharmony_ci self._loop.call_soon(self._call_connection_lost, exc) 1537db96d56Sopenharmony_ci 1547db96d56Sopenharmony_ci def _call_connection_lost(self, exc): 1557db96d56Sopenharmony_ci if self._called_connection_lost: 1567db96d56Sopenharmony_ci return 1577db96d56Sopenharmony_ci try: 1587db96d56Sopenharmony_ci self._protocol.connection_lost(exc) 1597db96d56Sopenharmony_ci finally: 1607db96d56Sopenharmony_ci # XXX If there is a pending overlapped read on the other 1617db96d56Sopenharmony_ci # end then it may fail with ERROR_NETNAME_DELETED if we 1627db96d56Sopenharmony_ci # just close our end. First calling shutdown() seems to 1637db96d56Sopenharmony_ci # cure it, but maybe using DisconnectEx() would be better. 1647db96d56Sopenharmony_ci if hasattr(self._sock, 'shutdown') and self._sock.fileno() != -1: 1657db96d56Sopenharmony_ci self._sock.shutdown(socket.SHUT_RDWR) 1667db96d56Sopenharmony_ci self._sock.close() 1677db96d56Sopenharmony_ci self._sock = None 1687db96d56Sopenharmony_ci server = self._server 1697db96d56Sopenharmony_ci if server is not None: 1707db96d56Sopenharmony_ci server._detach() 1717db96d56Sopenharmony_ci self._server = None 1727db96d56Sopenharmony_ci self._called_connection_lost = True 1737db96d56Sopenharmony_ci 1747db96d56Sopenharmony_ci def get_write_buffer_size(self): 1757db96d56Sopenharmony_ci size = self._pending_write 1767db96d56Sopenharmony_ci if self._buffer is not None: 1777db96d56Sopenharmony_ci size += len(self._buffer) 1787db96d56Sopenharmony_ci return size 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci 1817db96d56Sopenharmony_ciclass _ProactorReadPipeTransport(_ProactorBasePipeTransport, 1827db96d56Sopenharmony_ci transports.ReadTransport): 1837db96d56Sopenharmony_ci """Transport for read pipes.""" 1847db96d56Sopenharmony_ci 1857db96d56Sopenharmony_ci def __init__(self, loop, sock, protocol, waiter=None, 1867db96d56Sopenharmony_ci extra=None, server=None, buffer_size=65536): 1877db96d56Sopenharmony_ci self._pending_data_length = -1 1887db96d56Sopenharmony_ci self._paused = True 1897db96d56Sopenharmony_ci super().__init__(loop, sock, protocol, waiter, extra, server) 1907db96d56Sopenharmony_ci 1917db96d56Sopenharmony_ci self._data = bytearray(buffer_size) 1927db96d56Sopenharmony_ci self._loop.call_soon(self._loop_reading) 1937db96d56Sopenharmony_ci self._paused = False 1947db96d56Sopenharmony_ci 1957db96d56Sopenharmony_ci def is_reading(self): 1967db96d56Sopenharmony_ci return not self._paused and not self._closing 1977db96d56Sopenharmony_ci 1987db96d56Sopenharmony_ci def pause_reading(self): 1997db96d56Sopenharmony_ci if self._closing or self._paused: 2007db96d56Sopenharmony_ci return 2017db96d56Sopenharmony_ci self._paused = True 2027db96d56Sopenharmony_ci 2037db96d56Sopenharmony_ci # bpo-33694: Don't cancel self._read_fut because cancelling an 2047db96d56Sopenharmony_ci # overlapped WSASend() loss silently data with the current proactor 2057db96d56Sopenharmony_ci # implementation. 2067db96d56Sopenharmony_ci # 2077db96d56Sopenharmony_ci # If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend() 2087db96d56Sopenharmony_ci # completed (even if HasOverlappedIoCompleted() returns 0), but 2097db96d56Sopenharmony_ci # Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND 2107db96d56Sopenharmony_ci # error. Once the overlapped is ignored, the IOCP loop will ignores the 2117db96d56Sopenharmony_ci # completion I/O event and so not read the result of the overlapped 2127db96d56Sopenharmony_ci # WSARecv(). 2137db96d56Sopenharmony_ci 2147db96d56Sopenharmony_ci if self._loop.get_debug(): 2157db96d56Sopenharmony_ci logger.debug("%r pauses reading", self) 2167db96d56Sopenharmony_ci 2177db96d56Sopenharmony_ci def resume_reading(self): 2187db96d56Sopenharmony_ci if self._closing or not self._paused: 2197db96d56Sopenharmony_ci return 2207db96d56Sopenharmony_ci 2217db96d56Sopenharmony_ci self._paused = False 2227db96d56Sopenharmony_ci if self._read_fut is None: 2237db96d56Sopenharmony_ci self._loop.call_soon(self._loop_reading, None) 2247db96d56Sopenharmony_ci 2257db96d56Sopenharmony_ci length = self._pending_data_length 2267db96d56Sopenharmony_ci self._pending_data_length = -1 2277db96d56Sopenharmony_ci if length > -1: 2287db96d56Sopenharmony_ci # Call the protocol method after calling _loop_reading(), 2297db96d56Sopenharmony_ci # since the protocol can decide to pause reading again. 2307db96d56Sopenharmony_ci self._loop.call_soon(self._data_received, self._data[:length], length) 2317db96d56Sopenharmony_ci 2327db96d56Sopenharmony_ci if self._loop.get_debug(): 2337db96d56Sopenharmony_ci logger.debug("%r resumes reading", self) 2347db96d56Sopenharmony_ci 2357db96d56Sopenharmony_ci def _eof_received(self): 2367db96d56Sopenharmony_ci if self._loop.get_debug(): 2377db96d56Sopenharmony_ci logger.debug("%r received EOF", self) 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ci try: 2407db96d56Sopenharmony_ci keep_open = self._protocol.eof_received() 2417db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 2427db96d56Sopenharmony_ci raise 2437db96d56Sopenharmony_ci except BaseException as exc: 2447db96d56Sopenharmony_ci self._fatal_error( 2457db96d56Sopenharmony_ci exc, 'Fatal error: protocol.eof_received() call failed.') 2467db96d56Sopenharmony_ci return 2477db96d56Sopenharmony_ci 2487db96d56Sopenharmony_ci if not keep_open: 2497db96d56Sopenharmony_ci self.close() 2507db96d56Sopenharmony_ci 2517db96d56Sopenharmony_ci def _data_received(self, data, length): 2527db96d56Sopenharmony_ci if self._paused: 2537db96d56Sopenharmony_ci # Don't call any protocol method while reading is paused. 2547db96d56Sopenharmony_ci # The protocol will be called on resume_reading(). 2557db96d56Sopenharmony_ci assert self._pending_data_length == -1 2567db96d56Sopenharmony_ci self._pending_data_length = length 2577db96d56Sopenharmony_ci return 2587db96d56Sopenharmony_ci 2597db96d56Sopenharmony_ci if length == 0: 2607db96d56Sopenharmony_ci self._eof_received() 2617db96d56Sopenharmony_ci return 2627db96d56Sopenharmony_ci 2637db96d56Sopenharmony_ci if isinstance(self._protocol, protocols.BufferedProtocol): 2647db96d56Sopenharmony_ci try: 2657db96d56Sopenharmony_ci protocols._feed_data_to_buffered_proto(self._protocol, data) 2667db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 2677db96d56Sopenharmony_ci raise 2687db96d56Sopenharmony_ci except BaseException as exc: 2697db96d56Sopenharmony_ci self._fatal_error(exc, 2707db96d56Sopenharmony_ci 'Fatal error: protocol.buffer_updated() ' 2717db96d56Sopenharmony_ci 'call failed.') 2727db96d56Sopenharmony_ci return 2737db96d56Sopenharmony_ci else: 2747db96d56Sopenharmony_ci self._protocol.data_received(data) 2757db96d56Sopenharmony_ci 2767db96d56Sopenharmony_ci def _loop_reading(self, fut=None): 2777db96d56Sopenharmony_ci length = -1 2787db96d56Sopenharmony_ci data = None 2797db96d56Sopenharmony_ci try: 2807db96d56Sopenharmony_ci if fut is not None: 2817db96d56Sopenharmony_ci assert self._read_fut is fut or (self._read_fut is None and 2827db96d56Sopenharmony_ci self._closing) 2837db96d56Sopenharmony_ci self._read_fut = None 2847db96d56Sopenharmony_ci if fut.done(): 2857db96d56Sopenharmony_ci # deliver data later in "finally" clause 2867db96d56Sopenharmony_ci length = fut.result() 2877db96d56Sopenharmony_ci if length == 0: 2887db96d56Sopenharmony_ci # we got end-of-file so no need to reschedule a new read 2897db96d56Sopenharmony_ci return 2907db96d56Sopenharmony_ci 2917db96d56Sopenharmony_ci data = self._data[:length] 2927db96d56Sopenharmony_ci else: 2937db96d56Sopenharmony_ci # the future will be replaced by next proactor.recv call 2947db96d56Sopenharmony_ci fut.cancel() 2957db96d56Sopenharmony_ci 2967db96d56Sopenharmony_ci if self._closing: 2977db96d56Sopenharmony_ci # since close() has been called we ignore any read data 2987db96d56Sopenharmony_ci return 2997db96d56Sopenharmony_ci 3007db96d56Sopenharmony_ci # bpo-33694: buffer_updated() has currently no fast path because of 3017db96d56Sopenharmony_ci # a data loss issue caused by overlapped WSASend() cancellation. 3027db96d56Sopenharmony_ci 3037db96d56Sopenharmony_ci if not self._paused: 3047db96d56Sopenharmony_ci # reschedule a new read 3057db96d56Sopenharmony_ci self._read_fut = self._loop._proactor.recv_into(self._sock, self._data) 3067db96d56Sopenharmony_ci except ConnectionAbortedError as exc: 3077db96d56Sopenharmony_ci if not self._closing: 3087db96d56Sopenharmony_ci self._fatal_error(exc, 'Fatal read error on pipe transport') 3097db96d56Sopenharmony_ci elif self._loop.get_debug(): 3107db96d56Sopenharmony_ci logger.debug("Read error on pipe transport while closing", 3117db96d56Sopenharmony_ci exc_info=True) 3127db96d56Sopenharmony_ci except ConnectionResetError as exc: 3137db96d56Sopenharmony_ci self._force_close(exc) 3147db96d56Sopenharmony_ci except OSError as exc: 3157db96d56Sopenharmony_ci self._fatal_error(exc, 'Fatal read error on pipe transport') 3167db96d56Sopenharmony_ci except exceptions.CancelledError: 3177db96d56Sopenharmony_ci if not self._closing: 3187db96d56Sopenharmony_ci raise 3197db96d56Sopenharmony_ci else: 3207db96d56Sopenharmony_ci if not self._paused: 3217db96d56Sopenharmony_ci self._read_fut.add_done_callback(self._loop_reading) 3227db96d56Sopenharmony_ci finally: 3237db96d56Sopenharmony_ci if length > -1: 3247db96d56Sopenharmony_ci self._data_received(data, length) 3257db96d56Sopenharmony_ci 3267db96d56Sopenharmony_ci 3277db96d56Sopenharmony_ciclass _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, 3287db96d56Sopenharmony_ci transports.WriteTransport): 3297db96d56Sopenharmony_ci """Transport for write pipes.""" 3307db96d56Sopenharmony_ci 3317db96d56Sopenharmony_ci _start_tls_compatible = True 3327db96d56Sopenharmony_ci 3337db96d56Sopenharmony_ci def __init__(self, *args, **kw): 3347db96d56Sopenharmony_ci super().__init__(*args, **kw) 3357db96d56Sopenharmony_ci self._empty_waiter = None 3367db96d56Sopenharmony_ci 3377db96d56Sopenharmony_ci def write(self, data): 3387db96d56Sopenharmony_ci if not isinstance(data, (bytes, bytearray, memoryview)): 3397db96d56Sopenharmony_ci raise TypeError( 3407db96d56Sopenharmony_ci f"data argument must be a bytes-like object, " 3417db96d56Sopenharmony_ci f"not {type(data).__name__}") 3427db96d56Sopenharmony_ci if self._eof_written: 3437db96d56Sopenharmony_ci raise RuntimeError('write_eof() already called') 3447db96d56Sopenharmony_ci if self._empty_waiter is not None: 3457db96d56Sopenharmony_ci raise RuntimeError('unable to write; sendfile is in progress') 3467db96d56Sopenharmony_ci 3477db96d56Sopenharmony_ci if not data: 3487db96d56Sopenharmony_ci return 3497db96d56Sopenharmony_ci 3507db96d56Sopenharmony_ci if self._conn_lost: 3517db96d56Sopenharmony_ci if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 3527db96d56Sopenharmony_ci logger.warning('socket.send() raised exception.') 3537db96d56Sopenharmony_ci self._conn_lost += 1 3547db96d56Sopenharmony_ci return 3557db96d56Sopenharmony_ci 3567db96d56Sopenharmony_ci # Observable states: 3577db96d56Sopenharmony_ci # 1. IDLE: _write_fut and _buffer both None 3587db96d56Sopenharmony_ci # 2. WRITING: _write_fut set; _buffer None 3597db96d56Sopenharmony_ci # 3. BACKED UP: _write_fut set; _buffer a bytearray 3607db96d56Sopenharmony_ci # We always copy the data, so the caller can't modify it 3617db96d56Sopenharmony_ci # while we're still waiting for the I/O to happen. 3627db96d56Sopenharmony_ci if self._write_fut is None: # IDLE -> WRITING 3637db96d56Sopenharmony_ci assert self._buffer is None 3647db96d56Sopenharmony_ci # Pass a copy, except if it's already immutable. 3657db96d56Sopenharmony_ci self._loop_writing(data=bytes(data)) 3667db96d56Sopenharmony_ci elif not self._buffer: # WRITING -> BACKED UP 3677db96d56Sopenharmony_ci # Make a mutable copy which we can extend. 3687db96d56Sopenharmony_ci self._buffer = bytearray(data) 3697db96d56Sopenharmony_ci self._maybe_pause_protocol() 3707db96d56Sopenharmony_ci else: # BACKED UP 3717db96d56Sopenharmony_ci # Append to buffer (also copies). 3727db96d56Sopenharmony_ci self._buffer.extend(data) 3737db96d56Sopenharmony_ci self._maybe_pause_protocol() 3747db96d56Sopenharmony_ci 3757db96d56Sopenharmony_ci def _loop_writing(self, f=None, data=None): 3767db96d56Sopenharmony_ci try: 3777db96d56Sopenharmony_ci if f is not None and self._write_fut is None and self._closing: 3787db96d56Sopenharmony_ci # XXX most likely self._force_close() has been called, and 3797db96d56Sopenharmony_ci # it has set self._write_fut to None. 3807db96d56Sopenharmony_ci return 3817db96d56Sopenharmony_ci assert f is self._write_fut 3827db96d56Sopenharmony_ci self._write_fut = None 3837db96d56Sopenharmony_ci self._pending_write = 0 3847db96d56Sopenharmony_ci if f: 3857db96d56Sopenharmony_ci f.result() 3867db96d56Sopenharmony_ci if data is None: 3877db96d56Sopenharmony_ci data = self._buffer 3887db96d56Sopenharmony_ci self._buffer = None 3897db96d56Sopenharmony_ci if not data: 3907db96d56Sopenharmony_ci if self._closing: 3917db96d56Sopenharmony_ci self._loop.call_soon(self._call_connection_lost, None) 3927db96d56Sopenharmony_ci if self._eof_written: 3937db96d56Sopenharmony_ci self._sock.shutdown(socket.SHUT_WR) 3947db96d56Sopenharmony_ci # Now that we've reduced the buffer size, tell the 3957db96d56Sopenharmony_ci # protocol to resume writing if it was paused. Note that 3967db96d56Sopenharmony_ci # we do this last since the callback is called immediately 3977db96d56Sopenharmony_ci # and it may add more data to the buffer (even causing the 3987db96d56Sopenharmony_ci # protocol to be paused again). 3997db96d56Sopenharmony_ci self._maybe_resume_protocol() 4007db96d56Sopenharmony_ci else: 4017db96d56Sopenharmony_ci self._write_fut = self._loop._proactor.send(self._sock, data) 4027db96d56Sopenharmony_ci if not self._write_fut.done(): 4037db96d56Sopenharmony_ci assert self._pending_write == 0 4047db96d56Sopenharmony_ci self._pending_write = len(data) 4057db96d56Sopenharmony_ci self._write_fut.add_done_callback(self._loop_writing) 4067db96d56Sopenharmony_ci self._maybe_pause_protocol() 4077db96d56Sopenharmony_ci else: 4087db96d56Sopenharmony_ci self._write_fut.add_done_callback(self._loop_writing) 4097db96d56Sopenharmony_ci if self._empty_waiter is not None and self._write_fut is None: 4107db96d56Sopenharmony_ci self._empty_waiter.set_result(None) 4117db96d56Sopenharmony_ci except ConnectionResetError as exc: 4127db96d56Sopenharmony_ci self._force_close(exc) 4137db96d56Sopenharmony_ci except OSError as exc: 4147db96d56Sopenharmony_ci self._fatal_error(exc, 'Fatal write error on pipe transport') 4157db96d56Sopenharmony_ci 4167db96d56Sopenharmony_ci def can_write_eof(self): 4177db96d56Sopenharmony_ci return True 4187db96d56Sopenharmony_ci 4197db96d56Sopenharmony_ci def write_eof(self): 4207db96d56Sopenharmony_ci self.close() 4217db96d56Sopenharmony_ci 4227db96d56Sopenharmony_ci def abort(self): 4237db96d56Sopenharmony_ci self._force_close(None) 4247db96d56Sopenharmony_ci 4257db96d56Sopenharmony_ci def _make_empty_waiter(self): 4267db96d56Sopenharmony_ci if self._empty_waiter is not None: 4277db96d56Sopenharmony_ci raise RuntimeError("Empty waiter is already set") 4287db96d56Sopenharmony_ci self._empty_waiter = self._loop.create_future() 4297db96d56Sopenharmony_ci if self._write_fut is None: 4307db96d56Sopenharmony_ci self._empty_waiter.set_result(None) 4317db96d56Sopenharmony_ci return self._empty_waiter 4327db96d56Sopenharmony_ci 4337db96d56Sopenharmony_ci def _reset_empty_waiter(self): 4347db96d56Sopenharmony_ci self._empty_waiter = None 4357db96d56Sopenharmony_ci 4367db96d56Sopenharmony_ci 4377db96d56Sopenharmony_ciclass _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport): 4387db96d56Sopenharmony_ci def __init__(self, *args, **kw): 4397db96d56Sopenharmony_ci super().__init__(*args, **kw) 4407db96d56Sopenharmony_ci self._read_fut = self._loop._proactor.recv(self._sock, 16) 4417db96d56Sopenharmony_ci self._read_fut.add_done_callback(self._pipe_closed) 4427db96d56Sopenharmony_ci 4437db96d56Sopenharmony_ci def _pipe_closed(self, fut): 4447db96d56Sopenharmony_ci if fut.cancelled(): 4457db96d56Sopenharmony_ci # the transport has been closed 4467db96d56Sopenharmony_ci return 4477db96d56Sopenharmony_ci assert fut.result() == b'' 4487db96d56Sopenharmony_ci if self._closing: 4497db96d56Sopenharmony_ci assert self._read_fut is None 4507db96d56Sopenharmony_ci return 4517db96d56Sopenharmony_ci assert fut is self._read_fut, (fut, self._read_fut) 4527db96d56Sopenharmony_ci self._read_fut = None 4537db96d56Sopenharmony_ci if self._write_fut is not None: 4547db96d56Sopenharmony_ci self._force_close(BrokenPipeError()) 4557db96d56Sopenharmony_ci else: 4567db96d56Sopenharmony_ci self.close() 4577db96d56Sopenharmony_ci 4587db96d56Sopenharmony_ci 4597db96d56Sopenharmony_ciclass _ProactorDatagramTransport(_ProactorBasePipeTransport, 4607db96d56Sopenharmony_ci transports.DatagramTransport): 4617db96d56Sopenharmony_ci max_size = 256 * 1024 4627db96d56Sopenharmony_ci def __init__(self, loop, sock, protocol, address=None, 4637db96d56Sopenharmony_ci waiter=None, extra=None): 4647db96d56Sopenharmony_ci self._address = address 4657db96d56Sopenharmony_ci self._empty_waiter = None 4667db96d56Sopenharmony_ci self._buffer_size = 0 4677db96d56Sopenharmony_ci # We don't need to call _protocol.connection_made() since our base 4687db96d56Sopenharmony_ci # constructor does it for us. 4697db96d56Sopenharmony_ci super().__init__(loop, sock, protocol, waiter=waiter, extra=extra) 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci # The base constructor sets _buffer = None, so we set it here 4727db96d56Sopenharmony_ci self._buffer = collections.deque() 4737db96d56Sopenharmony_ci self._loop.call_soon(self._loop_reading) 4747db96d56Sopenharmony_ci 4757db96d56Sopenharmony_ci def _set_extra(self, sock): 4767db96d56Sopenharmony_ci _set_socket_extra(self, sock) 4777db96d56Sopenharmony_ci 4787db96d56Sopenharmony_ci def get_write_buffer_size(self): 4797db96d56Sopenharmony_ci return self._buffer_size 4807db96d56Sopenharmony_ci 4817db96d56Sopenharmony_ci def abort(self): 4827db96d56Sopenharmony_ci self._force_close(None) 4837db96d56Sopenharmony_ci 4847db96d56Sopenharmony_ci def sendto(self, data, addr=None): 4857db96d56Sopenharmony_ci if not isinstance(data, (bytes, bytearray, memoryview)): 4867db96d56Sopenharmony_ci raise TypeError('data argument must be bytes-like object (%r)', 4877db96d56Sopenharmony_ci type(data)) 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci if not data: 4907db96d56Sopenharmony_ci return 4917db96d56Sopenharmony_ci 4927db96d56Sopenharmony_ci if self._address is not None and addr not in (None, self._address): 4937db96d56Sopenharmony_ci raise ValueError( 4947db96d56Sopenharmony_ci f'Invalid address: must be None or {self._address}') 4957db96d56Sopenharmony_ci 4967db96d56Sopenharmony_ci if self._conn_lost and self._address: 4977db96d56Sopenharmony_ci if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 4987db96d56Sopenharmony_ci logger.warning('socket.sendto() raised exception.') 4997db96d56Sopenharmony_ci self._conn_lost += 1 5007db96d56Sopenharmony_ci return 5017db96d56Sopenharmony_ci 5027db96d56Sopenharmony_ci # Ensure that what we buffer is immutable. 5037db96d56Sopenharmony_ci self._buffer.append((bytes(data), addr)) 5047db96d56Sopenharmony_ci self._buffer_size += len(data) 5057db96d56Sopenharmony_ci 5067db96d56Sopenharmony_ci if self._write_fut is None: 5077db96d56Sopenharmony_ci # No current write operations are active, kick one off 5087db96d56Sopenharmony_ci self._loop_writing() 5097db96d56Sopenharmony_ci # else: A write operation is already kicked off 5107db96d56Sopenharmony_ci 5117db96d56Sopenharmony_ci self._maybe_pause_protocol() 5127db96d56Sopenharmony_ci 5137db96d56Sopenharmony_ci def _loop_writing(self, fut=None): 5147db96d56Sopenharmony_ci try: 5157db96d56Sopenharmony_ci if self._conn_lost: 5167db96d56Sopenharmony_ci return 5177db96d56Sopenharmony_ci 5187db96d56Sopenharmony_ci assert fut is self._write_fut 5197db96d56Sopenharmony_ci self._write_fut = None 5207db96d56Sopenharmony_ci if fut: 5217db96d56Sopenharmony_ci # We are in a _loop_writing() done callback, get the result 5227db96d56Sopenharmony_ci fut.result() 5237db96d56Sopenharmony_ci 5247db96d56Sopenharmony_ci if not self._buffer or (self._conn_lost and self._address): 5257db96d56Sopenharmony_ci # The connection has been closed 5267db96d56Sopenharmony_ci if self._closing: 5277db96d56Sopenharmony_ci self._loop.call_soon(self._call_connection_lost, None) 5287db96d56Sopenharmony_ci return 5297db96d56Sopenharmony_ci 5307db96d56Sopenharmony_ci data, addr = self._buffer.popleft() 5317db96d56Sopenharmony_ci self._buffer_size -= len(data) 5327db96d56Sopenharmony_ci if self._address is not None: 5337db96d56Sopenharmony_ci self._write_fut = self._loop._proactor.send(self._sock, 5347db96d56Sopenharmony_ci data) 5357db96d56Sopenharmony_ci else: 5367db96d56Sopenharmony_ci self._write_fut = self._loop._proactor.sendto(self._sock, 5377db96d56Sopenharmony_ci data, 5387db96d56Sopenharmony_ci addr=addr) 5397db96d56Sopenharmony_ci except OSError as exc: 5407db96d56Sopenharmony_ci self._protocol.error_received(exc) 5417db96d56Sopenharmony_ci except Exception as exc: 5427db96d56Sopenharmony_ci self._fatal_error(exc, 'Fatal write error on datagram transport') 5437db96d56Sopenharmony_ci else: 5447db96d56Sopenharmony_ci self._write_fut.add_done_callback(self._loop_writing) 5457db96d56Sopenharmony_ci self._maybe_resume_protocol() 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci def _loop_reading(self, fut=None): 5487db96d56Sopenharmony_ci data = None 5497db96d56Sopenharmony_ci try: 5507db96d56Sopenharmony_ci if self._conn_lost: 5517db96d56Sopenharmony_ci return 5527db96d56Sopenharmony_ci 5537db96d56Sopenharmony_ci assert self._read_fut is fut or (self._read_fut is None and 5547db96d56Sopenharmony_ci self._closing) 5557db96d56Sopenharmony_ci 5567db96d56Sopenharmony_ci self._read_fut = None 5577db96d56Sopenharmony_ci if fut is not None: 5587db96d56Sopenharmony_ci res = fut.result() 5597db96d56Sopenharmony_ci 5607db96d56Sopenharmony_ci if self._closing: 5617db96d56Sopenharmony_ci # since close() has been called we ignore any read data 5627db96d56Sopenharmony_ci data = None 5637db96d56Sopenharmony_ci return 5647db96d56Sopenharmony_ci 5657db96d56Sopenharmony_ci if self._address is not None: 5667db96d56Sopenharmony_ci data, addr = res, self._address 5677db96d56Sopenharmony_ci else: 5687db96d56Sopenharmony_ci data, addr = res 5697db96d56Sopenharmony_ci 5707db96d56Sopenharmony_ci if self._conn_lost: 5717db96d56Sopenharmony_ci return 5727db96d56Sopenharmony_ci if self._address is not None: 5737db96d56Sopenharmony_ci self._read_fut = self._loop._proactor.recv(self._sock, 5747db96d56Sopenharmony_ci self.max_size) 5757db96d56Sopenharmony_ci else: 5767db96d56Sopenharmony_ci self._read_fut = self._loop._proactor.recvfrom(self._sock, 5777db96d56Sopenharmony_ci self.max_size) 5787db96d56Sopenharmony_ci except OSError as exc: 5797db96d56Sopenharmony_ci self._protocol.error_received(exc) 5807db96d56Sopenharmony_ci except exceptions.CancelledError: 5817db96d56Sopenharmony_ci if not self._closing: 5827db96d56Sopenharmony_ci raise 5837db96d56Sopenharmony_ci else: 5847db96d56Sopenharmony_ci if self._read_fut is not None: 5857db96d56Sopenharmony_ci self._read_fut.add_done_callback(self._loop_reading) 5867db96d56Sopenharmony_ci finally: 5877db96d56Sopenharmony_ci if data: 5887db96d56Sopenharmony_ci self._protocol.datagram_received(data, addr) 5897db96d56Sopenharmony_ci 5907db96d56Sopenharmony_ci 5917db96d56Sopenharmony_ciclass _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, 5927db96d56Sopenharmony_ci _ProactorBaseWritePipeTransport, 5937db96d56Sopenharmony_ci transports.Transport): 5947db96d56Sopenharmony_ci """Transport for duplex pipes.""" 5957db96d56Sopenharmony_ci 5967db96d56Sopenharmony_ci def can_write_eof(self): 5977db96d56Sopenharmony_ci return False 5987db96d56Sopenharmony_ci 5997db96d56Sopenharmony_ci def write_eof(self): 6007db96d56Sopenharmony_ci raise NotImplementedError 6017db96d56Sopenharmony_ci 6027db96d56Sopenharmony_ci 6037db96d56Sopenharmony_ciclass _ProactorSocketTransport(_ProactorReadPipeTransport, 6047db96d56Sopenharmony_ci _ProactorBaseWritePipeTransport, 6057db96d56Sopenharmony_ci transports.Transport): 6067db96d56Sopenharmony_ci """Transport for connected sockets.""" 6077db96d56Sopenharmony_ci 6087db96d56Sopenharmony_ci _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 6097db96d56Sopenharmony_ci 6107db96d56Sopenharmony_ci def __init__(self, loop, sock, protocol, waiter=None, 6117db96d56Sopenharmony_ci extra=None, server=None): 6127db96d56Sopenharmony_ci super().__init__(loop, sock, protocol, waiter, extra, server) 6137db96d56Sopenharmony_ci base_events._set_nodelay(sock) 6147db96d56Sopenharmony_ci 6157db96d56Sopenharmony_ci def _set_extra(self, sock): 6167db96d56Sopenharmony_ci _set_socket_extra(self, sock) 6177db96d56Sopenharmony_ci 6187db96d56Sopenharmony_ci def can_write_eof(self): 6197db96d56Sopenharmony_ci return True 6207db96d56Sopenharmony_ci 6217db96d56Sopenharmony_ci def write_eof(self): 6227db96d56Sopenharmony_ci if self._closing or self._eof_written: 6237db96d56Sopenharmony_ci return 6247db96d56Sopenharmony_ci self._eof_written = True 6257db96d56Sopenharmony_ci if self._write_fut is None: 6267db96d56Sopenharmony_ci self._sock.shutdown(socket.SHUT_WR) 6277db96d56Sopenharmony_ci 6287db96d56Sopenharmony_ci 6297db96d56Sopenharmony_ciclass BaseProactorEventLoop(base_events.BaseEventLoop): 6307db96d56Sopenharmony_ci 6317db96d56Sopenharmony_ci def __init__(self, proactor): 6327db96d56Sopenharmony_ci super().__init__() 6337db96d56Sopenharmony_ci logger.debug('Using proactor: %s', proactor.__class__.__name__) 6347db96d56Sopenharmony_ci self._proactor = proactor 6357db96d56Sopenharmony_ci self._selector = proactor # convenient alias 6367db96d56Sopenharmony_ci self._self_reading_future = None 6377db96d56Sopenharmony_ci self._accept_futures = {} # socket file descriptor => Future 6387db96d56Sopenharmony_ci proactor.set_loop(self) 6397db96d56Sopenharmony_ci self._make_self_pipe() 6407db96d56Sopenharmony_ci if threading.current_thread() is threading.main_thread(): 6417db96d56Sopenharmony_ci # wakeup fd can only be installed to a file descriptor from the main thread 6427db96d56Sopenharmony_ci signal.set_wakeup_fd(self._csock.fileno()) 6437db96d56Sopenharmony_ci 6447db96d56Sopenharmony_ci def _make_socket_transport(self, sock, protocol, waiter=None, 6457db96d56Sopenharmony_ci extra=None, server=None): 6467db96d56Sopenharmony_ci return _ProactorSocketTransport(self, sock, protocol, waiter, 6477db96d56Sopenharmony_ci extra, server) 6487db96d56Sopenharmony_ci 6497db96d56Sopenharmony_ci def _make_ssl_transport( 6507db96d56Sopenharmony_ci self, rawsock, protocol, sslcontext, waiter=None, 6517db96d56Sopenharmony_ci *, server_side=False, server_hostname=None, 6527db96d56Sopenharmony_ci extra=None, server=None, 6537db96d56Sopenharmony_ci ssl_handshake_timeout=None, 6547db96d56Sopenharmony_ci ssl_shutdown_timeout=None): 6557db96d56Sopenharmony_ci ssl_protocol = sslproto.SSLProtocol( 6567db96d56Sopenharmony_ci self, protocol, sslcontext, waiter, 6577db96d56Sopenharmony_ci server_side, server_hostname, 6587db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 6597db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout) 6607db96d56Sopenharmony_ci _ProactorSocketTransport(self, rawsock, ssl_protocol, 6617db96d56Sopenharmony_ci extra=extra, server=server) 6627db96d56Sopenharmony_ci return ssl_protocol._app_transport 6637db96d56Sopenharmony_ci 6647db96d56Sopenharmony_ci def _make_datagram_transport(self, sock, protocol, 6657db96d56Sopenharmony_ci address=None, waiter=None, extra=None): 6667db96d56Sopenharmony_ci return _ProactorDatagramTransport(self, sock, protocol, address, 6677db96d56Sopenharmony_ci waiter, extra) 6687db96d56Sopenharmony_ci 6697db96d56Sopenharmony_ci def _make_duplex_pipe_transport(self, sock, protocol, waiter=None, 6707db96d56Sopenharmony_ci extra=None): 6717db96d56Sopenharmony_ci return _ProactorDuplexPipeTransport(self, 6727db96d56Sopenharmony_ci sock, protocol, waiter, extra) 6737db96d56Sopenharmony_ci 6747db96d56Sopenharmony_ci def _make_read_pipe_transport(self, sock, protocol, waiter=None, 6757db96d56Sopenharmony_ci extra=None): 6767db96d56Sopenharmony_ci return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra) 6777db96d56Sopenharmony_ci 6787db96d56Sopenharmony_ci def _make_write_pipe_transport(self, sock, protocol, waiter=None, 6797db96d56Sopenharmony_ci extra=None): 6807db96d56Sopenharmony_ci # We want connection_lost() to be called when other end closes 6817db96d56Sopenharmony_ci return _ProactorWritePipeTransport(self, 6827db96d56Sopenharmony_ci sock, protocol, waiter, extra) 6837db96d56Sopenharmony_ci 6847db96d56Sopenharmony_ci def close(self): 6857db96d56Sopenharmony_ci if self.is_running(): 6867db96d56Sopenharmony_ci raise RuntimeError("Cannot close a running event loop") 6877db96d56Sopenharmony_ci if self.is_closed(): 6887db96d56Sopenharmony_ci return 6897db96d56Sopenharmony_ci 6907db96d56Sopenharmony_ci if threading.current_thread() is threading.main_thread(): 6917db96d56Sopenharmony_ci signal.set_wakeup_fd(-1) 6927db96d56Sopenharmony_ci # Call these methods before closing the event loop (before calling 6937db96d56Sopenharmony_ci # BaseEventLoop.close), because they can schedule callbacks with 6947db96d56Sopenharmony_ci # call_soon(), which is forbidden when the event loop is closed. 6957db96d56Sopenharmony_ci self._stop_accept_futures() 6967db96d56Sopenharmony_ci self._close_self_pipe() 6977db96d56Sopenharmony_ci self._proactor.close() 6987db96d56Sopenharmony_ci self._proactor = None 6997db96d56Sopenharmony_ci self._selector = None 7007db96d56Sopenharmony_ci 7017db96d56Sopenharmony_ci # Close the event loop 7027db96d56Sopenharmony_ci super().close() 7037db96d56Sopenharmony_ci 7047db96d56Sopenharmony_ci async def sock_recv(self, sock, n): 7057db96d56Sopenharmony_ci return await self._proactor.recv(sock, n) 7067db96d56Sopenharmony_ci 7077db96d56Sopenharmony_ci async def sock_recv_into(self, sock, buf): 7087db96d56Sopenharmony_ci return await self._proactor.recv_into(sock, buf) 7097db96d56Sopenharmony_ci 7107db96d56Sopenharmony_ci async def sock_recvfrom(self, sock, bufsize): 7117db96d56Sopenharmony_ci return await self._proactor.recvfrom(sock, bufsize) 7127db96d56Sopenharmony_ci 7137db96d56Sopenharmony_ci async def sock_recvfrom_into(self, sock, buf, nbytes=0): 7147db96d56Sopenharmony_ci if not nbytes: 7157db96d56Sopenharmony_ci nbytes = len(buf) 7167db96d56Sopenharmony_ci 7177db96d56Sopenharmony_ci return await self._proactor.recvfrom_into(sock, buf, nbytes) 7187db96d56Sopenharmony_ci 7197db96d56Sopenharmony_ci async def sock_sendall(self, sock, data): 7207db96d56Sopenharmony_ci return await self._proactor.send(sock, data) 7217db96d56Sopenharmony_ci 7227db96d56Sopenharmony_ci async def sock_sendto(self, sock, data, address): 7237db96d56Sopenharmony_ci return await self._proactor.sendto(sock, data, 0, address) 7247db96d56Sopenharmony_ci 7257db96d56Sopenharmony_ci async def sock_connect(self, sock, address): 7267db96d56Sopenharmony_ci return await self._proactor.connect(sock, address) 7277db96d56Sopenharmony_ci 7287db96d56Sopenharmony_ci async def sock_accept(self, sock): 7297db96d56Sopenharmony_ci return await self._proactor.accept(sock) 7307db96d56Sopenharmony_ci 7317db96d56Sopenharmony_ci async def _sock_sendfile_native(self, sock, file, offset, count): 7327db96d56Sopenharmony_ci try: 7337db96d56Sopenharmony_ci fileno = file.fileno() 7347db96d56Sopenharmony_ci except (AttributeError, io.UnsupportedOperation) as err: 7357db96d56Sopenharmony_ci raise exceptions.SendfileNotAvailableError("not a regular file") 7367db96d56Sopenharmony_ci try: 7377db96d56Sopenharmony_ci fsize = os.fstat(fileno).st_size 7387db96d56Sopenharmony_ci except OSError: 7397db96d56Sopenharmony_ci raise exceptions.SendfileNotAvailableError("not a regular file") 7407db96d56Sopenharmony_ci blocksize = count if count else fsize 7417db96d56Sopenharmony_ci if not blocksize: 7427db96d56Sopenharmony_ci return 0 # empty file 7437db96d56Sopenharmony_ci 7447db96d56Sopenharmony_ci blocksize = min(blocksize, 0xffff_ffff) 7457db96d56Sopenharmony_ci end_pos = min(offset + count, fsize) if count else fsize 7467db96d56Sopenharmony_ci offset = min(offset, fsize) 7477db96d56Sopenharmony_ci total_sent = 0 7487db96d56Sopenharmony_ci try: 7497db96d56Sopenharmony_ci while True: 7507db96d56Sopenharmony_ci blocksize = min(end_pos - offset, blocksize) 7517db96d56Sopenharmony_ci if blocksize <= 0: 7527db96d56Sopenharmony_ci return total_sent 7537db96d56Sopenharmony_ci await self._proactor.sendfile(sock, file, offset, blocksize) 7547db96d56Sopenharmony_ci offset += blocksize 7557db96d56Sopenharmony_ci total_sent += blocksize 7567db96d56Sopenharmony_ci finally: 7577db96d56Sopenharmony_ci if total_sent > 0: 7587db96d56Sopenharmony_ci file.seek(offset) 7597db96d56Sopenharmony_ci 7607db96d56Sopenharmony_ci async def _sendfile_native(self, transp, file, offset, count): 7617db96d56Sopenharmony_ci resume_reading = transp.is_reading() 7627db96d56Sopenharmony_ci transp.pause_reading() 7637db96d56Sopenharmony_ci await transp._make_empty_waiter() 7647db96d56Sopenharmony_ci try: 7657db96d56Sopenharmony_ci return await self.sock_sendfile(transp._sock, file, offset, count, 7667db96d56Sopenharmony_ci fallback=False) 7677db96d56Sopenharmony_ci finally: 7687db96d56Sopenharmony_ci transp._reset_empty_waiter() 7697db96d56Sopenharmony_ci if resume_reading: 7707db96d56Sopenharmony_ci transp.resume_reading() 7717db96d56Sopenharmony_ci 7727db96d56Sopenharmony_ci def _close_self_pipe(self): 7737db96d56Sopenharmony_ci if self._self_reading_future is not None: 7747db96d56Sopenharmony_ci self._self_reading_future.cancel() 7757db96d56Sopenharmony_ci self._self_reading_future = None 7767db96d56Sopenharmony_ci self._ssock.close() 7777db96d56Sopenharmony_ci self._ssock = None 7787db96d56Sopenharmony_ci self._csock.close() 7797db96d56Sopenharmony_ci self._csock = None 7807db96d56Sopenharmony_ci self._internal_fds -= 1 7817db96d56Sopenharmony_ci 7827db96d56Sopenharmony_ci def _make_self_pipe(self): 7837db96d56Sopenharmony_ci # A self-socket, really. :-) 7847db96d56Sopenharmony_ci self._ssock, self._csock = socket.socketpair() 7857db96d56Sopenharmony_ci self._ssock.setblocking(False) 7867db96d56Sopenharmony_ci self._csock.setblocking(False) 7877db96d56Sopenharmony_ci self._internal_fds += 1 7887db96d56Sopenharmony_ci 7897db96d56Sopenharmony_ci def _loop_self_reading(self, f=None): 7907db96d56Sopenharmony_ci try: 7917db96d56Sopenharmony_ci if f is not None: 7927db96d56Sopenharmony_ci f.result() # may raise 7937db96d56Sopenharmony_ci if self._self_reading_future is not f: 7947db96d56Sopenharmony_ci # When we scheduled this Future, we assigned it to 7957db96d56Sopenharmony_ci # _self_reading_future. If it's not there now, something has 7967db96d56Sopenharmony_ci # tried to cancel the loop while this callback was still in the 7977db96d56Sopenharmony_ci # queue (see windows_events.ProactorEventLoop.run_forever). In 7987db96d56Sopenharmony_ci # that case stop here instead of continuing to schedule a new 7997db96d56Sopenharmony_ci # iteration. 8007db96d56Sopenharmony_ci return 8017db96d56Sopenharmony_ci f = self._proactor.recv(self._ssock, 4096) 8027db96d56Sopenharmony_ci except exceptions.CancelledError: 8037db96d56Sopenharmony_ci # _close_self_pipe() has been called, stop waiting for data 8047db96d56Sopenharmony_ci return 8057db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 8067db96d56Sopenharmony_ci raise 8077db96d56Sopenharmony_ci except BaseException as exc: 8087db96d56Sopenharmony_ci self.call_exception_handler({ 8097db96d56Sopenharmony_ci 'message': 'Error on reading from the event loop self pipe', 8107db96d56Sopenharmony_ci 'exception': exc, 8117db96d56Sopenharmony_ci 'loop': self, 8127db96d56Sopenharmony_ci }) 8137db96d56Sopenharmony_ci else: 8147db96d56Sopenharmony_ci self._self_reading_future = f 8157db96d56Sopenharmony_ci f.add_done_callback(self._loop_self_reading) 8167db96d56Sopenharmony_ci 8177db96d56Sopenharmony_ci def _write_to_self(self): 8187db96d56Sopenharmony_ci # This may be called from a different thread, possibly after 8197db96d56Sopenharmony_ci # _close_self_pipe() has been called or even while it is 8207db96d56Sopenharmony_ci # running. Guard for self._csock being None or closed. When 8217db96d56Sopenharmony_ci # a socket is closed, send() raises OSError (with errno set to 8227db96d56Sopenharmony_ci # EBADF, but let's not rely on the exact error code). 8237db96d56Sopenharmony_ci csock = self._csock 8247db96d56Sopenharmony_ci if csock is None: 8257db96d56Sopenharmony_ci return 8267db96d56Sopenharmony_ci 8277db96d56Sopenharmony_ci try: 8287db96d56Sopenharmony_ci csock.send(b'\0') 8297db96d56Sopenharmony_ci except OSError: 8307db96d56Sopenharmony_ci if self._debug: 8317db96d56Sopenharmony_ci logger.debug("Fail to write a null byte into the " 8327db96d56Sopenharmony_ci "self-pipe socket", 8337db96d56Sopenharmony_ci exc_info=True) 8347db96d56Sopenharmony_ci 8357db96d56Sopenharmony_ci def _start_serving(self, protocol_factory, sock, 8367db96d56Sopenharmony_ci sslcontext=None, server=None, backlog=100, 8377db96d56Sopenharmony_ci ssl_handshake_timeout=None, 8387db96d56Sopenharmony_ci ssl_shutdown_timeout=None): 8397db96d56Sopenharmony_ci 8407db96d56Sopenharmony_ci def loop(f=None): 8417db96d56Sopenharmony_ci try: 8427db96d56Sopenharmony_ci if f is not None: 8437db96d56Sopenharmony_ci conn, addr = f.result() 8447db96d56Sopenharmony_ci if self._debug: 8457db96d56Sopenharmony_ci logger.debug("%r got a new connection from %r: %r", 8467db96d56Sopenharmony_ci server, addr, conn) 8477db96d56Sopenharmony_ci protocol = protocol_factory() 8487db96d56Sopenharmony_ci if sslcontext is not None: 8497db96d56Sopenharmony_ci self._make_ssl_transport( 8507db96d56Sopenharmony_ci conn, protocol, sslcontext, server_side=True, 8517db96d56Sopenharmony_ci extra={'peername': addr}, server=server, 8527db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 8537db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout) 8547db96d56Sopenharmony_ci else: 8557db96d56Sopenharmony_ci self._make_socket_transport( 8567db96d56Sopenharmony_ci conn, protocol, 8577db96d56Sopenharmony_ci extra={'peername': addr}, server=server) 8587db96d56Sopenharmony_ci if self.is_closed(): 8597db96d56Sopenharmony_ci return 8607db96d56Sopenharmony_ci f = self._proactor.accept(sock) 8617db96d56Sopenharmony_ci except OSError as exc: 8627db96d56Sopenharmony_ci if sock.fileno() != -1: 8637db96d56Sopenharmony_ci self.call_exception_handler({ 8647db96d56Sopenharmony_ci 'message': 'Accept failed on a socket', 8657db96d56Sopenharmony_ci 'exception': exc, 8667db96d56Sopenharmony_ci 'socket': trsock.TransportSocket(sock), 8677db96d56Sopenharmony_ci }) 8687db96d56Sopenharmony_ci sock.close() 8697db96d56Sopenharmony_ci elif self._debug: 8707db96d56Sopenharmony_ci logger.debug("Accept failed on socket %r", 8717db96d56Sopenharmony_ci sock, exc_info=True) 8727db96d56Sopenharmony_ci except exceptions.CancelledError: 8737db96d56Sopenharmony_ci sock.close() 8747db96d56Sopenharmony_ci else: 8757db96d56Sopenharmony_ci self._accept_futures[sock.fileno()] = f 8767db96d56Sopenharmony_ci f.add_done_callback(loop) 8777db96d56Sopenharmony_ci 8787db96d56Sopenharmony_ci self.call_soon(loop) 8797db96d56Sopenharmony_ci 8807db96d56Sopenharmony_ci def _process_events(self, event_list): 8817db96d56Sopenharmony_ci # Events are processed in the IocpProactor._poll() method 8827db96d56Sopenharmony_ci pass 8837db96d56Sopenharmony_ci 8847db96d56Sopenharmony_ci def _stop_accept_futures(self): 8857db96d56Sopenharmony_ci for future in self._accept_futures.values(): 8867db96d56Sopenharmony_ci future.cancel() 8877db96d56Sopenharmony_ci self._accept_futures.clear() 8887db96d56Sopenharmony_ci 8897db96d56Sopenharmony_ci def _stop_serving(self, sock): 8907db96d56Sopenharmony_ci future = self._accept_futures.pop(sock.fileno(), None) 8917db96d56Sopenharmony_ci if future: 8927db96d56Sopenharmony_ci future.cancel() 8937db96d56Sopenharmony_ci self._proactor._stop_serving(sock) 8947db96d56Sopenharmony_ci sock.close() 895