17db96d56Sopenharmony_ci"""Base implementation of event loop. 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciThe event loop can be broken up into a multiplexer (the part 47db96d56Sopenharmony_ciresponsible for notifying us of I/O events) and the event loop proper, 57db96d56Sopenharmony_ciwhich wraps a multiplexer with functionality for scheduling callbacks, 67db96d56Sopenharmony_ciimmediately or at a given time in the future. 77db96d56Sopenharmony_ci 87db96d56Sopenharmony_ciWhenever a public API takes a callback, subsequent positional 97db96d56Sopenharmony_ciarguments will be passed to the callback if/when it is called. This 107db96d56Sopenharmony_ciavoids the proliferation of trivial lambdas implementing closures. 117db96d56Sopenharmony_ciKeyword arguments for the callback are not supported; this is a 127db96d56Sopenharmony_ciconscious design decision, leaving the door open for keyword arguments 137db96d56Sopenharmony_cito modify the meaning of the API call itself. 147db96d56Sopenharmony_ci""" 157db96d56Sopenharmony_ci 167db96d56Sopenharmony_ciimport collections 177db96d56Sopenharmony_ciimport collections.abc 187db96d56Sopenharmony_ciimport concurrent.futures 197db96d56Sopenharmony_ciimport functools 207db96d56Sopenharmony_ciimport heapq 217db96d56Sopenharmony_ciimport itertools 227db96d56Sopenharmony_ciimport os 237db96d56Sopenharmony_ciimport socket 247db96d56Sopenharmony_ciimport stat 257db96d56Sopenharmony_ciimport subprocess 267db96d56Sopenharmony_ciimport threading 277db96d56Sopenharmony_ciimport time 287db96d56Sopenharmony_ciimport traceback 297db96d56Sopenharmony_ciimport sys 307db96d56Sopenharmony_ciimport warnings 317db96d56Sopenharmony_ciimport weakref 327db96d56Sopenharmony_ci 337db96d56Sopenharmony_citry: 347db96d56Sopenharmony_ci import ssl 357db96d56Sopenharmony_ciexcept ImportError: # pragma: no cover 367db96d56Sopenharmony_ci ssl = None 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_cifrom . import constants 397db96d56Sopenharmony_cifrom . import coroutines 407db96d56Sopenharmony_cifrom . import events 417db96d56Sopenharmony_cifrom . import exceptions 427db96d56Sopenharmony_cifrom . import futures 437db96d56Sopenharmony_cifrom . import protocols 447db96d56Sopenharmony_cifrom . import sslproto 457db96d56Sopenharmony_cifrom . import staggered 467db96d56Sopenharmony_cifrom . import tasks 477db96d56Sopenharmony_cifrom . import transports 487db96d56Sopenharmony_cifrom . import trsock 497db96d56Sopenharmony_cifrom .log import logger 507db96d56Sopenharmony_ci 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_ci__all__ = 'BaseEventLoop','Server', 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ci# Minimum number of _scheduled timer handles before cleanup of 567db96d56Sopenharmony_ci# cancelled handles is performed. 577db96d56Sopenharmony_ci_MIN_SCHEDULED_TIMER_HANDLES = 100 587db96d56Sopenharmony_ci 597db96d56Sopenharmony_ci# Minimum fraction of _scheduled timer handles that are cancelled 607db96d56Sopenharmony_ci# before cleanup of cancelled handles is performed. 617db96d56Sopenharmony_ci_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci 647db96d56Sopenharmony_ci_HAS_IPv6 = hasattr(socket, 'AF_INET6') 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ci# Maximum timeout passed to select to avoid OS limitations 677db96d56Sopenharmony_ciMAXIMUM_SELECT_TIMEOUT = 24 * 3600 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_ci 707db96d56Sopenharmony_cidef _format_handle(handle): 717db96d56Sopenharmony_ci cb = handle._callback 727db96d56Sopenharmony_ci if isinstance(getattr(cb, '__self__', None), tasks.Task): 737db96d56Sopenharmony_ci # format the task 747db96d56Sopenharmony_ci return repr(cb.__self__) 757db96d56Sopenharmony_ci else: 767db96d56Sopenharmony_ci return str(handle) 777db96d56Sopenharmony_ci 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_cidef _format_pipe(fd): 807db96d56Sopenharmony_ci if fd == subprocess.PIPE: 817db96d56Sopenharmony_ci return '<pipe>' 827db96d56Sopenharmony_ci elif fd == subprocess.STDOUT: 837db96d56Sopenharmony_ci return '<stdout>' 847db96d56Sopenharmony_ci else: 857db96d56Sopenharmony_ci return repr(fd) 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_ci 887db96d56Sopenharmony_cidef _set_reuseport(sock): 897db96d56Sopenharmony_ci if not hasattr(socket, 'SO_REUSEPORT'): 907db96d56Sopenharmony_ci raise ValueError('reuse_port not supported by socket module') 917db96d56Sopenharmony_ci else: 927db96d56Sopenharmony_ci try: 937db96d56Sopenharmony_ci sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 947db96d56Sopenharmony_ci except OSError: 957db96d56Sopenharmony_ci raise ValueError('reuse_port not supported by socket module, ' 967db96d56Sopenharmony_ci 'SO_REUSEPORT defined but not implemented.') 977db96d56Sopenharmony_ci 987db96d56Sopenharmony_ci 997db96d56Sopenharmony_cidef _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 1007db96d56Sopenharmony_ci # Try to skip getaddrinfo if "host" is already an IP. Users might have 1017db96d56Sopenharmony_ci # handled name resolution in their own code and pass in resolved IPs. 1027db96d56Sopenharmony_ci if not hasattr(socket, 'inet_pton'): 1037db96d56Sopenharmony_ci return 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_ci if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 1067db96d56Sopenharmony_ci host is None: 1077db96d56Sopenharmony_ci return None 1087db96d56Sopenharmony_ci 1097db96d56Sopenharmony_ci if type == socket.SOCK_STREAM: 1107db96d56Sopenharmony_ci proto = socket.IPPROTO_TCP 1117db96d56Sopenharmony_ci elif type == socket.SOCK_DGRAM: 1127db96d56Sopenharmony_ci proto = socket.IPPROTO_UDP 1137db96d56Sopenharmony_ci else: 1147db96d56Sopenharmony_ci return None 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci if port is None: 1177db96d56Sopenharmony_ci port = 0 1187db96d56Sopenharmony_ci elif isinstance(port, bytes) and port == b'': 1197db96d56Sopenharmony_ci port = 0 1207db96d56Sopenharmony_ci elif isinstance(port, str) and port == '': 1217db96d56Sopenharmony_ci port = 0 1227db96d56Sopenharmony_ci else: 1237db96d56Sopenharmony_ci # If port's a service name like "http", don't skip getaddrinfo. 1247db96d56Sopenharmony_ci try: 1257db96d56Sopenharmony_ci port = int(port) 1267db96d56Sopenharmony_ci except (TypeError, ValueError): 1277db96d56Sopenharmony_ci return None 1287db96d56Sopenharmony_ci 1297db96d56Sopenharmony_ci if family == socket.AF_UNSPEC: 1307db96d56Sopenharmony_ci afs = [socket.AF_INET] 1317db96d56Sopenharmony_ci if _HAS_IPv6: 1327db96d56Sopenharmony_ci afs.append(socket.AF_INET6) 1337db96d56Sopenharmony_ci else: 1347db96d56Sopenharmony_ci afs = [family] 1357db96d56Sopenharmony_ci 1367db96d56Sopenharmony_ci if isinstance(host, bytes): 1377db96d56Sopenharmony_ci host = host.decode('idna') 1387db96d56Sopenharmony_ci if '%' in host: 1397db96d56Sopenharmony_ci # Linux's inet_pton doesn't accept an IPv6 zone index after host, 1407db96d56Sopenharmony_ci # like '::1%lo0'. 1417db96d56Sopenharmony_ci return None 1427db96d56Sopenharmony_ci 1437db96d56Sopenharmony_ci for af in afs: 1447db96d56Sopenharmony_ci try: 1457db96d56Sopenharmony_ci socket.inet_pton(af, host) 1467db96d56Sopenharmony_ci # The host has already been resolved. 1477db96d56Sopenharmony_ci if _HAS_IPv6 and af == socket.AF_INET6: 1487db96d56Sopenharmony_ci return af, type, proto, '', (host, port, flowinfo, scopeid) 1497db96d56Sopenharmony_ci else: 1507db96d56Sopenharmony_ci return af, type, proto, '', (host, port) 1517db96d56Sopenharmony_ci except OSError: 1527db96d56Sopenharmony_ci pass 1537db96d56Sopenharmony_ci 1547db96d56Sopenharmony_ci # "host" is not an IP address. 1557db96d56Sopenharmony_ci return None 1567db96d56Sopenharmony_ci 1577db96d56Sopenharmony_ci 1587db96d56Sopenharmony_cidef _interleave_addrinfos(addrinfos, first_address_family_count=1): 1597db96d56Sopenharmony_ci """Interleave list of addrinfo tuples by family.""" 1607db96d56Sopenharmony_ci # Group addresses by family 1617db96d56Sopenharmony_ci addrinfos_by_family = collections.OrderedDict() 1627db96d56Sopenharmony_ci for addr in addrinfos: 1637db96d56Sopenharmony_ci family = addr[0] 1647db96d56Sopenharmony_ci if family not in addrinfos_by_family: 1657db96d56Sopenharmony_ci addrinfos_by_family[family] = [] 1667db96d56Sopenharmony_ci addrinfos_by_family[family].append(addr) 1677db96d56Sopenharmony_ci addrinfos_lists = list(addrinfos_by_family.values()) 1687db96d56Sopenharmony_ci 1697db96d56Sopenharmony_ci reordered = [] 1707db96d56Sopenharmony_ci if first_address_family_count > 1: 1717db96d56Sopenharmony_ci reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 1727db96d56Sopenharmony_ci del addrinfos_lists[0][:first_address_family_count - 1] 1737db96d56Sopenharmony_ci reordered.extend( 1747db96d56Sopenharmony_ci a for a in itertools.chain.from_iterable( 1757db96d56Sopenharmony_ci itertools.zip_longest(*addrinfos_lists) 1767db96d56Sopenharmony_ci ) if a is not None) 1777db96d56Sopenharmony_ci return reordered 1787db96d56Sopenharmony_ci 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_cidef _run_until_complete_cb(fut): 1817db96d56Sopenharmony_ci if not fut.cancelled(): 1827db96d56Sopenharmony_ci exc = fut.exception() 1837db96d56Sopenharmony_ci if isinstance(exc, (SystemExit, KeyboardInterrupt)): 1847db96d56Sopenharmony_ci # Issue #22429: run_forever() already finished, no need to 1857db96d56Sopenharmony_ci # stop it. 1867db96d56Sopenharmony_ci return 1877db96d56Sopenharmony_ci futures._get_loop(fut).stop() 1887db96d56Sopenharmony_ci 1897db96d56Sopenharmony_ci 1907db96d56Sopenharmony_ciif hasattr(socket, 'TCP_NODELAY'): 1917db96d56Sopenharmony_ci def _set_nodelay(sock): 1927db96d56Sopenharmony_ci if (sock.family in {socket.AF_INET, socket.AF_INET6} and 1937db96d56Sopenharmony_ci sock.type == socket.SOCK_STREAM and 1947db96d56Sopenharmony_ci sock.proto == socket.IPPROTO_TCP): 1957db96d56Sopenharmony_ci sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 1967db96d56Sopenharmony_cielse: 1977db96d56Sopenharmony_ci def _set_nodelay(sock): 1987db96d56Sopenharmony_ci pass 1997db96d56Sopenharmony_ci 2007db96d56Sopenharmony_ci 2017db96d56Sopenharmony_cidef _check_ssl_socket(sock): 2027db96d56Sopenharmony_ci if ssl is not None and isinstance(sock, ssl.SSLSocket): 2037db96d56Sopenharmony_ci raise TypeError("Socket cannot be of type SSLSocket") 2047db96d56Sopenharmony_ci 2057db96d56Sopenharmony_ci 2067db96d56Sopenharmony_ciclass _SendfileFallbackProtocol(protocols.Protocol): 2077db96d56Sopenharmony_ci def __init__(self, transp): 2087db96d56Sopenharmony_ci if not isinstance(transp, transports._FlowControlMixin): 2097db96d56Sopenharmony_ci raise TypeError("transport should be _FlowControlMixin instance") 2107db96d56Sopenharmony_ci self._transport = transp 2117db96d56Sopenharmony_ci self._proto = transp.get_protocol() 2127db96d56Sopenharmony_ci self._should_resume_reading = transp.is_reading() 2137db96d56Sopenharmony_ci self._should_resume_writing = transp._protocol_paused 2147db96d56Sopenharmony_ci transp.pause_reading() 2157db96d56Sopenharmony_ci transp.set_protocol(self) 2167db96d56Sopenharmony_ci if self._should_resume_writing: 2177db96d56Sopenharmony_ci self._write_ready_fut = self._transport._loop.create_future() 2187db96d56Sopenharmony_ci else: 2197db96d56Sopenharmony_ci self._write_ready_fut = None 2207db96d56Sopenharmony_ci 2217db96d56Sopenharmony_ci async def drain(self): 2227db96d56Sopenharmony_ci if self._transport.is_closing(): 2237db96d56Sopenharmony_ci raise ConnectionError("Connection closed by peer") 2247db96d56Sopenharmony_ci fut = self._write_ready_fut 2257db96d56Sopenharmony_ci if fut is None: 2267db96d56Sopenharmony_ci return 2277db96d56Sopenharmony_ci await fut 2287db96d56Sopenharmony_ci 2297db96d56Sopenharmony_ci def connection_made(self, transport): 2307db96d56Sopenharmony_ci raise RuntimeError("Invalid state: " 2317db96d56Sopenharmony_ci "connection should have been established already.") 2327db96d56Sopenharmony_ci 2337db96d56Sopenharmony_ci def connection_lost(self, exc): 2347db96d56Sopenharmony_ci if self._write_ready_fut is not None: 2357db96d56Sopenharmony_ci # Never happens if peer disconnects after sending the whole content 2367db96d56Sopenharmony_ci # Thus disconnection is always an exception from user perspective 2377db96d56Sopenharmony_ci if exc is None: 2387db96d56Sopenharmony_ci self._write_ready_fut.set_exception( 2397db96d56Sopenharmony_ci ConnectionError("Connection is closed by peer")) 2407db96d56Sopenharmony_ci else: 2417db96d56Sopenharmony_ci self._write_ready_fut.set_exception(exc) 2427db96d56Sopenharmony_ci self._proto.connection_lost(exc) 2437db96d56Sopenharmony_ci 2447db96d56Sopenharmony_ci def pause_writing(self): 2457db96d56Sopenharmony_ci if self._write_ready_fut is not None: 2467db96d56Sopenharmony_ci return 2477db96d56Sopenharmony_ci self._write_ready_fut = self._transport._loop.create_future() 2487db96d56Sopenharmony_ci 2497db96d56Sopenharmony_ci def resume_writing(self): 2507db96d56Sopenharmony_ci if self._write_ready_fut is None: 2517db96d56Sopenharmony_ci return 2527db96d56Sopenharmony_ci self._write_ready_fut.set_result(False) 2537db96d56Sopenharmony_ci self._write_ready_fut = None 2547db96d56Sopenharmony_ci 2557db96d56Sopenharmony_ci def data_received(self, data): 2567db96d56Sopenharmony_ci raise RuntimeError("Invalid state: reading should be paused") 2577db96d56Sopenharmony_ci 2587db96d56Sopenharmony_ci def eof_received(self): 2597db96d56Sopenharmony_ci raise RuntimeError("Invalid state: reading should be paused") 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ci async def restore(self): 2627db96d56Sopenharmony_ci self._transport.set_protocol(self._proto) 2637db96d56Sopenharmony_ci if self._should_resume_reading: 2647db96d56Sopenharmony_ci self._transport.resume_reading() 2657db96d56Sopenharmony_ci if self._write_ready_fut is not None: 2667db96d56Sopenharmony_ci # Cancel the future. 2677db96d56Sopenharmony_ci # Basically it has no effect because protocol is switched back, 2687db96d56Sopenharmony_ci # no code should wait for it anymore. 2697db96d56Sopenharmony_ci self._write_ready_fut.cancel() 2707db96d56Sopenharmony_ci if self._should_resume_writing: 2717db96d56Sopenharmony_ci self._proto.resume_writing() 2727db96d56Sopenharmony_ci 2737db96d56Sopenharmony_ci 2747db96d56Sopenharmony_ciclass Server(events.AbstractServer): 2757db96d56Sopenharmony_ci 2767db96d56Sopenharmony_ci def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 2777db96d56Sopenharmony_ci ssl_handshake_timeout, ssl_shutdown_timeout=None): 2787db96d56Sopenharmony_ci self._loop = loop 2797db96d56Sopenharmony_ci self._sockets = sockets 2807db96d56Sopenharmony_ci self._active_count = 0 2817db96d56Sopenharmony_ci self._waiters = [] 2827db96d56Sopenharmony_ci self._protocol_factory = protocol_factory 2837db96d56Sopenharmony_ci self._backlog = backlog 2847db96d56Sopenharmony_ci self._ssl_context = ssl_context 2857db96d56Sopenharmony_ci self._ssl_handshake_timeout = ssl_handshake_timeout 2867db96d56Sopenharmony_ci self._ssl_shutdown_timeout = ssl_shutdown_timeout 2877db96d56Sopenharmony_ci self._serving = False 2887db96d56Sopenharmony_ci self._serving_forever_fut = None 2897db96d56Sopenharmony_ci 2907db96d56Sopenharmony_ci def __repr__(self): 2917db96d56Sopenharmony_ci return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 2927db96d56Sopenharmony_ci 2937db96d56Sopenharmony_ci def _attach(self): 2947db96d56Sopenharmony_ci assert self._sockets is not None 2957db96d56Sopenharmony_ci self._active_count += 1 2967db96d56Sopenharmony_ci 2977db96d56Sopenharmony_ci def _detach(self): 2987db96d56Sopenharmony_ci assert self._active_count > 0 2997db96d56Sopenharmony_ci self._active_count -= 1 3007db96d56Sopenharmony_ci if self._active_count == 0 and self._sockets is None: 3017db96d56Sopenharmony_ci self._wakeup() 3027db96d56Sopenharmony_ci 3037db96d56Sopenharmony_ci def _wakeup(self): 3047db96d56Sopenharmony_ci waiters = self._waiters 3057db96d56Sopenharmony_ci self._waiters = None 3067db96d56Sopenharmony_ci for waiter in waiters: 3077db96d56Sopenharmony_ci if not waiter.done(): 3087db96d56Sopenharmony_ci waiter.set_result(waiter) 3097db96d56Sopenharmony_ci 3107db96d56Sopenharmony_ci def _start_serving(self): 3117db96d56Sopenharmony_ci if self._serving: 3127db96d56Sopenharmony_ci return 3137db96d56Sopenharmony_ci self._serving = True 3147db96d56Sopenharmony_ci for sock in self._sockets: 3157db96d56Sopenharmony_ci sock.listen(self._backlog) 3167db96d56Sopenharmony_ci self._loop._start_serving( 3177db96d56Sopenharmony_ci self._protocol_factory, sock, self._ssl_context, 3187db96d56Sopenharmony_ci self, self._backlog, self._ssl_handshake_timeout, 3197db96d56Sopenharmony_ci self._ssl_shutdown_timeout) 3207db96d56Sopenharmony_ci 3217db96d56Sopenharmony_ci def get_loop(self): 3227db96d56Sopenharmony_ci return self._loop 3237db96d56Sopenharmony_ci 3247db96d56Sopenharmony_ci def is_serving(self): 3257db96d56Sopenharmony_ci return self._serving 3267db96d56Sopenharmony_ci 3277db96d56Sopenharmony_ci @property 3287db96d56Sopenharmony_ci def sockets(self): 3297db96d56Sopenharmony_ci if self._sockets is None: 3307db96d56Sopenharmony_ci return () 3317db96d56Sopenharmony_ci return tuple(trsock.TransportSocket(s) for s in self._sockets) 3327db96d56Sopenharmony_ci 3337db96d56Sopenharmony_ci def close(self): 3347db96d56Sopenharmony_ci sockets = self._sockets 3357db96d56Sopenharmony_ci if sockets is None: 3367db96d56Sopenharmony_ci return 3377db96d56Sopenharmony_ci self._sockets = None 3387db96d56Sopenharmony_ci 3397db96d56Sopenharmony_ci for sock in sockets: 3407db96d56Sopenharmony_ci self._loop._stop_serving(sock) 3417db96d56Sopenharmony_ci 3427db96d56Sopenharmony_ci self._serving = False 3437db96d56Sopenharmony_ci 3447db96d56Sopenharmony_ci if (self._serving_forever_fut is not None and 3457db96d56Sopenharmony_ci not self._serving_forever_fut.done()): 3467db96d56Sopenharmony_ci self._serving_forever_fut.cancel() 3477db96d56Sopenharmony_ci self._serving_forever_fut = None 3487db96d56Sopenharmony_ci 3497db96d56Sopenharmony_ci if self._active_count == 0: 3507db96d56Sopenharmony_ci self._wakeup() 3517db96d56Sopenharmony_ci 3527db96d56Sopenharmony_ci async def start_serving(self): 3537db96d56Sopenharmony_ci self._start_serving() 3547db96d56Sopenharmony_ci # Skip one loop iteration so that all 'loop.add_reader' 3557db96d56Sopenharmony_ci # go through. 3567db96d56Sopenharmony_ci await tasks.sleep(0) 3577db96d56Sopenharmony_ci 3587db96d56Sopenharmony_ci async def serve_forever(self): 3597db96d56Sopenharmony_ci if self._serving_forever_fut is not None: 3607db96d56Sopenharmony_ci raise RuntimeError( 3617db96d56Sopenharmony_ci f'server {self!r} is already being awaited on serve_forever()') 3627db96d56Sopenharmony_ci if self._sockets is None: 3637db96d56Sopenharmony_ci raise RuntimeError(f'server {self!r} is closed') 3647db96d56Sopenharmony_ci 3657db96d56Sopenharmony_ci self._start_serving() 3667db96d56Sopenharmony_ci self._serving_forever_fut = self._loop.create_future() 3677db96d56Sopenharmony_ci 3687db96d56Sopenharmony_ci try: 3697db96d56Sopenharmony_ci await self._serving_forever_fut 3707db96d56Sopenharmony_ci except exceptions.CancelledError: 3717db96d56Sopenharmony_ci try: 3727db96d56Sopenharmony_ci self.close() 3737db96d56Sopenharmony_ci await self.wait_closed() 3747db96d56Sopenharmony_ci finally: 3757db96d56Sopenharmony_ci raise 3767db96d56Sopenharmony_ci finally: 3777db96d56Sopenharmony_ci self._serving_forever_fut = None 3787db96d56Sopenharmony_ci 3797db96d56Sopenharmony_ci async def wait_closed(self): 3807db96d56Sopenharmony_ci if self._sockets is None or self._waiters is None: 3817db96d56Sopenharmony_ci return 3827db96d56Sopenharmony_ci waiter = self._loop.create_future() 3837db96d56Sopenharmony_ci self._waiters.append(waiter) 3847db96d56Sopenharmony_ci await waiter 3857db96d56Sopenharmony_ci 3867db96d56Sopenharmony_ci 3877db96d56Sopenharmony_ciclass BaseEventLoop(events.AbstractEventLoop): 3887db96d56Sopenharmony_ci 3897db96d56Sopenharmony_ci def __init__(self): 3907db96d56Sopenharmony_ci self._timer_cancelled_count = 0 3917db96d56Sopenharmony_ci self._closed = False 3927db96d56Sopenharmony_ci self._stopping = False 3937db96d56Sopenharmony_ci self._ready = collections.deque() 3947db96d56Sopenharmony_ci self._scheduled = [] 3957db96d56Sopenharmony_ci self._default_executor = None 3967db96d56Sopenharmony_ci self._internal_fds = 0 3977db96d56Sopenharmony_ci # Identifier of the thread running the event loop, or None if the 3987db96d56Sopenharmony_ci # event loop is not running 3997db96d56Sopenharmony_ci self._thread_id = None 4007db96d56Sopenharmony_ci self._clock_resolution = time.get_clock_info('monotonic').resolution 4017db96d56Sopenharmony_ci self._exception_handler = None 4027db96d56Sopenharmony_ci self.set_debug(coroutines._is_debug_mode()) 4037db96d56Sopenharmony_ci # In debug mode, if the execution of a callback or a step of a task 4047db96d56Sopenharmony_ci # exceed this duration in seconds, the slow callback/task is logged. 4057db96d56Sopenharmony_ci self.slow_callback_duration = 0.1 4067db96d56Sopenharmony_ci self._current_handle = None 4077db96d56Sopenharmony_ci self._task_factory = None 4087db96d56Sopenharmony_ci self._coroutine_origin_tracking_enabled = False 4097db96d56Sopenharmony_ci self._coroutine_origin_tracking_saved_depth = None 4107db96d56Sopenharmony_ci 4117db96d56Sopenharmony_ci # A weak set of all asynchronous generators that are 4127db96d56Sopenharmony_ci # being iterated by the loop. 4137db96d56Sopenharmony_ci self._asyncgens = weakref.WeakSet() 4147db96d56Sopenharmony_ci # Set to True when `loop.shutdown_asyncgens` is called. 4157db96d56Sopenharmony_ci self._asyncgens_shutdown_called = False 4167db96d56Sopenharmony_ci # Set to True when `loop.shutdown_default_executor` is called. 4177db96d56Sopenharmony_ci self._executor_shutdown_called = False 4187db96d56Sopenharmony_ci 4197db96d56Sopenharmony_ci def __repr__(self): 4207db96d56Sopenharmony_ci return ( 4217db96d56Sopenharmony_ci f'<{self.__class__.__name__} running={self.is_running()} ' 4227db96d56Sopenharmony_ci f'closed={self.is_closed()} debug={self.get_debug()}>' 4237db96d56Sopenharmony_ci ) 4247db96d56Sopenharmony_ci 4257db96d56Sopenharmony_ci def create_future(self): 4267db96d56Sopenharmony_ci """Create a Future object attached to the loop.""" 4277db96d56Sopenharmony_ci return futures.Future(loop=self) 4287db96d56Sopenharmony_ci 4297db96d56Sopenharmony_ci def create_task(self, coro, *, name=None, context=None): 4307db96d56Sopenharmony_ci """Schedule a coroutine object. 4317db96d56Sopenharmony_ci 4327db96d56Sopenharmony_ci Return a task object. 4337db96d56Sopenharmony_ci """ 4347db96d56Sopenharmony_ci self._check_closed() 4357db96d56Sopenharmony_ci if self._task_factory is None: 4367db96d56Sopenharmony_ci task = tasks.Task(coro, loop=self, name=name, context=context) 4377db96d56Sopenharmony_ci if task._source_traceback: 4387db96d56Sopenharmony_ci del task._source_traceback[-1] 4397db96d56Sopenharmony_ci else: 4407db96d56Sopenharmony_ci if context is None: 4417db96d56Sopenharmony_ci # Use legacy API if context is not needed 4427db96d56Sopenharmony_ci task = self._task_factory(self, coro) 4437db96d56Sopenharmony_ci else: 4447db96d56Sopenharmony_ci task = self._task_factory(self, coro, context=context) 4457db96d56Sopenharmony_ci 4467db96d56Sopenharmony_ci tasks._set_task_name(task, name) 4477db96d56Sopenharmony_ci 4487db96d56Sopenharmony_ci return task 4497db96d56Sopenharmony_ci 4507db96d56Sopenharmony_ci def set_task_factory(self, factory): 4517db96d56Sopenharmony_ci """Set a task factory that will be used by loop.create_task(). 4527db96d56Sopenharmony_ci 4537db96d56Sopenharmony_ci If factory is None the default task factory will be set. 4547db96d56Sopenharmony_ci 4557db96d56Sopenharmony_ci If factory is a callable, it should have a signature matching 4567db96d56Sopenharmony_ci '(loop, coro)', where 'loop' will be a reference to the active 4577db96d56Sopenharmony_ci event loop, 'coro' will be a coroutine object. The callable 4587db96d56Sopenharmony_ci must return a Future. 4597db96d56Sopenharmony_ci """ 4607db96d56Sopenharmony_ci if factory is not None and not callable(factory): 4617db96d56Sopenharmony_ci raise TypeError('task factory must be a callable or None') 4627db96d56Sopenharmony_ci self._task_factory = factory 4637db96d56Sopenharmony_ci 4647db96d56Sopenharmony_ci def get_task_factory(self): 4657db96d56Sopenharmony_ci """Return a task factory, or None if the default one is in use.""" 4667db96d56Sopenharmony_ci return self._task_factory 4677db96d56Sopenharmony_ci 4687db96d56Sopenharmony_ci def _make_socket_transport(self, sock, protocol, waiter=None, *, 4697db96d56Sopenharmony_ci extra=None, server=None): 4707db96d56Sopenharmony_ci """Create socket transport.""" 4717db96d56Sopenharmony_ci raise NotImplementedError 4727db96d56Sopenharmony_ci 4737db96d56Sopenharmony_ci def _make_ssl_transport( 4747db96d56Sopenharmony_ci self, rawsock, protocol, sslcontext, waiter=None, 4757db96d56Sopenharmony_ci *, server_side=False, server_hostname=None, 4767db96d56Sopenharmony_ci extra=None, server=None, 4777db96d56Sopenharmony_ci ssl_handshake_timeout=None, 4787db96d56Sopenharmony_ci ssl_shutdown_timeout=None, 4797db96d56Sopenharmony_ci call_connection_made=True): 4807db96d56Sopenharmony_ci """Create SSL transport.""" 4817db96d56Sopenharmony_ci raise NotImplementedError 4827db96d56Sopenharmony_ci 4837db96d56Sopenharmony_ci def _make_datagram_transport(self, sock, protocol, 4847db96d56Sopenharmony_ci address=None, waiter=None, extra=None): 4857db96d56Sopenharmony_ci """Create datagram transport.""" 4867db96d56Sopenharmony_ci raise NotImplementedError 4877db96d56Sopenharmony_ci 4887db96d56Sopenharmony_ci def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 4897db96d56Sopenharmony_ci extra=None): 4907db96d56Sopenharmony_ci """Create read pipe transport.""" 4917db96d56Sopenharmony_ci raise NotImplementedError 4927db96d56Sopenharmony_ci 4937db96d56Sopenharmony_ci def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 4947db96d56Sopenharmony_ci extra=None): 4957db96d56Sopenharmony_ci """Create write pipe transport.""" 4967db96d56Sopenharmony_ci raise NotImplementedError 4977db96d56Sopenharmony_ci 4987db96d56Sopenharmony_ci async def _make_subprocess_transport(self, protocol, args, shell, 4997db96d56Sopenharmony_ci stdin, stdout, stderr, bufsize, 5007db96d56Sopenharmony_ci extra=None, **kwargs): 5017db96d56Sopenharmony_ci """Create subprocess transport.""" 5027db96d56Sopenharmony_ci raise NotImplementedError 5037db96d56Sopenharmony_ci 5047db96d56Sopenharmony_ci def _write_to_self(self): 5057db96d56Sopenharmony_ci """Write a byte to self-pipe, to wake up the event loop. 5067db96d56Sopenharmony_ci 5077db96d56Sopenharmony_ci This may be called from a different thread. 5087db96d56Sopenharmony_ci 5097db96d56Sopenharmony_ci The subclass is responsible for implementing the self-pipe. 5107db96d56Sopenharmony_ci """ 5117db96d56Sopenharmony_ci raise NotImplementedError 5127db96d56Sopenharmony_ci 5137db96d56Sopenharmony_ci def _process_events(self, event_list): 5147db96d56Sopenharmony_ci """Process selector events.""" 5157db96d56Sopenharmony_ci raise NotImplementedError 5167db96d56Sopenharmony_ci 5177db96d56Sopenharmony_ci def _check_closed(self): 5187db96d56Sopenharmony_ci if self._closed: 5197db96d56Sopenharmony_ci raise RuntimeError('Event loop is closed') 5207db96d56Sopenharmony_ci 5217db96d56Sopenharmony_ci def _check_default_executor(self): 5227db96d56Sopenharmony_ci if self._executor_shutdown_called: 5237db96d56Sopenharmony_ci raise RuntimeError('Executor shutdown has been called') 5247db96d56Sopenharmony_ci 5257db96d56Sopenharmony_ci def _asyncgen_finalizer_hook(self, agen): 5267db96d56Sopenharmony_ci self._asyncgens.discard(agen) 5277db96d56Sopenharmony_ci if not self.is_closed(): 5287db96d56Sopenharmony_ci self.call_soon_threadsafe(self.create_task, agen.aclose()) 5297db96d56Sopenharmony_ci 5307db96d56Sopenharmony_ci def _asyncgen_firstiter_hook(self, agen): 5317db96d56Sopenharmony_ci if self._asyncgens_shutdown_called: 5327db96d56Sopenharmony_ci warnings.warn( 5337db96d56Sopenharmony_ci f"asynchronous generator {agen!r} was scheduled after " 5347db96d56Sopenharmony_ci f"loop.shutdown_asyncgens() call", 5357db96d56Sopenharmony_ci ResourceWarning, source=self) 5367db96d56Sopenharmony_ci 5377db96d56Sopenharmony_ci self._asyncgens.add(agen) 5387db96d56Sopenharmony_ci 5397db96d56Sopenharmony_ci async def shutdown_asyncgens(self): 5407db96d56Sopenharmony_ci """Shutdown all active asynchronous generators.""" 5417db96d56Sopenharmony_ci self._asyncgens_shutdown_called = True 5427db96d56Sopenharmony_ci 5437db96d56Sopenharmony_ci if not len(self._asyncgens): 5447db96d56Sopenharmony_ci # If Python version is <3.6 or we don't have any asynchronous 5457db96d56Sopenharmony_ci # generators alive. 5467db96d56Sopenharmony_ci return 5477db96d56Sopenharmony_ci 5487db96d56Sopenharmony_ci closing_agens = list(self._asyncgens) 5497db96d56Sopenharmony_ci self._asyncgens.clear() 5507db96d56Sopenharmony_ci 5517db96d56Sopenharmony_ci results = await tasks.gather( 5527db96d56Sopenharmony_ci *[ag.aclose() for ag in closing_agens], 5537db96d56Sopenharmony_ci return_exceptions=True) 5547db96d56Sopenharmony_ci 5557db96d56Sopenharmony_ci for result, agen in zip(results, closing_agens): 5567db96d56Sopenharmony_ci if isinstance(result, Exception): 5577db96d56Sopenharmony_ci self.call_exception_handler({ 5587db96d56Sopenharmony_ci 'message': f'an error occurred during closing of ' 5597db96d56Sopenharmony_ci f'asynchronous generator {agen!r}', 5607db96d56Sopenharmony_ci 'exception': result, 5617db96d56Sopenharmony_ci 'asyncgen': agen 5627db96d56Sopenharmony_ci }) 5637db96d56Sopenharmony_ci 5647db96d56Sopenharmony_ci async def shutdown_default_executor(self): 5657db96d56Sopenharmony_ci """Schedule the shutdown of the default executor.""" 5667db96d56Sopenharmony_ci self._executor_shutdown_called = True 5677db96d56Sopenharmony_ci if self._default_executor is None: 5687db96d56Sopenharmony_ci return 5697db96d56Sopenharmony_ci future = self.create_future() 5707db96d56Sopenharmony_ci thread = threading.Thread(target=self._do_shutdown, args=(future,)) 5717db96d56Sopenharmony_ci thread.start() 5727db96d56Sopenharmony_ci try: 5737db96d56Sopenharmony_ci await future 5747db96d56Sopenharmony_ci finally: 5757db96d56Sopenharmony_ci thread.join() 5767db96d56Sopenharmony_ci 5777db96d56Sopenharmony_ci def _do_shutdown(self, future): 5787db96d56Sopenharmony_ci try: 5797db96d56Sopenharmony_ci self._default_executor.shutdown(wait=True) 5807db96d56Sopenharmony_ci if not self.is_closed(): 5817db96d56Sopenharmony_ci self.call_soon_threadsafe(future.set_result, None) 5827db96d56Sopenharmony_ci except Exception as ex: 5837db96d56Sopenharmony_ci if not self.is_closed(): 5847db96d56Sopenharmony_ci self.call_soon_threadsafe(future.set_exception, ex) 5857db96d56Sopenharmony_ci 5867db96d56Sopenharmony_ci def _check_running(self): 5877db96d56Sopenharmony_ci if self.is_running(): 5887db96d56Sopenharmony_ci raise RuntimeError('This event loop is already running') 5897db96d56Sopenharmony_ci if events._get_running_loop() is not None: 5907db96d56Sopenharmony_ci raise RuntimeError( 5917db96d56Sopenharmony_ci 'Cannot run the event loop while another loop is running') 5927db96d56Sopenharmony_ci 5937db96d56Sopenharmony_ci def run_forever(self): 5947db96d56Sopenharmony_ci """Run until stop() is called.""" 5957db96d56Sopenharmony_ci self._check_closed() 5967db96d56Sopenharmony_ci self._check_running() 5977db96d56Sopenharmony_ci self._set_coroutine_origin_tracking(self._debug) 5987db96d56Sopenharmony_ci 5997db96d56Sopenharmony_ci old_agen_hooks = sys.get_asyncgen_hooks() 6007db96d56Sopenharmony_ci try: 6017db96d56Sopenharmony_ci self._thread_id = threading.get_ident() 6027db96d56Sopenharmony_ci sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, 6037db96d56Sopenharmony_ci finalizer=self._asyncgen_finalizer_hook) 6047db96d56Sopenharmony_ci 6057db96d56Sopenharmony_ci events._set_running_loop(self) 6067db96d56Sopenharmony_ci while True: 6077db96d56Sopenharmony_ci self._run_once() 6087db96d56Sopenharmony_ci if self._stopping: 6097db96d56Sopenharmony_ci break 6107db96d56Sopenharmony_ci finally: 6117db96d56Sopenharmony_ci self._stopping = False 6127db96d56Sopenharmony_ci self._thread_id = None 6137db96d56Sopenharmony_ci events._set_running_loop(None) 6147db96d56Sopenharmony_ci self._set_coroutine_origin_tracking(False) 6157db96d56Sopenharmony_ci sys.set_asyncgen_hooks(*old_agen_hooks) 6167db96d56Sopenharmony_ci 6177db96d56Sopenharmony_ci def run_until_complete(self, future): 6187db96d56Sopenharmony_ci """Run until the Future is done. 6197db96d56Sopenharmony_ci 6207db96d56Sopenharmony_ci If the argument is a coroutine, it is wrapped in a Task. 6217db96d56Sopenharmony_ci 6227db96d56Sopenharmony_ci WARNING: It would be disastrous to call run_until_complete() 6237db96d56Sopenharmony_ci with the same coroutine twice -- it would wrap it in two 6247db96d56Sopenharmony_ci different Tasks and that can't be good. 6257db96d56Sopenharmony_ci 6267db96d56Sopenharmony_ci Return the Future's result, or raise its exception. 6277db96d56Sopenharmony_ci """ 6287db96d56Sopenharmony_ci self._check_closed() 6297db96d56Sopenharmony_ci self._check_running() 6307db96d56Sopenharmony_ci 6317db96d56Sopenharmony_ci new_task = not futures.isfuture(future) 6327db96d56Sopenharmony_ci future = tasks.ensure_future(future, loop=self) 6337db96d56Sopenharmony_ci if new_task: 6347db96d56Sopenharmony_ci # An exception is raised if the future didn't complete, so there 6357db96d56Sopenharmony_ci # is no need to log the "destroy pending task" message 6367db96d56Sopenharmony_ci future._log_destroy_pending = False 6377db96d56Sopenharmony_ci 6387db96d56Sopenharmony_ci future.add_done_callback(_run_until_complete_cb) 6397db96d56Sopenharmony_ci try: 6407db96d56Sopenharmony_ci self.run_forever() 6417db96d56Sopenharmony_ci except: 6427db96d56Sopenharmony_ci if new_task and future.done() and not future.cancelled(): 6437db96d56Sopenharmony_ci # The coroutine raised a BaseException. Consume the exception 6447db96d56Sopenharmony_ci # to not log a warning, the caller doesn't have access to the 6457db96d56Sopenharmony_ci # local task. 6467db96d56Sopenharmony_ci future.exception() 6477db96d56Sopenharmony_ci raise 6487db96d56Sopenharmony_ci finally: 6497db96d56Sopenharmony_ci future.remove_done_callback(_run_until_complete_cb) 6507db96d56Sopenharmony_ci if not future.done(): 6517db96d56Sopenharmony_ci raise RuntimeError('Event loop stopped before Future completed.') 6527db96d56Sopenharmony_ci 6537db96d56Sopenharmony_ci return future.result() 6547db96d56Sopenharmony_ci 6557db96d56Sopenharmony_ci def stop(self): 6567db96d56Sopenharmony_ci """Stop running the event loop. 6577db96d56Sopenharmony_ci 6587db96d56Sopenharmony_ci Every callback already scheduled will still run. This simply informs 6597db96d56Sopenharmony_ci run_forever to stop looping after a complete iteration. 6607db96d56Sopenharmony_ci """ 6617db96d56Sopenharmony_ci self._stopping = True 6627db96d56Sopenharmony_ci 6637db96d56Sopenharmony_ci def close(self): 6647db96d56Sopenharmony_ci """Close the event loop. 6657db96d56Sopenharmony_ci 6667db96d56Sopenharmony_ci This clears the queues and shuts down the executor, 6677db96d56Sopenharmony_ci but does not wait for the executor to finish. 6687db96d56Sopenharmony_ci 6697db96d56Sopenharmony_ci The event loop must not be running. 6707db96d56Sopenharmony_ci """ 6717db96d56Sopenharmony_ci if self.is_running(): 6727db96d56Sopenharmony_ci raise RuntimeError("Cannot close a running event loop") 6737db96d56Sopenharmony_ci if self._closed: 6747db96d56Sopenharmony_ci return 6757db96d56Sopenharmony_ci if self._debug: 6767db96d56Sopenharmony_ci logger.debug("Close %r", self) 6777db96d56Sopenharmony_ci self._closed = True 6787db96d56Sopenharmony_ci self._ready.clear() 6797db96d56Sopenharmony_ci self._scheduled.clear() 6807db96d56Sopenharmony_ci self._executor_shutdown_called = True 6817db96d56Sopenharmony_ci executor = self._default_executor 6827db96d56Sopenharmony_ci if executor is not None: 6837db96d56Sopenharmony_ci self._default_executor = None 6847db96d56Sopenharmony_ci executor.shutdown(wait=False) 6857db96d56Sopenharmony_ci 6867db96d56Sopenharmony_ci def is_closed(self): 6877db96d56Sopenharmony_ci """Returns True if the event loop was closed.""" 6887db96d56Sopenharmony_ci return self._closed 6897db96d56Sopenharmony_ci 6907db96d56Sopenharmony_ci def __del__(self, _warn=warnings.warn): 6917db96d56Sopenharmony_ci if not self.is_closed(): 6927db96d56Sopenharmony_ci _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 6937db96d56Sopenharmony_ci if not self.is_running(): 6947db96d56Sopenharmony_ci self.close() 6957db96d56Sopenharmony_ci 6967db96d56Sopenharmony_ci def is_running(self): 6977db96d56Sopenharmony_ci """Returns True if the event loop is running.""" 6987db96d56Sopenharmony_ci return (self._thread_id is not None) 6997db96d56Sopenharmony_ci 7007db96d56Sopenharmony_ci def time(self): 7017db96d56Sopenharmony_ci """Return the time according to the event loop's clock. 7027db96d56Sopenharmony_ci 7037db96d56Sopenharmony_ci This is a float expressed in seconds since an epoch, but the 7047db96d56Sopenharmony_ci epoch, precision, accuracy and drift are unspecified and may 7057db96d56Sopenharmony_ci differ per event loop. 7067db96d56Sopenharmony_ci """ 7077db96d56Sopenharmony_ci return time.monotonic() 7087db96d56Sopenharmony_ci 7097db96d56Sopenharmony_ci def call_later(self, delay, callback, *args, context=None): 7107db96d56Sopenharmony_ci """Arrange for a callback to be called at a given time. 7117db96d56Sopenharmony_ci 7127db96d56Sopenharmony_ci Return a Handle: an opaque object with a cancel() method that 7137db96d56Sopenharmony_ci can be used to cancel the call. 7147db96d56Sopenharmony_ci 7157db96d56Sopenharmony_ci The delay can be an int or float, expressed in seconds. It is 7167db96d56Sopenharmony_ci always relative to the current time. 7177db96d56Sopenharmony_ci 7187db96d56Sopenharmony_ci Each callback will be called exactly once. If two callbacks 7197db96d56Sopenharmony_ci are scheduled for exactly the same time, it undefined which 7207db96d56Sopenharmony_ci will be called first. 7217db96d56Sopenharmony_ci 7227db96d56Sopenharmony_ci Any positional arguments after the callback will be passed to 7237db96d56Sopenharmony_ci the callback when it is called. 7247db96d56Sopenharmony_ci """ 7257db96d56Sopenharmony_ci if delay is None: 7267db96d56Sopenharmony_ci raise TypeError('delay must not be None') 7277db96d56Sopenharmony_ci timer = self.call_at(self.time() + delay, callback, *args, 7287db96d56Sopenharmony_ci context=context) 7297db96d56Sopenharmony_ci if timer._source_traceback: 7307db96d56Sopenharmony_ci del timer._source_traceback[-1] 7317db96d56Sopenharmony_ci return timer 7327db96d56Sopenharmony_ci 7337db96d56Sopenharmony_ci def call_at(self, when, callback, *args, context=None): 7347db96d56Sopenharmony_ci """Like call_later(), but uses an absolute time. 7357db96d56Sopenharmony_ci 7367db96d56Sopenharmony_ci Absolute time corresponds to the event loop's time() method. 7377db96d56Sopenharmony_ci """ 7387db96d56Sopenharmony_ci if when is None: 7397db96d56Sopenharmony_ci raise TypeError("when cannot be None") 7407db96d56Sopenharmony_ci self._check_closed() 7417db96d56Sopenharmony_ci if self._debug: 7427db96d56Sopenharmony_ci self._check_thread() 7437db96d56Sopenharmony_ci self._check_callback(callback, 'call_at') 7447db96d56Sopenharmony_ci timer = events.TimerHandle(when, callback, args, self, context) 7457db96d56Sopenharmony_ci if timer._source_traceback: 7467db96d56Sopenharmony_ci del timer._source_traceback[-1] 7477db96d56Sopenharmony_ci heapq.heappush(self._scheduled, timer) 7487db96d56Sopenharmony_ci timer._scheduled = True 7497db96d56Sopenharmony_ci return timer 7507db96d56Sopenharmony_ci 7517db96d56Sopenharmony_ci def call_soon(self, callback, *args, context=None): 7527db96d56Sopenharmony_ci """Arrange for a callback to be called as soon as possible. 7537db96d56Sopenharmony_ci 7547db96d56Sopenharmony_ci This operates as a FIFO queue: callbacks are called in the 7557db96d56Sopenharmony_ci order in which they are registered. Each callback will be 7567db96d56Sopenharmony_ci called exactly once. 7577db96d56Sopenharmony_ci 7587db96d56Sopenharmony_ci Any positional arguments after the callback will be passed to 7597db96d56Sopenharmony_ci the callback when it is called. 7607db96d56Sopenharmony_ci """ 7617db96d56Sopenharmony_ci self._check_closed() 7627db96d56Sopenharmony_ci if self._debug: 7637db96d56Sopenharmony_ci self._check_thread() 7647db96d56Sopenharmony_ci self._check_callback(callback, 'call_soon') 7657db96d56Sopenharmony_ci handle = self._call_soon(callback, args, context) 7667db96d56Sopenharmony_ci if handle._source_traceback: 7677db96d56Sopenharmony_ci del handle._source_traceback[-1] 7687db96d56Sopenharmony_ci return handle 7697db96d56Sopenharmony_ci 7707db96d56Sopenharmony_ci def _check_callback(self, callback, method): 7717db96d56Sopenharmony_ci if (coroutines.iscoroutine(callback) or 7727db96d56Sopenharmony_ci coroutines.iscoroutinefunction(callback)): 7737db96d56Sopenharmony_ci raise TypeError( 7747db96d56Sopenharmony_ci f"coroutines cannot be used with {method}()") 7757db96d56Sopenharmony_ci if not callable(callback): 7767db96d56Sopenharmony_ci raise TypeError( 7777db96d56Sopenharmony_ci f'a callable object was expected by {method}(), ' 7787db96d56Sopenharmony_ci f'got {callback!r}') 7797db96d56Sopenharmony_ci 7807db96d56Sopenharmony_ci def _call_soon(self, callback, args, context): 7817db96d56Sopenharmony_ci handle = events.Handle(callback, args, self, context) 7827db96d56Sopenharmony_ci if handle._source_traceback: 7837db96d56Sopenharmony_ci del handle._source_traceback[-1] 7847db96d56Sopenharmony_ci self._ready.append(handle) 7857db96d56Sopenharmony_ci return handle 7867db96d56Sopenharmony_ci 7877db96d56Sopenharmony_ci def _check_thread(self): 7887db96d56Sopenharmony_ci """Check that the current thread is the thread running the event loop. 7897db96d56Sopenharmony_ci 7907db96d56Sopenharmony_ci Non-thread-safe methods of this class make this assumption and will 7917db96d56Sopenharmony_ci likely behave incorrectly when the assumption is violated. 7927db96d56Sopenharmony_ci 7937db96d56Sopenharmony_ci Should only be called when (self._debug == True). The caller is 7947db96d56Sopenharmony_ci responsible for checking this condition for performance reasons. 7957db96d56Sopenharmony_ci """ 7967db96d56Sopenharmony_ci if self._thread_id is None: 7977db96d56Sopenharmony_ci return 7987db96d56Sopenharmony_ci thread_id = threading.get_ident() 7997db96d56Sopenharmony_ci if thread_id != self._thread_id: 8007db96d56Sopenharmony_ci raise RuntimeError( 8017db96d56Sopenharmony_ci "Non-thread-safe operation invoked on an event loop other " 8027db96d56Sopenharmony_ci "than the current one") 8037db96d56Sopenharmony_ci 8047db96d56Sopenharmony_ci def call_soon_threadsafe(self, callback, *args, context=None): 8057db96d56Sopenharmony_ci """Like call_soon(), but thread-safe.""" 8067db96d56Sopenharmony_ci self._check_closed() 8077db96d56Sopenharmony_ci if self._debug: 8087db96d56Sopenharmony_ci self._check_callback(callback, 'call_soon_threadsafe') 8097db96d56Sopenharmony_ci handle = self._call_soon(callback, args, context) 8107db96d56Sopenharmony_ci if handle._source_traceback: 8117db96d56Sopenharmony_ci del handle._source_traceback[-1] 8127db96d56Sopenharmony_ci self._write_to_self() 8137db96d56Sopenharmony_ci return handle 8147db96d56Sopenharmony_ci 8157db96d56Sopenharmony_ci def run_in_executor(self, executor, func, *args): 8167db96d56Sopenharmony_ci self._check_closed() 8177db96d56Sopenharmony_ci if self._debug: 8187db96d56Sopenharmony_ci self._check_callback(func, 'run_in_executor') 8197db96d56Sopenharmony_ci if executor is None: 8207db96d56Sopenharmony_ci executor = self._default_executor 8217db96d56Sopenharmony_ci # Only check when the default executor is being used 8227db96d56Sopenharmony_ci self._check_default_executor() 8237db96d56Sopenharmony_ci if executor is None: 8247db96d56Sopenharmony_ci executor = concurrent.futures.ThreadPoolExecutor( 8257db96d56Sopenharmony_ci thread_name_prefix='asyncio' 8267db96d56Sopenharmony_ci ) 8277db96d56Sopenharmony_ci self._default_executor = executor 8287db96d56Sopenharmony_ci return futures.wrap_future( 8297db96d56Sopenharmony_ci executor.submit(func, *args), loop=self) 8307db96d56Sopenharmony_ci 8317db96d56Sopenharmony_ci def set_default_executor(self, executor): 8327db96d56Sopenharmony_ci if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 8337db96d56Sopenharmony_ci raise TypeError('executor must be ThreadPoolExecutor instance') 8347db96d56Sopenharmony_ci self._default_executor = executor 8357db96d56Sopenharmony_ci 8367db96d56Sopenharmony_ci def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 8377db96d56Sopenharmony_ci msg = [f"{host}:{port!r}"] 8387db96d56Sopenharmony_ci if family: 8397db96d56Sopenharmony_ci msg.append(f'family={family!r}') 8407db96d56Sopenharmony_ci if type: 8417db96d56Sopenharmony_ci msg.append(f'type={type!r}') 8427db96d56Sopenharmony_ci if proto: 8437db96d56Sopenharmony_ci msg.append(f'proto={proto!r}') 8447db96d56Sopenharmony_ci if flags: 8457db96d56Sopenharmony_ci msg.append(f'flags={flags!r}') 8467db96d56Sopenharmony_ci msg = ', '.join(msg) 8477db96d56Sopenharmony_ci logger.debug('Get address info %s', msg) 8487db96d56Sopenharmony_ci 8497db96d56Sopenharmony_ci t0 = self.time() 8507db96d56Sopenharmony_ci addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 8517db96d56Sopenharmony_ci dt = self.time() - t0 8527db96d56Sopenharmony_ci 8537db96d56Sopenharmony_ci msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 8547db96d56Sopenharmony_ci if dt >= self.slow_callback_duration: 8557db96d56Sopenharmony_ci logger.info(msg) 8567db96d56Sopenharmony_ci else: 8577db96d56Sopenharmony_ci logger.debug(msg) 8587db96d56Sopenharmony_ci return addrinfo 8597db96d56Sopenharmony_ci 8607db96d56Sopenharmony_ci async def getaddrinfo(self, host, port, *, 8617db96d56Sopenharmony_ci family=0, type=0, proto=0, flags=0): 8627db96d56Sopenharmony_ci if self._debug: 8637db96d56Sopenharmony_ci getaddr_func = self._getaddrinfo_debug 8647db96d56Sopenharmony_ci else: 8657db96d56Sopenharmony_ci getaddr_func = socket.getaddrinfo 8667db96d56Sopenharmony_ci 8677db96d56Sopenharmony_ci return await self.run_in_executor( 8687db96d56Sopenharmony_ci None, getaddr_func, host, port, family, type, proto, flags) 8697db96d56Sopenharmony_ci 8707db96d56Sopenharmony_ci async def getnameinfo(self, sockaddr, flags=0): 8717db96d56Sopenharmony_ci return await self.run_in_executor( 8727db96d56Sopenharmony_ci None, socket.getnameinfo, sockaddr, flags) 8737db96d56Sopenharmony_ci 8747db96d56Sopenharmony_ci async def sock_sendfile(self, sock, file, offset=0, count=None, 8757db96d56Sopenharmony_ci *, fallback=True): 8767db96d56Sopenharmony_ci if self._debug and sock.gettimeout() != 0: 8777db96d56Sopenharmony_ci raise ValueError("the socket must be non-blocking") 8787db96d56Sopenharmony_ci _check_ssl_socket(sock) 8797db96d56Sopenharmony_ci self._check_sendfile_params(sock, file, offset, count) 8807db96d56Sopenharmony_ci try: 8817db96d56Sopenharmony_ci return await self._sock_sendfile_native(sock, file, 8827db96d56Sopenharmony_ci offset, count) 8837db96d56Sopenharmony_ci except exceptions.SendfileNotAvailableError as exc: 8847db96d56Sopenharmony_ci if not fallback: 8857db96d56Sopenharmony_ci raise 8867db96d56Sopenharmony_ci return await self._sock_sendfile_fallback(sock, file, 8877db96d56Sopenharmony_ci offset, count) 8887db96d56Sopenharmony_ci 8897db96d56Sopenharmony_ci async def _sock_sendfile_native(self, sock, file, offset, count): 8907db96d56Sopenharmony_ci # NB: sendfile syscall is not supported for SSL sockets and 8917db96d56Sopenharmony_ci # non-mmap files even if sendfile is supported by OS 8927db96d56Sopenharmony_ci raise exceptions.SendfileNotAvailableError( 8937db96d56Sopenharmony_ci f"syscall sendfile is not available for socket {sock!r} " 8947db96d56Sopenharmony_ci f"and file {file!r} combination") 8957db96d56Sopenharmony_ci 8967db96d56Sopenharmony_ci async def _sock_sendfile_fallback(self, sock, file, offset, count): 8977db96d56Sopenharmony_ci if offset: 8987db96d56Sopenharmony_ci file.seek(offset) 8997db96d56Sopenharmony_ci blocksize = ( 9007db96d56Sopenharmony_ci min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 9017db96d56Sopenharmony_ci if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 9027db96d56Sopenharmony_ci ) 9037db96d56Sopenharmony_ci buf = bytearray(blocksize) 9047db96d56Sopenharmony_ci total_sent = 0 9057db96d56Sopenharmony_ci try: 9067db96d56Sopenharmony_ci while True: 9077db96d56Sopenharmony_ci if count: 9087db96d56Sopenharmony_ci blocksize = min(count - total_sent, blocksize) 9097db96d56Sopenharmony_ci if blocksize <= 0: 9107db96d56Sopenharmony_ci break 9117db96d56Sopenharmony_ci view = memoryview(buf)[:blocksize] 9127db96d56Sopenharmony_ci read = await self.run_in_executor(None, file.readinto, view) 9137db96d56Sopenharmony_ci if not read: 9147db96d56Sopenharmony_ci break # EOF 9157db96d56Sopenharmony_ci await self.sock_sendall(sock, view[:read]) 9167db96d56Sopenharmony_ci total_sent += read 9177db96d56Sopenharmony_ci return total_sent 9187db96d56Sopenharmony_ci finally: 9197db96d56Sopenharmony_ci if total_sent > 0 and hasattr(file, 'seek'): 9207db96d56Sopenharmony_ci file.seek(offset + total_sent) 9217db96d56Sopenharmony_ci 9227db96d56Sopenharmony_ci def _check_sendfile_params(self, sock, file, offset, count): 9237db96d56Sopenharmony_ci if 'b' not in getattr(file, 'mode', 'b'): 9247db96d56Sopenharmony_ci raise ValueError("file should be opened in binary mode") 9257db96d56Sopenharmony_ci if not sock.type == socket.SOCK_STREAM: 9267db96d56Sopenharmony_ci raise ValueError("only SOCK_STREAM type sockets are supported") 9277db96d56Sopenharmony_ci if count is not None: 9287db96d56Sopenharmony_ci if not isinstance(count, int): 9297db96d56Sopenharmony_ci raise TypeError( 9307db96d56Sopenharmony_ci "count must be a positive integer (got {!r})".format(count)) 9317db96d56Sopenharmony_ci if count <= 0: 9327db96d56Sopenharmony_ci raise ValueError( 9337db96d56Sopenharmony_ci "count must be a positive integer (got {!r})".format(count)) 9347db96d56Sopenharmony_ci if not isinstance(offset, int): 9357db96d56Sopenharmony_ci raise TypeError( 9367db96d56Sopenharmony_ci "offset must be a non-negative integer (got {!r})".format( 9377db96d56Sopenharmony_ci offset)) 9387db96d56Sopenharmony_ci if offset < 0: 9397db96d56Sopenharmony_ci raise ValueError( 9407db96d56Sopenharmony_ci "offset must be a non-negative integer (got {!r})".format( 9417db96d56Sopenharmony_ci offset)) 9427db96d56Sopenharmony_ci 9437db96d56Sopenharmony_ci async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 9447db96d56Sopenharmony_ci """Create, bind and connect one socket.""" 9457db96d56Sopenharmony_ci my_exceptions = [] 9467db96d56Sopenharmony_ci exceptions.append(my_exceptions) 9477db96d56Sopenharmony_ci family, type_, proto, _, address = addr_info 9487db96d56Sopenharmony_ci sock = None 9497db96d56Sopenharmony_ci try: 9507db96d56Sopenharmony_ci sock = socket.socket(family=family, type=type_, proto=proto) 9517db96d56Sopenharmony_ci sock.setblocking(False) 9527db96d56Sopenharmony_ci if local_addr_infos is not None: 9537db96d56Sopenharmony_ci for lfamily, _, _, _, laddr in local_addr_infos: 9547db96d56Sopenharmony_ci # skip local addresses of different family 9557db96d56Sopenharmony_ci if lfamily != family: 9567db96d56Sopenharmony_ci continue 9577db96d56Sopenharmony_ci try: 9587db96d56Sopenharmony_ci sock.bind(laddr) 9597db96d56Sopenharmony_ci break 9607db96d56Sopenharmony_ci except OSError as exc: 9617db96d56Sopenharmony_ci msg = ( 9627db96d56Sopenharmony_ci f'error while attempting to bind on ' 9637db96d56Sopenharmony_ci f'address {laddr!r}: ' 9647db96d56Sopenharmony_ci f'{exc.strerror.lower()}' 9657db96d56Sopenharmony_ci ) 9667db96d56Sopenharmony_ci exc = OSError(exc.errno, msg) 9677db96d56Sopenharmony_ci my_exceptions.append(exc) 9687db96d56Sopenharmony_ci else: # all bind attempts failed 9697db96d56Sopenharmony_ci if my_exceptions: 9707db96d56Sopenharmony_ci raise my_exceptions.pop() 9717db96d56Sopenharmony_ci else: 9727db96d56Sopenharmony_ci raise OSError(f"no matching local address with {family=} found") 9737db96d56Sopenharmony_ci await self.sock_connect(sock, address) 9747db96d56Sopenharmony_ci return sock 9757db96d56Sopenharmony_ci except OSError as exc: 9767db96d56Sopenharmony_ci my_exceptions.append(exc) 9777db96d56Sopenharmony_ci if sock is not None: 9787db96d56Sopenharmony_ci sock.close() 9797db96d56Sopenharmony_ci raise 9807db96d56Sopenharmony_ci except: 9817db96d56Sopenharmony_ci if sock is not None: 9827db96d56Sopenharmony_ci sock.close() 9837db96d56Sopenharmony_ci raise 9847db96d56Sopenharmony_ci finally: 9857db96d56Sopenharmony_ci exceptions = my_exceptions = None 9867db96d56Sopenharmony_ci 9877db96d56Sopenharmony_ci async def create_connection( 9887db96d56Sopenharmony_ci self, protocol_factory, host=None, port=None, 9897db96d56Sopenharmony_ci *, ssl=None, family=0, 9907db96d56Sopenharmony_ci proto=0, flags=0, sock=None, 9917db96d56Sopenharmony_ci local_addr=None, server_hostname=None, 9927db96d56Sopenharmony_ci ssl_handshake_timeout=None, 9937db96d56Sopenharmony_ci ssl_shutdown_timeout=None, 9947db96d56Sopenharmony_ci happy_eyeballs_delay=None, interleave=None): 9957db96d56Sopenharmony_ci """Connect to a TCP server. 9967db96d56Sopenharmony_ci 9977db96d56Sopenharmony_ci Create a streaming transport connection to a given internet host and 9987db96d56Sopenharmony_ci port: socket family AF_INET or socket.AF_INET6 depending on host (or 9997db96d56Sopenharmony_ci family if specified), socket type SOCK_STREAM. protocol_factory must be 10007db96d56Sopenharmony_ci a callable returning a protocol instance. 10017db96d56Sopenharmony_ci 10027db96d56Sopenharmony_ci This method is a coroutine which will try to establish the connection 10037db96d56Sopenharmony_ci in the background. When successful, the coroutine returns a 10047db96d56Sopenharmony_ci (transport, protocol) pair. 10057db96d56Sopenharmony_ci """ 10067db96d56Sopenharmony_ci if server_hostname is not None and not ssl: 10077db96d56Sopenharmony_ci raise ValueError('server_hostname is only meaningful with ssl') 10087db96d56Sopenharmony_ci 10097db96d56Sopenharmony_ci if server_hostname is None and ssl: 10107db96d56Sopenharmony_ci # Use host as default for server_hostname. It is an error 10117db96d56Sopenharmony_ci # if host is empty or not set, e.g. when an 10127db96d56Sopenharmony_ci # already-connected socket was passed or when only a port 10137db96d56Sopenharmony_ci # is given. To avoid this error, you can pass 10147db96d56Sopenharmony_ci # server_hostname='' -- this will bypass the hostname 10157db96d56Sopenharmony_ci # check. (This also means that if host is a numeric 10167db96d56Sopenharmony_ci # IP/IPv6 address, we will attempt to verify that exact 10177db96d56Sopenharmony_ci # address; this will probably fail, but it is possible to 10187db96d56Sopenharmony_ci # create a certificate for a specific IP address, so we 10197db96d56Sopenharmony_ci # don't judge it here.) 10207db96d56Sopenharmony_ci if not host: 10217db96d56Sopenharmony_ci raise ValueError('You must set server_hostname ' 10227db96d56Sopenharmony_ci 'when using ssl without a host') 10237db96d56Sopenharmony_ci server_hostname = host 10247db96d56Sopenharmony_ci 10257db96d56Sopenharmony_ci if ssl_handshake_timeout is not None and not ssl: 10267db96d56Sopenharmony_ci raise ValueError( 10277db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl') 10287db96d56Sopenharmony_ci 10297db96d56Sopenharmony_ci if ssl_shutdown_timeout is not None and not ssl: 10307db96d56Sopenharmony_ci raise ValueError( 10317db96d56Sopenharmony_ci 'ssl_shutdown_timeout is only meaningful with ssl') 10327db96d56Sopenharmony_ci 10337db96d56Sopenharmony_ci if sock is not None: 10347db96d56Sopenharmony_ci _check_ssl_socket(sock) 10357db96d56Sopenharmony_ci 10367db96d56Sopenharmony_ci if happy_eyeballs_delay is not None and interleave is None: 10377db96d56Sopenharmony_ci # If using happy eyeballs, default to interleave addresses by family 10387db96d56Sopenharmony_ci interleave = 1 10397db96d56Sopenharmony_ci 10407db96d56Sopenharmony_ci if host is not None or port is not None: 10417db96d56Sopenharmony_ci if sock is not None: 10427db96d56Sopenharmony_ci raise ValueError( 10437db96d56Sopenharmony_ci 'host/port and sock can not be specified at the same time') 10447db96d56Sopenharmony_ci 10457db96d56Sopenharmony_ci infos = await self._ensure_resolved( 10467db96d56Sopenharmony_ci (host, port), family=family, 10477db96d56Sopenharmony_ci type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 10487db96d56Sopenharmony_ci if not infos: 10497db96d56Sopenharmony_ci raise OSError('getaddrinfo() returned empty list') 10507db96d56Sopenharmony_ci 10517db96d56Sopenharmony_ci if local_addr is not None: 10527db96d56Sopenharmony_ci laddr_infos = await self._ensure_resolved( 10537db96d56Sopenharmony_ci local_addr, family=family, 10547db96d56Sopenharmony_ci type=socket.SOCK_STREAM, proto=proto, 10557db96d56Sopenharmony_ci flags=flags, loop=self) 10567db96d56Sopenharmony_ci if not laddr_infos: 10577db96d56Sopenharmony_ci raise OSError('getaddrinfo() returned empty list') 10587db96d56Sopenharmony_ci else: 10597db96d56Sopenharmony_ci laddr_infos = None 10607db96d56Sopenharmony_ci 10617db96d56Sopenharmony_ci if interleave: 10627db96d56Sopenharmony_ci infos = _interleave_addrinfos(infos, interleave) 10637db96d56Sopenharmony_ci 10647db96d56Sopenharmony_ci exceptions = [] 10657db96d56Sopenharmony_ci if happy_eyeballs_delay is None: 10667db96d56Sopenharmony_ci # not using happy eyeballs 10677db96d56Sopenharmony_ci for addrinfo in infos: 10687db96d56Sopenharmony_ci try: 10697db96d56Sopenharmony_ci sock = await self._connect_sock( 10707db96d56Sopenharmony_ci exceptions, addrinfo, laddr_infos) 10717db96d56Sopenharmony_ci break 10727db96d56Sopenharmony_ci except OSError: 10737db96d56Sopenharmony_ci continue 10747db96d56Sopenharmony_ci else: # using happy eyeballs 10757db96d56Sopenharmony_ci sock, _, _ = await staggered.staggered_race( 10767db96d56Sopenharmony_ci (functools.partial(self._connect_sock, 10777db96d56Sopenharmony_ci exceptions, addrinfo, laddr_infos) 10787db96d56Sopenharmony_ci for addrinfo in infos), 10797db96d56Sopenharmony_ci happy_eyeballs_delay, loop=self) 10807db96d56Sopenharmony_ci 10817db96d56Sopenharmony_ci if sock is None: 10827db96d56Sopenharmony_ci exceptions = [exc for sub in exceptions for exc in sub] 10837db96d56Sopenharmony_ci try: 10847db96d56Sopenharmony_ci if len(exceptions) == 1: 10857db96d56Sopenharmony_ci raise exceptions[0] 10867db96d56Sopenharmony_ci else: 10877db96d56Sopenharmony_ci # If they all have the same str(), raise one. 10887db96d56Sopenharmony_ci model = str(exceptions[0]) 10897db96d56Sopenharmony_ci if all(str(exc) == model for exc in exceptions): 10907db96d56Sopenharmony_ci raise exceptions[0] 10917db96d56Sopenharmony_ci # Raise a combined exception so the user can see all 10927db96d56Sopenharmony_ci # the various error messages. 10937db96d56Sopenharmony_ci raise OSError('Multiple exceptions: {}'.format( 10947db96d56Sopenharmony_ci ', '.join(str(exc) for exc in exceptions))) 10957db96d56Sopenharmony_ci finally: 10967db96d56Sopenharmony_ci exceptions = None 10977db96d56Sopenharmony_ci 10987db96d56Sopenharmony_ci else: 10997db96d56Sopenharmony_ci if sock is None: 11007db96d56Sopenharmony_ci raise ValueError( 11017db96d56Sopenharmony_ci 'host and port was not specified and no sock specified') 11027db96d56Sopenharmony_ci if sock.type != socket.SOCK_STREAM: 11037db96d56Sopenharmony_ci # We allow AF_INET, AF_INET6, AF_UNIX as long as they 11047db96d56Sopenharmony_ci # are SOCK_STREAM. 11057db96d56Sopenharmony_ci # We support passing AF_UNIX sockets even though we have 11067db96d56Sopenharmony_ci # a dedicated API for that: create_unix_connection. 11077db96d56Sopenharmony_ci # Disallowing AF_UNIX in this method, breaks backwards 11087db96d56Sopenharmony_ci # compatibility. 11097db96d56Sopenharmony_ci raise ValueError( 11107db96d56Sopenharmony_ci f'A Stream Socket was expected, got {sock!r}') 11117db96d56Sopenharmony_ci 11127db96d56Sopenharmony_ci transport, protocol = await self._create_connection_transport( 11137db96d56Sopenharmony_ci sock, protocol_factory, ssl, server_hostname, 11147db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 11157db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout) 11167db96d56Sopenharmony_ci if self._debug: 11177db96d56Sopenharmony_ci # Get the socket from the transport because SSL transport closes 11187db96d56Sopenharmony_ci # the old socket and creates a new SSL socket 11197db96d56Sopenharmony_ci sock = transport.get_extra_info('socket') 11207db96d56Sopenharmony_ci logger.debug("%r connected to %s:%r: (%r, %r)", 11217db96d56Sopenharmony_ci sock, host, port, transport, protocol) 11227db96d56Sopenharmony_ci return transport, protocol 11237db96d56Sopenharmony_ci 11247db96d56Sopenharmony_ci async def _create_connection_transport( 11257db96d56Sopenharmony_ci self, sock, protocol_factory, ssl, 11267db96d56Sopenharmony_ci server_hostname, server_side=False, 11277db96d56Sopenharmony_ci ssl_handshake_timeout=None, 11287db96d56Sopenharmony_ci ssl_shutdown_timeout=None): 11297db96d56Sopenharmony_ci 11307db96d56Sopenharmony_ci sock.setblocking(False) 11317db96d56Sopenharmony_ci 11327db96d56Sopenharmony_ci protocol = protocol_factory() 11337db96d56Sopenharmony_ci waiter = self.create_future() 11347db96d56Sopenharmony_ci if ssl: 11357db96d56Sopenharmony_ci sslcontext = None if isinstance(ssl, bool) else ssl 11367db96d56Sopenharmony_ci transport = self._make_ssl_transport( 11377db96d56Sopenharmony_ci sock, protocol, sslcontext, waiter, 11387db96d56Sopenharmony_ci server_side=server_side, server_hostname=server_hostname, 11397db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 11407db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout) 11417db96d56Sopenharmony_ci else: 11427db96d56Sopenharmony_ci transport = self._make_socket_transport(sock, protocol, waiter) 11437db96d56Sopenharmony_ci 11447db96d56Sopenharmony_ci try: 11457db96d56Sopenharmony_ci await waiter 11467db96d56Sopenharmony_ci except: 11477db96d56Sopenharmony_ci transport.close() 11487db96d56Sopenharmony_ci raise 11497db96d56Sopenharmony_ci 11507db96d56Sopenharmony_ci return transport, protocol 11517db96d56Sopenharmony_ci 11527db96d56Sopenharmony_ci async def sendfile(self, transport, file, offset=0, count=None, 11537db96d56Sopenharmony_ci *, fallback=True): 11547db96d56Sopenharmony_ci """Send a file to transport. 11557db96d56Sopenharmony_ci 11567db96d56Sopenharmony_ci Return the total number of bytes which were sent. 11577db96d56Sopenharmony_ci 11587db96d56Sopenharmony_ci The method uses high-performance os.sendfile if available. 11597db96d56Sopenharmony_ci 11607db96d56Sopenharmony_ci file must be a regular file object opened in binary mode. 11617db96d56Sopenharmony_ci 11627db96d56Sopenharmony_ci offset tells from where to start reading the file. If specified, 11637db96d56Sopenharmony_ci count is the total number of bytes to transmit as opposed to 11647db96d56Sopenharmony_ci sending the file until EOF is reached. File position is updated on 11657db96d56Sopenharmony_ci return or also in case of error in which case file.tell() 11667db96d56Sopenharmony_ci can be used to figure out the number of bytes 11677db96d56Sopenharmony_ci which were sent. 11687db96d56Sopenharmony_ci 11697db96d56Sopenharmony_ci fallback set to True makes asyncio to manually read and send 11707db96d56Sopenharmony_ci the file when the platform does not support the sendfile syscall 11717db96d56Sopenharmony_ci (e.g. Windows or SSL socket on Unix). 11727db96d56Sopenharmony_ci 11737db96d56Sopenharmony_ci Raise SendfileNotAvailableError if the system does not support 11747db96d56Sopenharmony_ci sendfile syscall and fallback is False. 11757db96d56Sopenharmony_ci """ 11767db96d56Sopenharmony_ci if transport.is_closing(): 11777db96d56Sopenharmony_ci raise RuntimeError("Transport is closing") 11787db96d56Sopenharmony_ci mode = getattr(transport, '_sendfile_compatible', 11797db96d56Sopenharmony_ci constants._SendfileMode.UNSUPPORTED) 11807db96d56Sopenharmony_ci if mode is constants._SendfileMode.UNSUPPORTED: 11817db96d56Sopenharmony_ci raise RuntimeError( 11827db96d56Sopenharmony_ci f"sendfile is not supported for transport {transport!r}") 11837db96d56Sopenharmony_ci if mode is constants._SendfileMode.TRY_NATIVE: 11847db96d56Sopenharmony_ci try: 11857db96d56Sopenharmony_ci return await self._sendfile_native(transport, file, 11867db96d56Sopenharmony_ci offset, count) 11877db96d56Sopenharmony_ci except exceptions.SendfileNotAvailableError as exc: 11887db96d56Sopenharmony_ci if not fallback: 11897db96d56Sopenharmony_ci raise 11907db96d56Sopenharmony_ci 11917db96d56Sopenharmony_ci if not fallback: 11927db96d56Sopenharmony_ci raise RuntimeError( 11937db96d56Sopenharmony_ci f"fallback is disabled and native sendfile is not " 11947db96d56Sopenharmony_ci f"supported for transport {transport!r}") 11957db96d56Sopenharmony_ci 11967db96d56Sopenharmony_ci return await self._sendfile_fallback(transport, file, 11977db96d56Sopenharmony_ci offset, count) 11987db96d56Sopenharmony_ci 11997db96d56Sopenharmony_ci async def _sendfile_native(self, transp, file, offset, count): 12007db96d56Sopenharmony_ci raise exceptions.SendfileNotAvailableError( 12017db96d56Sopenharmony_ci "sendfile syscall is not supported") 12027db96d56Sopenharmony_ci 12037db96d56Sopenharmony_ci async def _sendfile_fallback(self, transp, file, offset, count): 12047db96d56Sopenharmony_ci if offset: 12057db96d56Sopenharmony_ci file.seek(offset) 12067db96d56Sopenharmony_ci blocksize = min(count, 16384) if count else 16384 12077db96d56Sopenharmony_ci buf = bytearray(blocksize) 12087db96d56Sopenharmony_ci total_sent = 0 12097db96d56Sopenharmony_ci proto = _SendfileFallbackProtocol(transp) 12107db96d56Sopenharmony_ci try: 12117db96d56Sopenharmony_ci while True: 12127db96d56Sopenharmony_ci if count: 12137db96d56Sopenharmony_ci blocksize = min(count - total_sent, blocksize) 12147db96d56Sopenharmony_ci if blocksize <= 0: 12157db96d56Sopenharmony_ci return total_sent 12167db96d56Sopenharmony_ci view = memoryview(buf)[:blocksize] 12177db96d56Sopenharmony_ci read = await self.run_in_executor(None, file.readinto, view) 12187db96d56Sopenharmony_ci if not read: 12197db96d56Sopenharmony_ci return total_sent # EOF 12207db96d56Sopenharmony_ci await proto.drain() 12217db96d56Sopenharmony_ci transp.write(view[:read]) 12227db96d56Sopenharmony_ci total_sent += read 12237db96d56Sopenharmony_ci finally: 12247db96d56Sopenharmony_ci if total_sent > 0 and hasattr(file, 'seek'): 12257db96d56Sopenharmony_ci file.seek(offset + total_sent) 12267db96d56Sopenharmony_ci await proto.restore() 12277db96d56Sopenharmony_ci 12287db96d56Sopenharmony_ci async def start_tls(self, transport, protocol, sslcontext, *, 12297db96d56Sopenharmony_ci server_side=False, 12307db96d56Sopenharmony_ci server_hostname=None, 12317db96d56Sopenharmony_ci ssl_handshake_timeout=None, 12327db96d56Sopenharmony_ci ssl_shutdown_timeout=None): 12337db96d56Sopenharmony_ci """Upgrade transport to TLS. 12347db96d56Sopenharmony_ci 12357db96d56Sopenharmony_ci Return a new transport that *protocol* should start using 12367db96d56Sopenharmony_ci immediately. 12377db96d56Sopenharmony_ci """ 12387db96d56Sopenharmony_ci if ssl is None: 12397db96d56Sopenharmony_ci raise RuntimeError('Python ssl module is not available') 12407db96d56Sopenharmony_ci 12417db96d56Sopenharmony_ci if not isinstance(sslcontext, ssl.SSLContext): 12427db96d56Sopenharmony_ci raise TypeError( 12437db96d56Sopenharmony_ci f'sslcontext is expected to be an instance of ssl.SSLContext, ' 12447db96d56Sopenharmony_ci f'got {sslcontext!r}') 12457db96d56Sopenharmony_ci 12467db96d56Sopenharmony_ci if not getattr(transport, '_start_tls_compatible', False): 12477db96d56Sopenharmony_ci raise TypeError( 12487db96d56Sopenharmony_ci f'transport {transport!r} is not supported by start_tls()') 12497db96d56Sopenharmony_ci 12507db96d56Sopenharmony_ci waiter = self.create_future() 12517db96d56Sopenharmony_ci ssl_protocol = sslproto.SSLProtocol( 12527db96d56Sopenharmony_ci self, protocol, sslcontext, waiter, 12537db96d56Sopenharmony_ci server_side, server_hostname, 12547db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 12557db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout, 12567db96d56Sopenharmony_ci call_connection_made=False) 12577db96d56Sopenharmony_ci 12587db96d56Sopenharmony_ci # Pause early so that "ssl_protocol.data_received()" doesn't 12597db96d56Sopenharmony_ci # have a chance to get called before "ssl_protocol.connection_made()". 12607db96d56Sopenharmony_ci transport.pause_reading() 12617db96d56Sopenharmony_ci 12627db96d56Sopenharmony_ci transport.set_protocol(ssl_protocol) 12637db96d56Sopenharmony_ci conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 12647db96d56Sopenharmony_ci resume_cb = self.call_soon(transport.resume_reading) 12657db96d56Sopenharmony_ci 12667db96d56Sopenharmony_ci try: 12677db96d56Sopenharmony_ci await waiter 12687db96d56Sopenharmony_ci except BaseException: 12697db96d56Sopenharmony_ci transport.close() 12707db96d56Sopenharmony_ci conmade_cb.cancel() 12717db96d56Sopenharmony_ci resume_cb.cancel() 12727db96d56Sopenharmony_ci raise 12737db96d56Sopenharmony_ci 12747db96d56Sopenharmony_ci return ssl_protocol._app_transport 12757db96d56Sopenharmony_ci 12767db96d56Sopenharmony_ci async def create_datagram_endpoint(self, protocol_factory, 12777db96d56Sopenharmony_ci local_addr=None, remote_addr=None, *, 12787db96d56Sopenharmony_ci family=0, proto=0, flags=0, 12797db96d56Sopenharmony_ci reuse_port=None, 12807db96d56Sopenharmony_ci allow_broadcast=None, sock=None): 12817db96d56Sopenharmony_ci """Create datagram connection.""" 12827db96d56Sopenharmony_ci if sock is not None: 12837db96d56Sopenharmony_ci if sock.type != socket.SOCK_DGRAM: 12847db96d56Sopenharmony_ci raise ValueError( 12857db96d56Sopenharmony_ci f'A UDP Socket was expected, got {sock!r}') 12867db96d56Sopenharmony_ci if (local_addr or remote_addr or 12877db96d56Sopenharmony_ci family or proto or flags or 12887db96d56Sopenharmony_ci reuse_port or allow_broadcast): 12897db96d56Sopenharmony_ci # show the problematic kwargs in exception msg 12907db96d56Sopenharmony_ci opts = dict(local_addr=local_addr, remote_addr=remote_addr, 12917db96d56Sopenharmony_ci family=family, proto=proto, flags=flags, 12927db96d56Sopenharmony_ci reuse_port=reuse_port, 12937db96d56Sopenharmony_ci allow_broadcast=allow_broadcast) 12947db96d56Sopenharmony_ci problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 12957db96d56Sopenharmony_ci raise ValueError( 12967db96d56Sopenharmony_ci f'socket modifier keyword arguments can not be used ' 12977db96d56Sopenharmony_ci f'when sock is specified. ({problems})') 12987db96d56Sopenharmony_ci sock.setblocking(False) 12997db96d56Sopenharmony_ci r_addr = None 13007db96d56Sopenharmony_ci else: 13017db96d56Sopenharmony_ci if not (local_addr or remote_addr): 13027db96d56Sopenharmony_ci if family == 0: 13037db96d56Sopenharmony_ci raise ValueError('unexpected address family') 13047db96d56Sopenharmony_ci addr_pairs_info = (((family, proto), (None, None)),) 13057db96d56Sopenharmony_ci elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 13067db96d56Sopenharmony_ci for addr in (local_addr, remote_addr): 13077db96d56Sopenharmony_ci if addr is not None and not isinstance(addr, str): 13087db96d56Sopenharmony_ci raise TypeError('string is expected') 13097db96d56Sopenharmony_ci 13107db96d56Sopenharmony_ci if local_addr and local_addr[0] not in (0, '\x00'): 13117db96d56Sopenharmony_ci try: 13127db96d56Sopenharmony_ci if stat.S_ISSOCK(os.stat(local_addr).st_mode): 13137db96d56Sopenharmony_ci os.remove(local_addr) 13147db96d56Sopenharmony_ci except FileNotFoundError: 13157db96d56Sopenharmony_ci pass 13167db96d56Sopenharmony_ci except OSError as err: 13177db96d56Sopenharmony_ci # Directory may have permissions only to create socket. 13187db96d56Sopenharmony_ci logger.error('Unable to check or remove stale UNIX ' 13197db96d56Sopenharmony_ci 'socket %r: %r', 13207db96d56Sopenharmony_ci local_addr, err) 13217db96d56Sopenharmony_ci 13227db96d56Sopenharmony_ci addr_pairs_info = (((family, proto), 13237db96d56Sopenharmony_ci (local_addr, remote_addr)), ) 13247db96d56Sopenharmony_ci else: 13257db96d56Sopenharmony_ci # join address by (family, protocol) 13267db96d56Sopenharmony_ci addr_infos = {} # Using order preserving dict 13277db96d56Sopenharmony_ci for idx, addr in ((0, local_addr), (1, remote_addr)): 13287db96d56Sopenharmony_ci if addr is not None: 13297db96d56Sopenharmony_ci if not (isinstance(addr, tuple) and len(addr) == 2): 13307db96d56Sopenharmony_ci raise TypeError('2-tuple is expected') 13317db96d56Sopenharmony_ci 13327db96d56Sopenharmony_ci infos = await self._ensure_resolved( 13337db96d56Sopenharmony_ci addr, family=family, type=socket.SOCK_DGRAM, 13347db96d56Sopenharmony_ci proto=proto, flags=flags, loop=self) 13357db96d56Sopenharmony_ci if not infos: 13367db96d56Sopenharmony_ci raise OSError('getaddrinfo() returned empty list') 13377db96d56Sopenharmony_ci 13387db96d56Sopenharmony_ci for fam, _, pro, _, address in infos: 13397db96d56Sopenharmony_ci key = (fam, pro) 13407db96d56Sopenharmony_ci if key not in addr_infos: 13417db96d56Sopenharmony_ci addr_infos[key] = [None, None] 13427db96d56Sopenharmony_ci addr_infos[key][idx] = address 13437db96d56Sopenharmony_ci 13447db96d56Sopenharmony_ci # each addr has to have info for each (family, proto) pair 13457db96d56Sopenharmony_ci addr_pairs_info = [ 13467db96d56Sopenharmony_ci (key, addr_pair) for key, addr_pair in addr_infos.items() 13477db96d56Sopenharmony_ci if not ((local_addr and addr_pair[0] is None) or 13487db96d56Sopenharmony_ci (remote_addr and addr_pair[1] is None))] 13497db96d56Sopenharmony_ci 13507db96d56Sopenharmony_ci if not addr_pairs_info: 13517db96d56Sopenharmony_ci raise ValueError('can not get address information') 13527db96d56Sopenharmony_ci 13537db96d56Sopenharmony_ci exceptions = [] 13547db96d56Sopenharmony_ci 13557db96d56Sopenharmony_ci for ((family, proto), 13567db96d56Sopenharmony_ci (local_address, remote_address)) in addr_pairs_info: 13577db96d56Sopenharmony_ci sock = None 13587db96d56Sopenharmony_ci r_addr = None 13597db96d56Sopenharmony_ci try: 13607db96d56Sopenharmony_ci sock = socket.socket( 13617db96d56Sopenharmony_ci family=family, type=socket.SOCK_DGRAM, proto=proto) 13627db96d56Sopenharmony_ci if reuse_port: 13637db96d56Sopenharmony_ci _set_reuseport(sock) 13647db96d56Sopenharmony_ci if allow_broadcast: 13657db96d56Sopenharmony_ci sock.setsockopt( 13667db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 13677db96d56Sopenharmony_ci sock.setblocking(False) 13687db96d56Sopenharmony_ci 13697db96d56Sopenharmony_ci if local_addr: 13707db96d56Sopenharmony_ci sock.bind(local_address) 13717db96d56Sopenharmony_ci if remote_addr: 13727db96d56Sopenharmony_ci if not allow_broadcast: 13737db96d56Sopenharmony_ci await self.sock_connect(sock, remote_address) 13747db96d56Sopenharmony_ci r_addr = remote_address 13757db96d56Sopenharmony_ci except OSError as exc: 13767db96d56Sopenharmony_ci if sock is not None: 13777db96d56Sopenharmony_ci sock.close() 13787db96d56Sopenharmony_ci exceptions.append(exc) 13797db96d56Sopenharmony_ci except: 13807db96d56Sopenharmony_ci if sock is not None: 13817db96d56Sopenharmony_ci sock.close() 13827db96d56Sopenharmony_ci raise 13837db96d56Sopenharmony_ci else: 13847db96d56Sopenharmony_ci break 13857db96d56Sopenharmony_ci else: 13867db96d56Sopenharmony_ci raise exceptions[0] 13877db96d56Sopenharmony_ci 13887db96d56Sopenharmony_ci protocol = protocol_factory() 13897db96d56Sopenharmony_ci waiter = self.create_future() 13907db96d56Sopenharmony_ci transport = self._make_datagram_transport( 13917db96d56Sopenharmony_ci sock, protocol, r_addr, waiter) 13927db96d56Sopenharmony_ci if self._debug: 13937db96d56Sopenharmony_ci if local_addr: 13947db96d56Sopenharmony_ci logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 13957db96d56Sopenharmony_ci "created: (%r, %r)", 13967db96d56Sopenharmony_ci local_addr, remote_addr, transport, protocol) 13977db96d56Sopenharmony_ci else: 13987db96d56Sopenharmony_ci logger.debug("Datagram endpoint remote_addr=%r created: " 13997db96d56Sopenharmony_ci "(%r, %r)", 14007db96d56Sopenharmony_ci remote_addr, transport, protocol) 14017db96d56Sopenharmony_ci 14027db96d56Sopenharmony_ci try: 14037db96d56Sopenharmony_ci await waiter 14047db96d56Sopenharmony_ci except: 14057db96d56Sopenharmony_ci transport.close() 14067db96d56Sopenharmony_ci raise 14077db96d56Sopenharmony_ci 14087db96d56Sopenharmony_ci return transport, protocol 14097db96d56Sopenharmony_ci 14107db96d56Sopenharmony_ci async def _ensure_resolved(self, address, *, 14117db96d56Sopenharmony_ci family=0, type=socket.SOCK_STREAM, 14127db96d56Sopenharmony_ci proto=0, flags=0, loop): 14137db96d56Sopenharmony_ci host, port = address[:2] 14147db96d56Sopenharmony_ci info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 14157db96d56Sopenharmony_ci if info is not None: 14167db96d56Sopenharmony_ci # "host" is already a resolved IP. 14177db96d56Sopenharmony_ci return [info] 14187db96d56Sopenharmony_ci else: 14197db96d56Sopenharmony_ci return await loop.getaddrinfo(host, port, family=family, type=type, 14207db96d56Sopenharmony_ci proto=proto, flags=flags) 14217db96d56Sopenharmony_ci 14227db96d56Sopenharmony_ci async def _create_server_getaddrinfo(self, host, port, family, flags): 14237db96d56Sopenharmony_ci infos = await self._ensure_resolved((host, port), family=family, 14247db96d56Sopenharmony_ci type=socket.SOCK_STREAM, 14257db96d56Sopenharmony_ci flags=flags, loop=self) 14267db96d56Sopenharmony_ci if not infos: 14277db96d56Sopenharmony_ci raise OSError(f'getaddrinfo({host!r}) returned empty list') 14287db96d56Sopenharmony_ci return infos 14297db96d56Sopenharmony_ci 14307db96d56Sopenharmony_ci async def create_server( 14317db96d56Sopenharmony_ci self, protocol_factory, host=None, port=None, 14327db96d56Sopenharmony_ci *, 14337db96d56Sopenharmony_ci family=socket.AF_UNSPEC, 14347db96d56Sopenharmony_ci flags=socket.AI_PASSIVE, 14357db96d56Sopenharmony_ci sock=None, 14367db96d56Sopenharmony_ci backlog=100, 14377db96d56Sopenharmony_ci ssl=None, 14387db96d56Sopenharmony_ci reuse_address=None, 14397db96d56Sopenharmony_ci reuse_port=None, 14407db96d56Sopenharmony_ci ssl_handshake_timeout=None, 14417db96d56Sopenharmony_ci ssl_shutdown_timeout=None, 14427db96d56Sopenharmony_ci start_serving=True): 14437db96d56Sopenharmony_ci """Create a TCP server. 14447db96d56Sopenharmony_ci 14457db96d56Sopenharmony_ci The host parameter can be a string, in that case the TCP server is 14467db96d56Sopenharmony_ci bound to host and port. 14477db96d56Sopenharmony_ci 14487db96d56Sopenharmony_ci The host parameter can also be a sequence of strings and in that case 14497db96d56Sopenharmony_ci the TCP server is bound to all hosts of the sequence. If a host 14507db96d56Sopenharmony_ci appears multiple times (possibly indirectly e.g. when hostnames 14517db96d56Sopenharmony_ci resolve to the same IP address), the server is only bound once to that 14527db96d56Sopenharmony_ci host. 14537db96d56Sopenharmony_ci 14547db96d56Sopenharmony_ci Return a Server object which can be used to stop the service. 14557db96d56Sopenharmony_ci 14567db96d56Sopenharmony_ci This method is a coroutine. 14577db96d56Sopenharmony_ci """ 14587db96d56Sopenharmony_ci if isinstance(ssl, bool): 14597db96d56Sopenharmony_ci raise TypeError('ssl argument must be an SSLContext or None') 14607db96d56Sopenharmony_ci 14617db96d56Sopenharmony_ci if ssl_handshake_timeout is not None and ssl is None: 14627db96d56Sopenharmony_ci raise ValueError( 14637db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl') 14647db96d56Sopenharmony_ci 14657db96d56Sopenharmony_ci if ssl_shutdown_timeout is not None and ssl is None: 14667db96d56Sopenharmony_ci raise ValueError( 14677db96d56Sopenharmony_ci 'ssl_shutdown_timeout is only meaningful with ssl') 14687db96d56Sopenharmony_ci 14697db96d56Sopenharmony_ci if sock is not None: 14707db96d56Sopenharmony_ci _check_ssl_socket(sock) 14717db96d56Sopenharmony_ci 14727db96d56Sopenharmony_ci if host is not None or port is not None: 14737db96d56Sopenharmony_ci if sock is not None: 14747db96d56Sopenharmony_ci raise ValueError( 14757db96d56Sopenharmony_ci 'host/port and sock can not be specified at the same time') 14767db96d56Sopenharmony_ci 14777db96d56Sopenharmony_ci if reuse_address is None: 14787db96d56Sopenharmony_ci reuse_address = os.name == "posix" and sys.platform != "cygwin" 14797db96d56Sopenharmony_ci sockets = [] 14807db96d56Sopenharmony_ci if host == '': 14817db96d56Sopenharmony_ci hosts = [None] 14827db96d56Sopenharmony_ci elif (isinstance(host, str) or 14837db96d56Sopenharmony_ci not isinstance(host, collections.abc.Iterable)): 14847db96d56Sopenharmony_ci hosts = [host] 14857db96d56Sopenharmony_ci else: 14867db96d56Sopenharmony_ci hosts = host 14877db96d56Sopenharmony_ci 14887db96d56Sopenharmony_ci fs = [self._create_server_getaddrinfo(host, port, family=family, 14897db96d56Sopenharmony_ci flags=flags) 14907db96d56Sopenharmony_ci for host in hosts] 14917db96d56Sopenharmony_ci infos = await tasks.gather(*fs) 14927db96d56Sopenharmony_ci infos = set(itertools.chain.from_iterable(infos)) 14937db96d56Sopenharmony_ci 14947db96d56Sopenharmony_ci completed = False 14957db96d56Sopenharmony_ci try: 14967db96d56Sopenharmony_ci for res in infos: 14977db96d56Sopenharmony_ci af, socktype, proto, canonname, sa = res 14987db96d56Sopenharmony_ci try: 14997db96d56Sopenharmony_ci sock = socket.socket(af, socktype, proto) 15007db96d56Sopenharmony_ci except socket.error: 15017db96d56Sopenharmony_ci # Assume it's a bad family/type/protocol combination. 15027db96d56Sopenharmony_ci if self._debug: 15037db96d56Sopenharmony_ci logger.warning('create_server() failed to create ' 15047db96d56Sopenharmony_ci 'socket.socket(%r, %r, %r)', 15057db96d56Sopenharmony_ci af, socktype, proto, exc_info=True) 15067db96d56Sopenharmony_ci continue 15077db96d56Sopenharmony_ci sockets.append(sock) 15087db96d56Sopenharmony_ci if reuse_address: 15097db96d56Sopenharmony_ci sock.setsockopt( 15107db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 15117db96d56Sopenharmony_ci if reuse_port: 15127db96d56Sopenharmony_ci _set_reuseport(sock) 15137db96d56Sopenharmony_ci # Disable IPv4/IPv6 dual stack support (enabled by 15147db96d56Sopenharmony_ci # default on Linux) which makes a single socket 15157db96d56Sopenharmony_ci # listen on both address families. 15167db96d56Sopenharmony_ci if (_HAS_IPv6 and 15177db96d56Sopenharmony_ci af == socket.AF_INET6 and 15187db96d56Sopenharmony_ci hasattr(socket, 'IPPROTO_IPV6')): 15197db96d56Sopenharmony_ci sock.setsockopt(socket.IPPROTO_IPV6, 15207db96d56Sopenharmony_ci socket.IPV6_V6ONLY, 15217db96d56Sopenharmony_ci True) 15227db96d56Sopenharmony_ci try: 15237db96d56Sopenharmony_ci sock.bind(sa) 15247db96d56Sopenharmony_ci except OSError as err: 15257db96d56Sopenharmony_ci raise OSError(err.errno, 'error while attempting ' 15267db96d56Sopenharmony_ci 'to bind on address %r: %s' 15277db96d56Sopenharmony_ci % (sa, err.strerror.lower())) from None 15287db96d56Sopenharmony_ci completed = True 15297db96d56Sopenharmony_ci finally: 15307db96d56Sopenharmony_ci if not completed: 15317db96d56Sopenharmony_ci for sock in sockets: 15327db96d56Sopenharmony_ci sock.close() 15337db96d56Sopenharmony_ci else: 15347db96d56Sopenharmony_ci if sock is None: 15357db96d56Sopenharmony_ci raise ValueError('Neither host/port nor sock were specified') 15367db96d56Sopenharmony_ci if sock.type != socket.SOCK_STREAM: 15377db96d56Sopenharmony_ci raise ValueError(f'A Stream Socket was expected, got {sock!r}') 15387db96d56Sopenharmony_ci sockets = [sock] 15397db96d56Sopenharmony_ci 15407db96d56Sopenharmony_ci for sock in sockets: 15417db96d56Sopenharmony_ci sock.setblocking(False) 15427db96d56Sopenharmony_ci 15437db96d56Sopenharmony_ci server = Server(self, sockets, protocol_factory, 15447db96d56Sopenharmony_ci ssl, backlog, ssl_handshake_timeout, 15457db96d56Sopenharmony_ci ssl_shutdown_timeout) 15467db96d56Sopenharmony_ci if start_serving: 15477db96d56Sopenharmony_ci server._start_serving() 15487db96d56Sopenharmony_ci # Skip one loop iteration so that all 'loop.add_reader' 15497db96d56Sopenharmony_ci # go through. 15507db96d56Sopenharmony_ci await tasks.sleep(0) 15517db96d56Sopenharmony_ci 15527db96d56Sopenharmony_ci if self._debug: 15537db96d56Sopenharmony_ci logger.info("%r is serving", server) 15547db96d56Sopenharmony_ci return server 15557db96d56Sopenharmony_ci 15567db96d56Sopenharmony_ci async def connect_accepted_socket( 15577db96d56Sopenharmony_ci self, protocol_factory, sock, 15587db96d56Sopenharmony_ci *, ssl=None, 15597db96d56Sopenharmony_ci ssl_handshake_timeout=None, 15607db96d56Sopenharmony_ci ssl_shutdown_timeout=None): 15617db96d56Sopenharmony_ci if sock.type != socket.SOCK_STREAM: 15627db96d56Sopenharmony_ci raise ValueError(f'A Stream Socket was expected, got {sock!r}') 15637db96d56Sopenharmony_ci 15647db96d56Sopenharmony_ci if ssl_handshake_timeout is not None and not ssl: 15657db96d56Sopenharmony_ci raise ValueError( 15667db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl') 15677db96d56Sopenharmony_ci 15687db96d56Sopenharmony_ci if ssl_shutdown_timeout is not None and not ssl: 15697db96d56Sopenharmony_ci raise ValueError( 15707db96d56Sopenharmony_ci 'ssl_shutdown_timeout is only meaningful with ssl') 15717db96d56Sopenharmony_ci 15727db96d56Sopenharmony_ci if sock is not None: 15737db96d56Sopenharmony_ci _check_ssl_socket(sock) 15747db96d56Sopenharmony_ci 15757db96d56Sopenharmony_ci transport, protocol = await self._create_connection_transport( 15767db96d56Sopenharmony_ci sock, protocol_factory, ssl, '', server_side=True, 15777db96d56Sopenharmony_ci ssl_handshake_timeout=ssl_handshake_timeout, 15787db96d56Sopenharmony_ci ssl_shutdown_timeout=ssl_shutdown_timeout) 15797db96d56Sopenharmony_ci if self._debug: 15807db96d56Sopenharmony_ci # Get the socket from the transport because SSL transport closes 15817db96d56Sopenharmony_ci # the old socket and creates a new SSL socket 15827db96d56Sopenharmony_ci sock = transport.get_extra_info('socket') 15837db96d56Sopenharmony_ci logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 15847db96d56Sopenharmony_ci return transport, protocol 15857db96d56Sopenharmony_ci 15867db96d56Sopenharmony_ci async def connect_read_pipe(self, protocol_factory, pipe): 15877db96d56Sopenharmony_ci protocol = protocol_factory() 15887db96d56Sopenharmony_ci waiter = self.create_future() 15897db96d56Sopenharmony_ci transport = self._make_read_pipe_transport(pipe, protocol, waiter) 15907db96d56Sopenharmony_ci 15917db96d56Sopenharmony_ci try: 15927db96d56Sopenharmony_ci await waiter 15937db96d56Sopenharmony_ci except: 15947db96d56Sopenharmony_ci transport.close() 15957db96d56Sopenharmony_ci raise 15967db96d56Sopenharmony_ci 15977db96d56Sopenharmony_ci if self._debug: 15987db96d56Sopenharmony_ci logger.debug('Read pipe %r connected: (%r, %r)', 15997db96d56Sopenharmony_ci pipe.fileno(), transport, protocol) 16007db96d56Sopenharmony_ci return transport, protocol 16017db96d56Sopenharmony_ci 16027db96d56Sopenharmony_ci async def connect_write_pipe(self, protocol_factory, pipe): 16037db96d56Sopenharmony_ci protocol = protocol_factory() 16047db96d56Sopenharmony_ci waiter = self.create_future() 16057db96d56Sopenharmony_ci transport = self._make_write_pipe_transport(pipe, protocol, waiter) 16067db96d56Sopenharmony_ci 16077db96d56Sopenharmony_ci try: 16087db96d56Sopenharmony_ci await waiter 16097db96d56Sopenharmony_ci except: 16107db96d56Sopenharmony_ci transport.close() 16117db96d56Sopenharmony_ci raise 16127db96d56Sopenharmony_ci 16137db96d56Sopenharmony_ci if self._debug: 16147db96d56Sopenharmony_ci logger.debug('Write pipe %r connected: (%r, %r)', 16157db96d56Sopenharmony_ci pipe.fileno(), transport, protocol) 16167db96d56Sopenharmony_ci return transport, protocol 16177db96d56Sopenharmony_ci 16187db96d56Sopenharmony_ci def _log_subprocess(self, msg, stdin, stdout, stderr): 16197db96d56Sopenharmony_ci info = [msg] 16207db96d56Sopenharmony_ci if stdin is not None: 16217db96d56Sopenharmony_ci info.append(f'stdin={_format_pipe(stdin)}') 16227db96d56Sopenharmony_ci if stdout is not None and stderr == subprocess.STDOUT: 16237db96d56Sopenharmony_ci info.append(f'stdout=stderr={_format_pipe(stdout)}') 16247db96d56Sopenharmony_ci else: 16257db96d56Sopenharmony_ci if stdout is not None: 16267db96d56Sopenharmony_ci info.append(f'stdout={_format_pipe(stdout)}') 16277db96d56Sopenharmony_ci if stderr is not None: 16287db96d56Sopenharmony_ci info.append(f'stderr={_format_pipe(stderr)}') 16297db96d56Sopenharmony_ci logger.debug(' '.join(info)) 16307db96d56Sopenharmony_ci 16317db96d56Sopenharmony_ci async def subprocess_shell(self, protocol_factory, cmd, *, 16327db96d56Sopenharmony_ci stdin=subprocess.PIPE, 16337db96d56Sopenharmony_ci stdout=subprocess.PIPE, 16347db96d56Sopenharmony_ci stderr=subprocess.PIPE, 16357db96d56Sopenharmony_ci universal_newlines=False, 16367db96d56Sopenharmony_ci shell=True, bufsize=0, 16377db96d56Sopenharmony_ci encoding=None, errors=None, text=None, 16387db96d56Sopenharmony_ci **kwargs): 16397db96d56Sopenharmony_ci if not isinstance(cmd, (bytes, str)): 16407db96d56Sopenharmony_ci raise ValueError("cmd must be a string") 16417db96d56Sopenharmony_ci if universal_newlines: 16427db96d56Sopenharmony_ci raise ValueError("universal_newlines must be False") 16437db96d56Sopenharmony_ci if not shell: 16447db96d56Sopenharmony_ci raise ValueError("shell must be True") 16457db96d56Sopenharmony_ci if bufsize != 0: 16467db96d56Sopenharmony_ci raise ValueError("bufsize must be 0") 16477db96d56Sopenharmony_ci if text: 16487db96d56Sopenharmony_ci raise ValueError("text must be False") 16497db96d56Sopenharmony_ci if encoding is not None: 16507db96d56Sopenharmony_ci raise ValueError("encoding must be None") 16517db96d56Sopenharmony_ci if errors is not None: 16527db96d56Sopenharmony_ci raise ValueError("errors must be None") 16537db96d56Sopenharmony_ci 16547db96d56Sopenharmony_ci protocol = protocol_factory() 16557db96d56Sopenharmony_ci debug_log = None 16567db96d56Sopenharmony_ci if self._debug: 16577db96d56Sopenharmony_ci # don't log parameters: they may contain sensitive information 16587db96d56Sopenharmony_ci # (password) and may be too long 16597db96d56Sopenharmony_ci debug_log = 'run shell command %r' % cmd 16607db96d56Sopenharmony_ci self._log_subprocess(debug_log, stdin, stdout, stderr) 16617db96d56Sopenharmony_ci transport = await self._make_subprocess_transport( 16627db96d56Sopenharmony_ci protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 16637db96d56Sopenharmony_ci if self._debug and debug_log is not None: 16647db96d56Sopenharmony_ci logger.info('%s: %r', debug_log, transport) 16657db96d56Sopenharmony_ci return transport, protocol 16667db96d56Sopenharmony_ci 16677db96d56Sopenharmony_ci async def subprocess_exec(self, protocol_factory, program, *args, 16687db96d56Sopenharmony_ci stdin=subprocess.PIPE, stdout=subprocess.PIPE, 16697db96d56Sopenharmony_ci stderr=subprocess.PIPE, universal_newlines=False, 16707db96d56Sopenharmony_ci shell=False, bufsize=0, 16717db96d56Sopenharmony_ci encoding=None, errors=None, text=None, 16727db96d56Sopenharmony_ci **kwargs): 16737db96d56Sopenharmony_ci if universal_newlines: 16747db96d56Sopenharmony_ci raise ValueError("universal_newlines must be False") 16757db96d56Sopenharmony_ci if shell: 16767db96d56Sopenharmony_ci raise ValueError("shell must be False") 16777db96d56Sopenharmony_ci if bufsize != 0: 16787db96d56Sopenharmony_ci raise ValueError("bufsize must be 0") 16797db96d56Sopenharmony_ci if text: 16807db96d56Sopenharmony_ci raise ValueError("text must be False") 16817db96d56Sopenharmony_ci if encoding is not None: 16827db96d56Sopenharmony_ci raise ValueError("encoding must be None") 16837db96d56Sopenharmony_ci if errors is not None: 16847db96d56Sopenharmony_ci raise ValueError("errors must be None") 16857db96d56Sopenharmony_ci 16867db96d56Sopenharmony_ci popen_args = (program,) + args 16877db96d56Sopenharmony_ci protocol = protocol_factory() 16887db96d56Sopenharmony_ci debug_log = None 16897db96d56Sopenharmony_ci if self._debug: 16907db96d56Sopenharmony_ci # don't log parameters: they may contain sensitive information 16917db96d56Sopenharmony_ci # (password) and may be too long 16927db96d56Sopenharmony_ci debug_log = f'execute program {program!r}' 16937db96d56Sopenharmony_ci self._log_subprocess(debug_log, stdin, stdout, stderr) 16947db96d56Sopenharmony_ci transport = await self._make_subprocess_transport( 16957db96d56Sopenharmony_ci protocol, popen_args, False, stdin, stdout, stderr, 16967db96d56Sopenharmony_ci bufsize, **kwargs) 16977db96d56Sopenharmony_ci if self._debug and debug_log is not None: 16987db96d56Sopenharmony_ci logger.info('%s: %r', debug_log, transport) 16997db96d56Sopenharmony_ci return transport, protocol 17007db96d56Sopenharmony_ci 17017db96d56Sopenharmony_ci def get_exception_handler(self): 17027db96d56Sopenharmony_ci """Return an exception handler, or None if the default one is in use. 17037db96d56Sopenharmony_ci """ 17047db96d56Sopenharmony_ci return self._exception_handler 17057db96d56Sopenharmony_ci 17067db96d56Sopenharmony_ci def set_exception_handler(self, handler): 17077db96d56Sopenharmony_ci """Set handler as the new event loop exception handler. 17087db96d56Sopenharmony_ci 17097db96d56Sopenharmony_ci If handler is None, the default exception handler will 17107db96d56Sopenharmony_ci be set. 17117db96d56Sopenharmony_ci 17127db96d56Sopenharmony_ci If handler is a callable object, it should have a 17137db96d56Sopenharmony_ci signature matching '(loop, context)', where 'loop' 17147db96d56Sopenharmony_ci will be a reference to the active event loop, 'context' 17157db96d56Sopenharmony_ci will be a dict object (see `call_exception_handler()` 17167db96d56Sopenharmony_ci documentation for details about context). 17177db96d56Sopenharmony_ci """ 17187db96d56Sopenharmony_ci if handler is not None and not callable(handler): 17197db96d56Sopenharmony_ci raise TypeError(f'A callable object or None is expected, ' 17207db96d56Sopenharmony_ci f'got {handler!r}') 17217db96d56Sopenharmony_ci self._exception_handler = handler 17227db96d56Sopenharmony_ci 17237db96d56Sopenharmony_ci def default_exception_handler(self, context): 17247db96d56Sopenharmony_ci """Default exception handler. 17257db96d56Sopenharmony_ci 17267db96d56Sopenharmony_ci This is called when an exception occurs and no exception 17277db96d56Sopenharmony_ci handler is set, and can be called by a custom exception 17287db96d56Sopenharmony_ci handler that wants to defer to the default behavior. 17297db96d56Sopenharmony_ci 17307db96d56Sopenharmony_ci This default handler logs the error message and other 17317db96d56Sopenharmony_ci context-dependent information. In debug mode, a truncated 17327db96d56Sopenharmony_ci stack trace is also appended showing where the given object 17337db96d56Sopenharmony_ci (e.g. a handle or future or task) was created, if any. 17347db96d56Sopenharmony_ci 17357db96d56Sopenharmony_ci The context parameter has the same meaning as in 17367db96d56Sopenharmony_ci `call_exception_handler()`. 17377db96d56Sopenharmony_ci """ 17387db96d56Sopenharmony_ci message = context.get('message') 17397db96d56Sopenharmony_ci if not message: 17407db96d56Sopenharmony_ci message = 'Unhandled exception in event loop' 17417db96d56Sopenharmony_ci 17427db96d56Sopenharmony_ci exception = context.get('exception') 17437db96d56Sopenharmony_ci if exception is not None: 17447db96d56Sopenharmony_ci exc_info = (type(exception), exception, exception.__traceback__) 17457db96d56Sopenharmony_ci else: 17467db96d56Sopenharmony_ci exc_info = False 17477db96d56Sopenharmony_ci 17487db96d56Sopenharmony_ci if ('source_traceback' not in context and 17497db96d56Sopenharmony_ci self._current_handle is not None and 17507db96d56Sopenharmony_ci self._current_handle._source_traceback): 17517db96d56Sopenharmony_ci context['handle_traceback'] = \ 17527db96d56Sopenharmony_ci self._current_handle._source_traceback 17537db96d56Sopenharmony_ci 17547db96d56Sopenharmony_ci log_lines = [message] 17557db96d56Sopenharmony_ci for key in sorted(context): 17567db96d56Sopenharmony_ci if key in {'message', 'exception'}: 17577db96d56Sopenharmony_ci continue 17587db96d56Sopenharmony_ci value = context[key] 17597db96d56Sopenharmony_ci if key == 'source_traceback': 17607db96d56Sopenharmony_ci tb = ''.join(traceback.format_list(value)) 17617db96d56Sopenharmony_ci value = 'Object created at (most recent call last):\n' 17627db96d56Sopenharmony_ci value += tb.rstrip() 17637db96d56Sopenharmony_ci elif key == 'handle_traceback': 17647db96d56Sopenharmony_ci tb = ''.join(traceback.format_list(value)) 17657db96d56Sopenharmony_ci value = 'Handle created at (most recent call last):\n' 17667db96d56Sopenharmony_ci value += tb.rstrip() 17677db96d56Sopenharmony_ci else: 17687db96d56Sopenharmony_ci value = repr(value) 17697db96d56Sopenharmony_ci log_lines.append(f'{key}: {value}') 17707db96d56Sopenharmony_ci 17717db96d56Sopenharmony_ci logger.error('\n'.join(log_lines), exc_info=exc_info) 17727db96d56Sopenharmony_ci 17737db96d56Sopenharmony_ci def call_exception_handler(self, context): 17747db96d56Sopenharmony_ci """Call the current event loop's exception handler. 17757db96d56Sopenharmony_ci 17767db96d56Sopenharmony_ci The context argument is a dict containing the following keys: 17777db96d56Sopenharmony_ci 17787db96d56Sopenharmony_ci - 'message': Error message; 17797db96d56Sopenharmony_ci - 'exception' (optional): Exception object; 17807db96d56Sopenharmony_ci - 'future' (optional): Future instance; 17817db96d56Sopenharmony_ci - 'task' (optional): Task instance; 17827db96d56Sopenharmony_ci - 'handle' (optional): Handle instance; 17837db96d56Sopenharmony_ci - 'protocol' (optional): Protocol instance; 17847db96d56Sopenharmony_ci - 'transport' (optional): Transport instance; 17857db96d56Sopenharmony_ci - 'socket' (optional): Socket instance; 17867db96d56Sopenharmony_ci - 'asyncgen' (optional): Asynchronous generator that caused 17877db96d56Sopenharmony_ci the exception. 17887db96d56Sopenharmony_ci 17897db96d56Sopenharmony_ci New keys maybe introduced in the future. 17907db96d56Sopenharmony_ci 17917db96d56Sopenharmony_ci Note: do not overload this method in an event loop subclass. 17927db96d56Sopenharmony_ci For custom exception handling, use the 17937db96d56Sopenharmony_ci `set_exception_handler()` method. 17947db96d56Sopenharmony_ci """ 17957db96d56Sopenharmony_ci if self._exception_handler is None: 17967db96d56Sopenharmony_ci try: 17977db96d56Sopenharmony_ci self.default_exception_handler(context) 17987db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 17997db96d56Sopenharmony_ci raise 18007db96d56Sopenharmony_ci except BaseException: 18017db96d56Sopenharmony_ci # Second protection layer for unexpected errors 18027db96d56Sopenharmony_ci # in the default implementation, as well as for subclassed 18037db96d56Sopenharmony_ci # event loops with overloaded "default_exception_handler". 18047db96d56Sopenharmony_ci logger.error('Exception in default exception handler', 18057db96d56Sopenharmony_ci exc_info=True) 18067db96d56Sopenharmony_ci else: 18077db96d56Sopenharmony_ci try: 18087db96d56Sopenharmony_ci self._exception_handler(self, context) 18097db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 18107db96d56Sopenharmony_ci raise 18117db96d56Sopenharmony_ci except BaseException as exc: 18127db96d56Sopenharmony_ci # Exception in the user set custom exception handler. 18137db96d56Sopenharmony_ci try: 18147db96d56Sopenharmony_ci # Let's try default handler. 18157db96d56Sopenharmony_ci self.default_exception_handler({ 18167db96d56Sopenharmony_ci 'message': 'Unhandled error in exception handler', 18177db96d56Sopenharmony_ci 'exception': exc, 18187db96d56Sopenharmony_ci 'context': context, 18197db96d56Sopenharmony_ci }) 18207db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 18217db96d56Sopenharmony_ci raise 18227db96d56Sopenharmony_ci except BaseException: 18237db96d56Sopenharmony_ci # Guard 'default_exception_handler' in case it is 18247db96d56Sopenharmony_ci # overloaded. 18257db96d56Sopenharmony_ci logger.error('Exception in default exception handler ' 18267db96d56Sopenharmony_ci 'while handling an unexpected error ' 18277db96d56Sopenharmony_ci 'in custom exception handler', 18287db96d56Sopenharmony_ci exc_info=True) 18297db96d56Sopenharmony_ci 18307db96d56Sopenharmony_ci def _add_callback(self, handle): 18317db96d56Sopenharmony_ci """Add a Handle to _ready.""" 18327db96d56Sopenharmony_ci if not handle._cancelled: 18337db96d56Sopenharmony_ci self._ready.append(handle) 18347db96d56Sopenharmony_ci 18357db96d56Sopenharmony_ci def _add_callback_signalsafe(self, handle): 18367db96d56Sopenharmony_ci """Like _add_callback() but called from a signal handler.""" 18377db96d56Sopenharmony_ci self._add_callback(handle) 18387db96d56Sopenharmony_ci self._write_to_self() 18397db96d56Sopenharmony_ci 18407db96d56Sopenharmony_ci def _timer_handle_cancelled(self, handle): 18417db96d56Sopenharmony_ci """Notification that a TimerHandle has been cancelled.""" 18427db96d56Sopenharmony_ci if handle._scheduled: 18437db96d56Sopenharmony_ci self._timer_cancelled_count += 1 18447db96d56Sopenharmony_ci 18457db96d56Sopenharmony_ci def _run_once(self): 18467db96d56Sopenharmony_ci """Run one full iteration of the event loop. 18477db96d56Sopenharmony_ci 18487db96d56Sopenharmony_ci This calls all currently ready callbacks, polls for I/O, 18497db96d56Sopenharmony_ci schedules the resulting callbacks, and finally schedules 18507db96d56Sopenharmony_ci 'call_later' callbacks. 18517db96d56Sopenharmony_ci """ 18527db96d56Sopenharmony_ci 18537db96d56Sopenharmony_ci sched_count = len(self._scheduled) 18547db96d56Sopenharmony_ci if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 18557db96d56Sopenharmony_ci self._timer_cancelled_count / sched_count > 18567db96d56Sopenharmony_ci _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 18577db96d56Sopenharmony_ci # Remove delayed calls that were cancelled if their number 18587db96d56Sopenharmony_ci # is too high 18597db96d56Sopenharmony_ci new_scheduled = [] 18607db96d56Sopenharmony_ci for handle in self._scheduled: 18617db96d56Sopenharmony_ci if handle._cancelled: 18627db96d56Sopenharmony_ci handle._scheduled = False 18637db96d56Sopenharmony_ci else: 18647db96d56Sopenharmony_ci new_scheduled.append(handle) 18657db96d56Sopenharmony_ci 18667db96d56Sopenharmony_ci heapq.heapify(new_scheduled) 18677db96d56Sopenharmony_ci self._scheduled = new_scheduled 18687db96d56Sopenharmony_ci self._timer_cancelled_count = 0 18697db96d56Sopenharmony_ci else: 18707db96d56Sopenharmony_ci # Remove delayed calls that were cancelled from head of queue. 18717db96d56Sopenharmony_ci while self._scheduled and self._scheduled[0]._cancelled: 18727db96d56Sopenharmony_ci self._timer_cancelled_count -= 1 18737db96d56Sopenharmony_ci handle = heapq.heappop(self._scheduled) 18747db96d56Sopenharmony_ci handle._scheduled = False 18757db96d56Sopenharmony_ci 18767db96d56Sopenharmony_ci timeout = None 18777db96d56Sopenharmony_ci if self._ready or self._stopping: 18787db96d56Sopenharmony_ci timeout = 0 18797db96d56Sopenharmony_ci elif self._scheduled: 18807db96d56Sopenharmony_ci # Compute the desired timeout. 18817db96d56Sopenharmony_ci when = self._scheduled[0]._when 18827db96d56Sopenharmony_ci timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) 18837db96d56Sopenharmony_ci 18847db96d56Sopenharmony_ci event_list = self._selector.select(timeout) 18857db96d56Sopenharmony_ci self._process_events(event_list) 18867db96d56Sopenharmony_ci # Needed to break cycles when an exception occurs. 18877db96d56Sopenharmony_ci event_list = None 18887db96d56Sopenharmony_ci 18897db96d56Sopenharmony_ci # Handle 'later' callbacks that are ready. 18907db96d56Sopenharmony_ci end_time = self.time() + self._clock_resolution 18917db96d56Sopenharmony_ci while self._scheduled: 18927db96d56Sopenharmony_ci handle = self._scheduled[0] 18937db96d56Sopenharmony_ci if handle._when >= end_time: 18947db96d56Sopenharmony_ci break 18957db96d56Sopenharmony_ci handle = heapq.heappop(self._scheduled) 18967db96d56Sopenharmony_ci handle._scheduled = False 18977db96d56Sopenharmony_ci self._ready.append(handle) 18987db96d56Sopenharmony_ci 18997db96d56Sopenharmony_ci # This is the only place where callbacks are actually *called*. 19007db96d56Sopenharmony_ci # All other places just add them to ready. 19017db96d56Sopenharmony_ci # Note: We run all currently scheduled callbacks, but not any 19027db96d56Sopenharmony_ci # callbacks scheduled by callbacks run this time around -- 19037db96d56Sopenharmony_ci # they will be run the next time (after another I/O poll). 19047db96d56Sopenharmony_ci # Use an idiom that is thread-safe without using locks. 19057db96d56Sopenharmony_ci ntodo = len(self._ready) 19067db96d56Sopenharmony_ci for i in range(ntodo): 19077db96d56Sopenharmony_ci handle = self._ready.popleft() 19087db96d56Sopenharmony_ci if handle._cancelled: 19097db96d56Sopenharmony_ci continue 19107db96d56Sopenharmony_ci if self._debug: 19117db96d56Sopenharmony_ci try: 19127db96d56Sopenharmony_ci self._current_handle = handle 19137db96d56Sopenharmony_ci t0 = self.time() 19147db96d56Sopenharmony_ci handle._run() 19157db96d56Sopenharmony_ci dt = self.time() - t0 19167db96d56Sopenharmony_ci if dt >= self.slow_callback_duration: 19177db96d56Sopenharmony_ci logger.warning('Executing %s took %.3f seconds', 19187db96d56Sopenharmony_ci _format_handle(handle), dt) 19197db96d56Sopenharmony_ci finally: 19207db96d56Sopenharmony_ci self._current_handle = None 19217db96d56Sopenharmony_ci else: 19227db96d56Sopenharmony_ci handle._run() 19237db96d56Sopenharmony_ci handle = None # Needed to break cycles when an exception occurs. 19247db96d56Sopenharmony_ci 19257db96d56Sopenharmony_ci def _set_coroutine_origin_tracking(self, enabled): 19267db96d56Sopenharmony_ci if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 19277db96d56Sopenharmony_ci return 19287db96d56Sopenharmony_ci 19297db96d56Sopenharmony_ci if enabled: 19307db96d56Sopenharmony_ci self._coroutine_origin_tracking_saved_depth = ( 19317db96d56Sopenharmony_ci sys.get_coroutine_origin_tracking_depth()) 19327db96d56Sopenharmony_ci sys.set_coroutine_origin_tracking_depth( 19337db96d56Sopenharmony_ci constants.DEBUG_STACK_DEPTH) 19347db96d56Sopenharmony_ci else: 19357db96d56Sopenharmony_ci sys.set_coroutine_origin_tracking_depth( 19367db96d56Sopenharmony_ci self._coroutine_origin_tracking_saved_depth) 19377db96d56Sopenharmony_ci 19387db96d56Sopenharmony_ci self._coroutine_origin_tracking_enabled = enabled 19397db96d56Sopenharmony_ci 19407db96d56Sopenharmony_ci def get_debug(self): 19417db96d56Sopenharmony_ci return self._debug 19427db96d56Sopenharmony_ci 19437db96d56Sopenharmony_ci def set_debug(self, enabled): 19447db96d56Sopenharmony_ci self._debug = enabled 19457db96d56Sopenharmony_ci 19467db96d56Sopenharmony_ci if self.is_running(): 19477db96d56Sopenharmony_ci self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) 1948