"""Selector and proactor event loops for Windows."""

import sys

if sys.platform != 'win32':  # pragma: no cover
    raise ImportError('win32 only')

import _overlapped
import _winapi
import errno
import math
import msvcrt
import socket
import struct
import time
import weakref

from . import events
from . import base_subprocess
from . import futures
from . import exceptions
from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
from .log import logger


__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)


NULL = _winapi.NULL
INFINITE = _winapi.INFINITE
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100


class _OverlappedFuture(futures.Future):
    """Subclass of Future which represents an overlapped operation.

    Cancelling it will immediately cancel the overlapped operation.
    """

    def __init__(self, ov, *, loop=None):
        """Wrap the overlapped object *ov* in a future bound to *loop*."""
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost frame (this constructor) from the traceback.
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Return the base repr info plus the overlapped state and address."""
        info = super()._repr_info()
        if self._ov is not None:
            state = 'pending' if self._ov.pending else 'completed'
            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and drop the reference.

        An OSError raised by the cancellation is reported through the loop's
        exception handler instead of being propagated.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self, msg=None):
        """Cancel the overlapped operation, then cancel the future itself."""
        self._cancel_overlapped()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Store *exception*, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Store *result* and release the overlapped object."""
        super().set_result(result)
        self._ov = None


class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        """Record the overlapped object, the handle and its wait handle."""
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled (never blocks)."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Return the base repr info plus handle, state and wait handle."""
        info = super()._repr_info()
        if self._handle is not None:
            # Format the handle only once it is known not to be None: the
            # '#x' format spec raises TypeError on None.
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        """Release the Overlapped object once the wait is unregistered."""
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; does nothing after the first call.

        Failures other than ERROR_IO_PENDING are reported through the loop's
        exception handler and skip the unregister callback.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self, msg=None):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Unregister the wait, then store *exception*."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then store *result*."""
        self._unregister_wait()
        super().set_result(result)
1707db96d56Sopenharmony_ciclass _WaitCancelFuture(_BaseWaitHandleFuture): 1717db96d56Sopenharmony_ci """Subclass of Future which represents a wait for the cancellation of a 1727db96d56Sopenharmony_ci _WaitHandleFuture using an event. 1737db96d56Sopenharmony_ci """ 1747db96d56Sopenharmony_ci 1757db96d56Sopenharmony_ci def __init__(self, ov, event, wait_handle, *, loop=None): 1767db96d56Sopenharmony_ci super().__init__(ov, event, wait_handle, loop=loop) 1777db96d56Sopenharmony_ci 1787db96d56Sopenharmony_ci self._done_callback = None 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci def cancel(self): 1817db96d56Sopenharmony_ci raise RuntimeError("_WaitCancelFuture must not be cancelled") 1827db96d56Sopenharmony_ci 1837db96d56Sopenharmony_ci def set_result(self, result): 1847db96d56Sopenharmony_ci super().set_result(result) 1857db96d56Sopenharmony_ci if self._done_callback is not None: 1867db96d56Sopenharmony_ci self._done_callback(self) 1877db96d56Sopenharmony_ci 1887db96d56Sopenharmony_ci def set_exception(self, exception): 1897db96d56Sopenharmony_ci super().set_exception(exception) 1907db96d56Sopenharmony_ci if self._done_callback is not None: 1917db96d56Sopenharmony_ci self._done_callback(self) 1927db96d56Sopenharmony_ci 1937db96d56Sopenharmony_ci 1947db96d56Sopenharmony_ciclass _WaitHandleFuture(_BaseWaitHandleFuture): 1957db96d56Sopenharmony_ci def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): 1967db96d56Sopenharmony_ci super().__init__(ov, handle, wait_handle, loop=loop) 1977db96d56Sopenharmony_ci self._proactor = proactor 1987db96d56Sopenharmony_ci self._unregister_proactor = True 1997db96d56Sopenharmony_ci self._event = _overlapped.CreateEvent(None, True, False, None) 2007db96d56Sopenharmony_ci self._event_fut = None 2017db96d56Sopenharmony_ci 2027db96d56Sopenharmony_ci def _unregister_wait_cb(self, fut): 2037db96d56Sopenharmony_ci if self._event is not None: 2047db96d56Sopenharmony_ci _winapi.CloseHandle(self._event) 
2057db96d56Sopenharmony_ci self._event = None 2067db96d56Sopenharmony_ci self._event_fut = None 2077db96d56Sopenharmony_ci 2087db96d56Sopenharmony_ci # If the wait was cancelled, the wait may never be signalled, so 2097db96d56Sopenharmony_ci # it's required to unregister it. Otherwise, IocpProactor.close() will 2107db96d56Sopenharmony_ci # wait forever for an event which will never come. 2117db96d56Sopenharmony_ci # 2127db96d56Sopenharmony_ci # If the IocpProactor already received the event, it's safe to call 2137db96d56Sopenharmony_ci # _unregister() because we kept a reference to the Overlapped object 2147db96d56Sopenharmony_ci # which is used as a unique key. 2157db96d56Sopenharmony_ci self._proactor._unregister(self._ov) 2167db96d56Sopenharmony_ci self._proactor = None 2177db96d56Sopenharmony_ci 2187db96d56Sopenharmony_ci super()._unregister_wait_cb(fut) 2197db96d56Sopenharmony_ci 2207db96d56Sopenharmony_ci def _unregister_wait(self): 2217db96d56Sopenharmony_ci if not self._registered: 2227db96d56Sopenharmony_ci return 2237db96d56Sopenharmony_ci self._registered = False 2247db96d56Sopenharmony_ci 2257db96d56Sopenharmony_ci wait_handle = self._wait_handle 2267db96d56Sopenharmony_ci self._wait_handle = None 2277db96d56Sopenharmony_ci try: 2287db96d56Sopenharmony_ci _overlapped.UnregisterWaitEx(wait_handle, self._event) 2297db96d56Sopenharmony_ci except OSError as exc: 2307db96d56Sopenharmony_ci if exc.winerror != _overlapped.ERROR_IO_PENDING: 2317db96d56Sopenharmony_ci context = { 2327db96d56Sopenharmony_ci 'message': 'Failed to unregister the wait handle', 2337db96d56Sopenharmony_ci 'exception': exc, 2347db96d56Sopenharmony_ci 'future': self, 2357db96d56Sopenharmony_ci } 2367db96d56Sopenharmony_ci if self._source_traceback: 2377db96d56Sopenharmony_ci context['source_traceback'] = self._source_traceback 2387db96d56Sopenharmony_ci self._loop.call_exception_handler(context) 2397db96d56Sopenharmony_ci return 2407db96d56Sopenharmony_ci # ERROR_IO_PENDING is not an 
error, the wait was unregistered 2417db96d56Sopenharmony_ci 2427db96d56Sopenharmony_ci self._event_fut = self._proactor._wait_cancel(self._event, 2437db96d56Sopenharmony_ci self._unregister_wait_cb) 2447db96d56Sopenharmony_ci 2457db96d56Sopenharmony_ci 2467db96d56Sopenharmony_ciclass PipeServer(object): 2477db96d56Sopenharmony_ci """Class representing a pipe server. 2487db96d56Sopenharmony_ci 2497db96d56Sopenharmony_ci This is much like a bound, listening socket. 2507db96d56Sopenharmony_ci """ 2517db96d56Sopenharmony_ci def __init__(self, address): 2527db96d56Sopenharmony_ci self._address = address 2537db96d56Sopenharmony_ci self._free_instances = weakref.WeakSet() 2547db96d56Sopenharmony_ci # initialize the pipe attribute before calling _server_pipe_handle() 2557db96d56Sopenharmony_ci # because this function can raise an exception and the destructor calls 2567db96d56Sopenharmony_ci # the close() method 2577db96d56Sopenharmony_ci self._pipe = None 2587db96d56Sopenharmony_ci self._accept_pipe_future = None 2597db96d56Sopenharmony_ci self._pipe = self._server_pipe_handle(True) 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ci def _get_unconnected_pipe(self): 2627db96d56Sopenharmony_ci # Create new instance and return previous one. This ensures 2637db96d56Sopenharmony_ci # that (until the server is closed) there is always at least 2647db96d56Sopenharmony_ci # one pipe handle for address. Therefore if a client attempt 2657db96d56Sopenharmony_ci # to connect it will not fail with FileNotFoundError. 2667db96d56Sopenharmony_ci tmp, self._pipe = self._pipe, self._server_pipe_handle(False) 2677db96d56Sopenharmony_ci return tmp 2687db96d56Sopenharmony_ci 2697db96d56Sopenharmony_ci def _server_pipe_handle(self, first): 2707db96d56Sopenharmony_ci # Return a wrapper for a new pipe handle. 
2717db96d56Sopenharmony_ci if self.closed(): 2727db96d56Sopenharmony_ci return None 2737db96d56Sopenharmony_ci flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 2747db96d56Sopenharmony_ci if first: 2757db96d56Sopenharmony_ci flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 2767db96d56Sopenharmony_ci h = _winapi.CreateNamedPipe( 2777db96d56Sopenharmony_ci self._address, flags, 2787db96d56Sopenharmony_ci _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 2797db96d56Sopenharmony_ci _winapi.PIPE_WAIT, 2807db96d56Sopenharmony_ci _winapi.PIPE_UNLIMITED_INSTANCES, 2817db96d56Sopenharmony_ci windows_utils.BUFSIZE, windows_utils.BUFSIZE, 2827db96d56Sopenharmony_ci _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) 2837db96d56Sopenharmony_ci pipe = windows_utils.PipeHandle(h) 2847db96d56Sopenharmony_ci self._free_instances.add(pipe) 2857db96d56Sopenharmony_ci return pipe 2867db96d56Sopenharmony_ci 2877db96d56Sopenharmony_ci def closed(self): 2887db96d56Sopenharmony_ci return (self._address is None) 2897db96d56Sopenharmony_ci 2907db96d56Sopenharmony_ci def close(self): 2917db96d56Sopenharmony_ci if self._accept_pipe_future is not None: 2927db96d56Sopenharmony_ci self._accept_pipe_future.cancel() 2937db96d56Sopenharmony_ci self._accept_pipe_future = None 2947db96d56Sopenharmony_ci # Close all instances which have not been connected to by a client. 
2957db96d56Sopenharmony_ci if self._address is not None: 2967db96d56Sopenharmony_ci for pipe in self._free_instances: 2977db96d56Sopenharmony_ci pipe.close() 2987db96d56Sopenharmony_ci self._pipe = None 2997db96d56Sopenharmony_ci self._address = None 3007db96d56Sopenharmony_ci self._free_instances.clear() 3017db96d56Sopenharmony_ci 3027db96d56Sopenharmony_ci __del__ = close 3037db96d56Sopenharmony_ci 3047db96d56Sopenharmony_ci 3057db96d56Sopenharmony_ciclass _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): 3067db96d56Sopenharmony_ci """Windows version of selector event loop.""" 3077db96d56Sopenharmony_ci 3087db96d56Sopenharmony_ci 3097db96d56Sopenharmony_ciclass ProactorEventLoop(proactor_events.BaseProactorEventLoop): 3107db96d56Sopenharmony_ci """Windows version of proactor event loop using IOCP.""" 3117db96d56Sopenharmony_ci 3127db96d56Sopenharmony_ci def __init__(self, proactor=None): 3137db96d56Sopenharmony_ci if proactor is None: 3147db96d56Sopenharmony_ci proactor = IocpProactor() 3157db96d56Sopenharmony_ci super().__init__(proactor) 3167db96d56Sopenharmony_ci 3177db96d56Sopenharmony_ci def run_forever(self): 3187db96d56Sopenharmony_ci try: 3197db96d56Sopenharmony_ci assert self._self_reading_future is None 3207db96d56Sopenharmony_ci self.call_soon(self._loop_self_reading) 3217db96d56Sopenharmony_ci super().run_forever() 3227db96d56Sopenharmony_ci finally: 3237db96d56Sopenharmony_ci if self._self_reading_future is not None: 3247db96d56Sopenharmony_ci ov = self._self_reading_future._ov 3257db96d56Sopenharmony_ci self._self_reading_future.cancel() 3267db96d56Sopenharmony_ci # self_reading_future was just cancelled so if it hasn't been 3277db96d56Sopenharmony_ci # finished yet, it never will be (it's possible that it has 3287db96d56Sopenharmony_ci # already finished and its callback is waiting in the queue, 3297db96d56Sopenharmony_ci # where it could still happen if the event loop is restarted). 
3307db96d56Sopenharmony_ci # Unregister it otherwise IocpProactor.close will wait for it 3317db96d56Sopenharmony_ci # forever 3327db96d56Sopenharmony_ci if ov is not None: 3337db96d56Sopenharmony_ci self._proactor._unregister(ov) 3347db96d56Sopenharmony_ci self._self_reading_future = None 3357db96d56Sopenharmony_ci 3367db96d56Sopenharmony_ci async def create_pipe_connection(self, protocol_factory, address): 3377db96d56Sopenharmony_ci f = self._proactor.connect_pipe(address) 3387db96d56Sopenharmony_ci pipe = await f 3397db96d56Sopenharmony_ci protocol = protocol_factory() 3407db96d56Sopenharmony_ci trans = self._make_duplex_pipe_transport(pipe, protocol, 3417db96d56Sopenharmony_ci extra={'addr': address}) 3427db96d56Sopenharmony_ci return trans, protocol 3437db96d56Sopenharmony_ci 3447db96d56Sopenharmony_ci async def start_serving_pipe(self, protocol_factory, address): 3457db96d56Sopenharmony_ci server = PipeServer(address) 3467db96d56Sopenharmony_ci 3477db96d56Sopenharmony_ci def loop_accept_pipe(f=None): 3487db96d56Sopenharmony_ci pipe = None 3497db96d56Sopenharmony_ci try: 3507db96d56Sopenharmony_ci if f: 3517db96d56Sopenharmony_ci pipe = f.result() 3527db96d56Sopenharmony_ci server._free_instances.discard(pipe) 3537db96d56Sopenharmony_ci 3547db96d56Sopenharmony_ci if server.closed(): 3557db96d56Sopenharmony_ci # A client connected before the server was closed: 3567db96d56Sopenharmony_ci # drop the client (close the pipe) and exit 3577db96d56Sopenharmony_ci pipe.close() 3587db96d56Sopenharmony_ci return 3597db96d56Sopenharmony_ci 3607db96d56Sopenharmony_ci protocol = protocol_factory() 3617db96d56Sopenharmony_ci self._make_duplex_pipe_transport( 3627db96d56Sopenharmony_ci pipe, protocol, extra={'addr': address}) 3637db96d56Sopenharmony_ci 3647db96d56Sopenharmony_ci pipe = server._get_unconnected_pipe() 3657db96d56Sopenharmony_ci if pipe is None: 3667db96d56Sopenharmony_ci return 3677db96d56Sopenharmony_ci 3687db96d56Sopenharmony_ci f = 
self._proactor.accept_pipe(pipe) 3697db96d56Sopenharmony_ci except BrokenPipeError: 3707db96d56Sopenharmony_ci if pipe and pipe.fileno() != -1: 3717db96d56Sopenharmony_ci pipe.close() 3727db96d56Sopenharmony_ci self.call_soon(loop_accept_pipe) 3737db96d56Sopenharmony_ci except OSError as exc: 3747db96d56Sopenharmony_ci if pipe and pipe.fileno() != -1: 3757db96d56Sopenharmony_ci self.call_exception_handler({ 3767db96d56Sopenharmony_ci 'message': 'Pipe accept failed', 3777db96d56Sopenharmony_ci 'exception': exc, 3787db96d56Sopenharmony_ci 'pipe': pipe, 3797db96d56Sopenharmony_ci }) 3807db96d56Sopenharmony_ci pipe.close() 3817db96d56Sopenharmony_ci elif self._debug: 3827db96d56Sopenharmony_ci logger.warning("Accept pipe failed on pipe %r", 3837db96d56Sopenharmony_ci pipe, exc_info=True) 3847db96d56Sopenharmony_ci self.call_soon(loop_accept_pipe) 3857db96d56Sopenharmony_ci except exceptions.CancelledError: 3867db96d56Sopenharmony_ci if pipe: 3877db96d56Sopenharmony_ci pipe.close() 3887db96d56Sopenharmony_ci else: 3897db96d56Sopenharmony_ci server._accept_pipe_future = f 3907db96d56Sopenharmony_ci f.add_done_callback(loop_accept_pipe) 3917db96d56Sopenharmony_ci 3927db96d56Sopenharmony_ci self.call_soon(loop_accept_pipe) 3937db96d56Sopenharmony_ci return [server] 3947db96d56Sopenharmony_ci 3957db96d56Sopenharmony_ci async def _make_subprocess_transport(self, protocol, args, shell, 3967db96d56Sopenharmony_ci stdin, stdout, stderr, bufsize, 3977db96d56Sopenharmony_ci extra=None, **kwargs): 3987db96d56Sopenharmony_ci waiter = self.create_future() 3997db96d56Sopenharmony_ci transp = _WindowsSubprocessTransport(self, protocol, args, shell, 4007db96d56Sopenharmony_ci stdin, stdout, stderr, bufsize, 4017db96d56Sopenharmony_ci waiter=waiter, extra=extra, 4027db96d56Sopenharmony_ci **kwargs) 4037db96d56Sopenharmony_ci try: 4047db96d56Sopenharmony_ci await waiter 4057db96d56Sopenharmony_ci except (SystemExit, KeyboardInterrupt): 4067db96d56Sopenharmony_ci raise 
4077db96d56Sopenharmony_ci except BaseException: 4087db96d56Sopenharmony_ci transp.close() 4097db96d56Sopenharmony_ci await transp._wait() 4107db96d56Sopenharmony_ci raise 4117db96d56Sopenharmony_ci 4127db96d56Sopenharmony_ci return transp 4137db96d56Sopenharmony_ci 4147db96d56Sopenharmony_ci 4157db96d56Sopenharmony_ciclass IocpProactor: 4167db96d56Sopenharmony_ci """Proactor implementation using IOCP.""" 4177db96d56Sopenharmony_ci 4187db96d56Sopenharmony_ci def __init__(self, concurrency=INFINITE): 4197db96d56Sopenharmony_ci self._loop = None 4207db96d56Sopenharmony_ci self._results = [] 4217db96d56Sopenharmony_ci self._iocp = _overlapped.CreateIoCompletionPort( 4227db96d56Sopenharmony_ci _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) 4237db96d56Sopenharmony_ci self._cache = {} 4247db96d56Sopenharmony_ci self._registered = weakref.WeakSet() 4257db96d56Sopenharmony_ci self._unregistered = [] 4267db96d56Sopenharmony_ci self._stopped_serving = weakref.WeakSet() 4277db96d56Sopenharmony_ci 4287db96d56Sopenharmony_ci def _check_closed(self): 4297db96d56Sopenharmony_ci if self._iocp is None: 4307db96d56Sopenharmony_ci raise RuntimeError('IocpProactor is closed') 4317db96d56Sopenharmony_ci 4327db96d56Sopenharmony_ci def __repr__(self): 4337db96d56Sopenharmony_ci info = ['overlapped#=%s' % len(self._cache), 4347db96d56Sopenharmony_ci 'result#=%s' % len(self._results)] 4357db96d56Sopenharmony_ci if self._iocp is None: 4367db96d56Sopenharmony_ci info.append('closed') 4377db96d56Sopenharmony_ci return '<%s %s>' % (self.__class__.__name__, " ".join(info)) 4387db96d56Sopenharmony_ci 4397db96d56Sopenharmony_ci def set_loop(self, loop): 4407db96d56Sopenharmony_ci self._loop = loop 4417db96d56Sopenharmony_ci 4427db96d56Sopenharmony_ci def select(self, timeout=None): 4437db96d56Sopenharmony_ci if not self._results: 4447db96d56Sopenharmony_ci self._poll(timeout) 4457db96d56Sopenharmony_ci tmp = self._results 4467db96d56Sopenharmony_ci self._results = [] 
4477db96d56Sopenharmony_ci try: 4487db96d56Sopenharmony_ci return tmp 4497db96d56Sopenharmony_ci finally: 4507db96d56Sopenharmony_ci # Needed to break cycles when an exception occurs. 4517db96d56Sopenharmony_ci tmp = None 4527db96d56Sopenharmony_ci 4537db96d56Sopenharmony_ci def _result(self, value): 4547db96d56Sopenharmony_ci fut = self._loop.create_future() 4557db96d56Sopenharmony_ci fut.set_result(value) 4567db96d56Sopenharmony_ci return fut 4577db96d56Sopenharmony_ci 4587db96d56Sopenharmony_ci def recv(self, conn, nbytes, flags=0): 4597db96d56Sopenharmony_ci self._register_with_iocp(conn) 4607db96d56Sopenharmony_ci ov = _overlapped.Overlapped(NULL) 4617db96d56Sopenharmony_ci try: 4627db96d56Sopenharmony_ci if isinstance(conn, socket.socket): 4637db96d56Sopenharmony_ci ov.WSARecv(conn.fileno(), nbytes, flags) 4647db96d56Sopenharmony_ci else: 4657db96d56Sopenharmony_ci ov.ReadFile(conn.fileno(), nbytes) 4667db96d56Sopenharmony_ci except BrokenPipeError: 4677db96d56Sopenharmony_ci return self._result(b'') 4687db96d56Sopenharmony_ci 4697db96d56Sopenharmony_ci def finish_recv(trans, key, ov): 4707db96d56Sopenharmony_ci try: 4717db96d56Sopenharmony_ci return ov.getresult() 4727db96d56Sopenharmony_ci except OSError as exc: 4737db96d56Sopenharmony_ci if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 4747db96d56Sopenharmony_ci _overlapped.ERROR_OPERATION_ABORTED): 4757db96d56Sopenharmony_ci raise ConnectionResetError(*exc.args) 4767db96d56Sopenharmony_ci else: 4777db96d56Sopenharmony_ci raise 4787db96d56Sopenharmony_ci 4797db96d56Sopenharmony_ci return self._register(ov, conn, finish_recv) 4807db96d56Sopenharmony_ci 4817db96d56Sopenharmony_ci def recv_into(self, conn, buf, flags=0): 4827db96d56Sopenharmony_ci self._register_with_iocp(conn) 4837db96d56Sopenharmony_ci ov = _overlapped.Overlapped(NULL) 4847db96d56Sopenharmony_ci try: 4857db96d56Sopenharmony_ci if isinstance(conn, socket.socket): 4867db96d56Sopenharmony_ci ov.WSARecvInto(conn.fileno(), buf, flags) 
4877db96d56Sopenharmony_ci else: 4887db96d56Sopenharmony_ci ov.ReadFileInto(conn.fileno(), buf) 4897db96d56Sopenharmony_ci except BrokenPipeError: 4907db96d56Sopenharmony_ci return self._result(0) 4917db96d56Sopenharmony_ci 4927db96d56Sopenharmony_ci def finish_recv(trans, key, ov): 4937db96d56Sopenharmony_ci try: 4947db96d56Sopenharmony_ci return ov.getresult() 4957db96d56Sopenharmony_ci except OSError as exc: 4967db96d56Sopenharmony_ci if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 4977db96d56Sopenharmony_ci _overlapped.ERROR_OPERATION_ABORTED): 4987db96d56Sopenharmony_ci raise ConnectionResetError(*exc.args) 4997db96d56Sopenharmony_ci else: 5007db96d56Sopenharmony_ci raise 5017db96d56Sopenharmony_ci 5027db96d56Sopenharmony_ci return self._register(ov, conn, finish_recv) 5037db96d56Sopenharmony_ci 5047db96d56Sopenharmony_ci def recvfrom(self, conn, nbytes, flags=0): 5057db96d56Sopenharmony_ci self._register_with_iocp(conn) 5067db96d56Sopenharmony_ci ov = _overlapped.Overlapped(NULL) 5077db96d56Sopenharmony_ci try: 5087db96d56Sopenharmony_ci ov.WSARecvFrom(conn.fileno(), nbytes, flags) 5097db96d56Sopenharmony_ci except BrokenPipeError: 5107db96d56Sopenharmony_ci return self._result((b'', None)) 5117db96d56Sopenharmony_ci 5127db96d56Sopenharmony_ci def finish_recv(trans, key, ov): 5137db96d56Sopenharmony_ci try: 5147db96d56Sopenharmony_ci return ov.getresult() 5157db96d56Sopenharmony_ci except OSError as exc: 5167db96d56Sopenharmony_ci if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 5177db96d56Sopenharmony_ci _overlapped.ERROR_OPERATION_ABORTED): 5187db96d56Sopenharmony_ci raise ConnectionResetError(*exc.args) 5197db96d56Sopenharmony_ci else: 5207db96d56Sopenharmony_ci raise 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci return self._register(ov, conn, finish_recv) 5237db96d56Sopenharmony_ci 5247db96d56Sopenharmony_ci def recvfrom_into(self, conn, buf, flags=0): 5257db96d56Sopenharmony_ci self._register_with_iocp(conn) 
5267db96d56Sopenharmony_ci ov = _overlapped.Overlapped(NULL) 5277db96d56Sopenharmony_ci try: 5287db96d56Sopenharmony_ci ov.WSARecvFromInto(conn.fileno(), buf, flags) 5297db96d56Sopenharmony_ci except BrokenPipeError: 5307db96d56Sopenharmony_ci return self._result((0, None)) 5317db96d56Sopenharmony_ci 5327db96d56Sopenharmony_ci def finish_recv(trans, key, ov): 5337db96d56Sopenharmony_ci try: 5347db96d56Sopenharmony_ci return ov.getresult() 5357db96d56Sopenharmony_ci except OSError as exc: 5367db96d56Sopenharmony_ci if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 5377db96d56Sopenharmony_ci _overlapped.ERROR_OPERATION_ABORTED): 5387db96d56Sopenharmony_ci raise ConnectionResetError(*exc.args) 5397db96d56Sopenharmony_ci else: 5407db96d56Sopenharmony_ci raise 5417db96d56Sopenharmony_ci 5427db96d56Sopenharmony_ci return self._register(ov, conn, finish_recv) 5437db96d56Sopenharmony_ci 5447db96d56Sopenharmony_ci def sendto(self, conn, buf, flags=0, addr=None): 5457db96d56Sopenharmony_ci self._register_with_iocp(conn) 5467db96d56Sopenharmony_ci ov = _overlapped.Overlapped(NULL) 5477db96d56Sopenharmony_ci 5487db96d56Sopenharmony_ci ov.WSASendTo(conn.fileno(), buf, flags, addr) 5497db96d56Sopenharmony_ci 5507db96d56Sopenharmony_ci def finish_send(trans, key, ov): 5517db96d56Sopenharmony_ci try: 5527db96d56Sopenharmony_ci return ov.getresult() 5537db96d56Sopenharmony_ci except OSError as exc: 5547db96d56Sopenharmony_ci if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 5557db96d56Sopenharmony_ci _overlapped.ERROR_OPERATION_ABORTED): 5567db96d56Sopenharmony_ci raise ConnectionResetError(*exc.args) 5577db96d56Sopenharmony_ci else: 5587db96d56Sopenharmony_ci raise 5597db96d56Sopenharmony_ci 5607db96d56Sopenharmony_ci return self._register(ov, conn, finish_send) 5617db96d56Sopenharmony_ci 5627db96d56Sopenharmony_ci def send(self, conn, buf, flags=0): 5637db96d56Sopenharmony_ci self._register_with_iocp(conn) 5647db96d56Sopenharmony_ci ov = _overlapped.Overlapped(NULL) 
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        def finish_send(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                    _overlapped.ERROR_OPERATION_ABORTED):
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_send)

    def accept(self, listener):
        """Start an overlapped accept on *listener* and return its future.

        A new socket of the listener's family is created up front and handed
        to AcceptEx().  On completion the future's result is the tuple
        ``(conn, conn.getpeername())``.  A helper task awaits the future and
        closes the pre-created socket if the future is cancelled.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            # Raises if the overlapped accept failed.
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            # The accepted socket takes over the listener's timeout setting.
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except exceptions.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Connect socket *conn* to *address* and return a future.

        For SOCK_DGRAM sockets WSAConnect() is called directly and an already
        completed future with result None is returned.  Otherwise the socket
        is registered with the completion port, bound locally, and an
        overlapped ConnectEx() is started; the future's result is *conn*.
        """
        if conn.type == socket.SOCK_DGRAM:
            # WSAConnect will complete immediately for UDP sockets so we don't
            # need to register any IOCP operation
            _overlapped.WSAConnect(conn.fileno(), address)
            fut = self._loop.create_future()
            fut.set_result(None)
            return fut

        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            # Raises if the overlapped connect failed.
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start an overlapped TransmitFile() of *file* over *sock*.

        *offset* is split into its low and high 32-bit halves before being
        passed to TransmitFile() together with *count*.  Return a future
        whose result is the value of ov.getresult().  ERROR_NETNAME_DELETED
        and ERROR_OPERATION_ABORTED are re-raised as ConnectionResetError.
        """
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        def finish_sendfile(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                    _overlapped.ERROR_OPERATION_ABORTED):
                    raise ConnectionResetError(*exc.args)
                else:
                    raise
        return self._register(ov, sock, finish_sendfile)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to the named pipe *pipe*.

        Return self._result(pipe) when ConnectNamedPipe() reports that the
        pipe is already connected; otherwise return a future registered for
        the overlapped operation whose result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            # Raises if the overlapped connect failed.
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at *address*, retrying while it is busy.

        Return a windows_utils.PipeHandle wrapping the connected handle.
        Any OSError other than ERROR_PIPE_BUSY is propagated.  Between
        attempts the coroutine sleeps; the delay is doubled before every
        sleep (including the first) and capped at CONNECT_PIPE_MAX_DELAY.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe. Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait on *event* with no timeout, using a _WaitCancelFuture.

        *done_callback* is stored on the returned future as its
        ``_done_callback`` attribute instead of being added with
        add_done_callback().
        """
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* and return the future tracking it.

        *timeout* is in seconds (None means wait forever) and is rounded up
        to whole milliseconds.  A _WaitCancelFuture is created when
        *_is_cancel* is true, a _WaitHandleFuture otherwise.  The future is
        stored in the cache under the address of a fresh Overlapped object.
        """
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        # NOTE(review): presumably drops this helper's own frame from the
        # recorded creation traceback -- confirm against Future internals.
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handle types where a successful wait has no
            # effect. So events or processes are all right, but locks
            # or semaphores are not. Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        # Same (future, ov, obj, callback) layout that _register() stores;
        # here 0 occupies the obj slot.
        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        """Associate *obj*'s handle with the completion port, once per object."""
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Create the future for overlapped operation *ov* and cache it.

        *callback* is invoked as ``callback(transferred, key, ov)`` and its
        return value becomes the future's result; an OSError it raises
        becomes the future's exception.  If *ov* is no longer pending the
        callback runs immediately with ``(None, None, ov)``.  The entry is
        added to the cache in either case.
        """
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes. The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work. We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later. Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._check_closed()
        # Only queued here; _poll() removes the cache entry later.
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of address *family* with its timeout set to 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Dequeue completion statuses and resolve the matching futures.

        *timeout* is in seconds; None waits indefinitely.  Raise ValueError
        if it is negative or if its millisecond value reaches INFINITE.
        Only the first dequeue waits; later ones use a zero timeout, and the
        loop ends when no status is returned.  Futures resolved here are
        appended to self._results.  Finally, cache entries queued by
        _unregister() are dropped.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Something was dequeued: drain what is left without waiting.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)
                finally:
                    f = None

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        """Mark *obj* so that _poll() cancels its completed operations."""
        # obj is a socket or pipe handle. It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel outstanding operations, drain them, and close the port.

        Does nothing if the completion port is already closed.  Every cached
        future is cancelled except those already cancelled and
        _WaitCancelFuture instances.  An OSError raised by cancel() is
        reported through the loop's exception handler when a loop is set.
        The method then polls until the cache is empty, logging a debug
        message about once per second, before closing the port handle.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        # Iterate over a snapshot of the cache values.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        # Results collected while draining are discarded.
        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        """Close the proactor when the object is finalized."""
        self.close()


class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts its child via windows_utils.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and arrange to be told when it exits.

        The process handle is waited on through the loop's proactor; when
        that wait future completes, the return code is polled and passed to
        self._process_exited().
        """
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            returncode = self._proc.poll()
            self._process_exited(returncode)

        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
        f.add_done_callback(callback)


# Public name for the selector-based event loop class.
SelectorEventLoop = _WindowsSelectorEventLoop


class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""

    _loop_factory = SelectorEventLoop


class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is ProactorEventLoop."""

    _loop_factory = ProactorEventLoop


# The proactor-based policy is the default on this platform.
DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy