17db96d56Sopenharmony_ci"""Selector and proactor event loops for Windows."""
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport sys
47db96d56Sopenharmony_ci
57db96d56Sopenharmony_ciif sys.platform != 'win32':  # pragma: no cover
67db96d56Sopenharmony_ci    raise ImportError('win32 only')
77db96d56Sopenharmony_ci
87db96d56Sopenharmony_ciimport _overlapped
97db96d56Sopenharmony_ciimport _winapi
107db96d56Sopenharmony_ciimport errno
117db96d56Sopenharmony_ciimport math
127db96d56Sopenharmony_ciimport msvcrt
137db96d56Sopenharmony_ciimport socket
147db96d56Sopenharmony_ciimport struct
157db96d56Sopenharmony_ciimport time
167db96d56Sopenharmony_ciimport weakref
177db96d56Sopenharmony_ci
187db96d56Sopenharmony_cifrom . import events
197db96d56Sopenharmony_cifrom . import base_subprocess
207db96d56Sopenharmony_cifrom . import futures
217db96d56Sopenharmony_cifrom . import exceptions
227db96d56Sopenharmony_cifrom . import proactor_events
237db96d56Sopenharmony_cifrom . import selector_events
247db96d56Sopenharmony_cifrom . import tasks
257db96d56Sopenharmony_cifrom . import windows_utils
267db96d56Sopenharmony_cifrom .log import logger
277db96d56Sopenharmony_ci
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_ci__all__ = (
307db96d56Sopenharmony_ci    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
317db96d56Sopenharmony_ci    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
327db96d56Sopenharmony_ci    'WindowsProactorEventLoopPolicy',
337db96d56Sopenharmony_ci)
347db96d56Sopenharmony_ci
357db96d56Sopenharmony_ci
367db96d56Sopenharmony_ciNULL = _winapi.NULL
377db96d56Sopenharmony_ciINFINITE = _winapi.INFINITE
387db96d56Sopenharmony_ciERROR_CONNECTION_REFUSED = 1225
397db96d56Sopenharmony_ciERROR_CONNECTION_ABORTED = 1236
407db96d56Sopenharmony_ci
417db96d56Sopenharmony_ci# Initial delay in seconds for connect_pipe() before retrying to connect
427db96d56Sopenharmony_ciCONNECT_PIPE_INIT_DELAY = 0.001
437db96d56Sopenharmony_ci
447db96d56Sopenharmony_ci# Maximum delay in seconds for connect_pipe() before retrying to connect
457db96d56Sopenharmony_ciCONNECT_PIPE_MAX_DELAY = 0.100
467db96d56Sopenharmony_ci
477db96d56Sopenharmony_ci
487db96d56Sopenharmony_ciclass _OverlappedFuture(futures.Future):
497db96d56Sopenharmony_ci    """Subclass of Future which represents an overlapped operation.
507db96d56Sopenharmony_ci
517db96d56Sopenharmony_ci    Cancelling it will immediately cancel the overlapped operation.
527db96d56Sopenharmony_ci    """
537db96d56Sopenharmony_ci
547db96d56Sopenharmony_ci    def __init__(self, ov, *, loop=None):
557db96d56Sopenharmony_ci        super().__init__(loop=loop)
567db96d56Sopenharmony_ci        if self._source_traceback:
577db96d56Sopenharmony_ci            del self._source_traceback[-1]
587db96d56Sopenharmony_ci        self._ov = ov
597db96d56Sopenharmony_ci
607db96d56Sopenharmony_ci    def _repr_info(self):
617db96d56Sopenharmony_ci        info = super()._repr_info()
627db96d56Sopenharmony_ci        if self._ov is not None:
637db96d56Sopenharmony_ci            state = 'pending' if self._ov.pending else 'completed'
647db96d56Sopenharmony_ci            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
657db96d56Sopenharmony_ci        return info
667db96d56Sopenharmony_ci
677db96d56Sopenharmony_ci    def _cancel_overlapped(self):
687db96d56Sopenharmony_ci        if self._ov is None:
697db96d56Sopenharmony_ci            return
707db96d56Sopenharmony_ci        try:
717db96d56Sopenharmony_ci            self._ov.cancel()
727db96d56Sopenharmony_ci        except OSError as exc:
737db96d56Sopenharmony_ci            context = {
747db96d56Sopenharmony_ci                'message': 'Cancelling an overlapped future failed',
757db96d56Sopenharmony_ci                'exception': exc,
767db96d56Sopenharmony_ci                'future': self,
777db96d56Sopenharmony_ci            }
787db96d56Sopenharmony_ci            if self._source_traceback:
797db96d56Sopenharmony_ci                context['source_traceback'] = self._source_traceback
807db96d56Sopenharmony_ci            self._loop.call_exception_handler(context)
817db96d56Sopenharmony_ci        self._ov = None
827db96d56Sopenharmony_ci
837db96d56Sopenharmony_ci    def cancel(self, msg=None):
847db96d56Sopenharmony_ci        self._cancel_overlapped()
857db96d56Sopenharmony_ci        return super().cancel(msg=msg)
867db96d56Sopenharmony_ci
877db96d56Sopenharmony_ci    def set_exception(self, exception):
887db96d56Sopenharmony_ci        super().set_exception(exception)
897db96d56Sopenharmony_ci        self._cancel_overlapped()
907db96d56Sopenharmony_ci
917db96d56Sopenharmony_ci    def set_result(self, result):
927db96d56Sopenharmony_ci        super().set_result(result)
937db96d56Sopenharmony_ci        self._ov = None
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_ci
967db96d56Sopenharmony_ciclass _BaseWaitHandleFuture(futures.Future):
977db96d56Sopenharmony_ci    """Subclass of Future which represents a wait handle."""
987db96d56Sopenharmony_ci
997db96d56Sopenharmony_ci    def __init__(self, ov, handle, wait_handle, *, loop=None):
1007db96d56Sopenharmony_ci        super().__init__(loop=loop)
1017db96d56Sopenharmony_ci        if self._source_traceback:
1027db96d56Sopenharmony_ci            del self._source_traceback[-1]
1037db96d56Sopenharmony_ci        # Keep a reference to the Overlapped object to keep it alive until the
1047db96d56Sopenharmony_ci        # wait is unregistered
1057db96d56Sopenharmony_ci        self._ov = ov
1067db96d56Sopenharmony_ci        self._handle = handle
1077db96d56Sopenharmony_ci        self._wait_handle = wait_handle
1087db96d56Sopenharmony_ci
1097db96d56Sopenharmony_ci        # Should we call UnregisterWaitEx() if the wait completes
1107db96d56Sopenharmony_ci        # or is cancelled?
1117db96d56Sopenharmony_ci        self._registered = True
1127db96d56Sopenharmony_ci
1137db96d56Sopenharmony_ci    def _poll(self):
1147db96d56Sopenharmony_ci        # non-blocking wait: use a timeout of 0 millisecond
1157db96d56Sopenharmony_ci        return (_winapi.WaitForSingleObject(self._handle, 0) ==
1167db96d56Sopenharmony_ci                _winapi.WAIT_OBJECT_0)
1177db96d56Sopenharmony_ci
1187db96d56Sopenharmony_ci    def _repr_info(self):
1197db96d56Sopenharmony_ci        info = super()._repr_info()
1207db96d56Sopenharmony_ci        info.append(f'handle={self._handle:#x}')
1217db96d56Sopenharmony_ci        if self._handle is not None:
1227db96d56Sopenharmony_ci            state = 'signaled' if self._poll() else 'waiting'
1237db96d56Sopenharmony_ci            info.append(state)
1247db96d56Sopenharmony_ci        if self._wait_handle is not None:
1257db96d56Sopenharmony_ci            info.append(f'wait_handle={self._wait_handle:#x}')
1267db96d56Sopenharmony_ci        return info
1277db96d56Sopenharmony_ci
1287db96d56Sopenharmony_ci    def _unregister_wait_cb(self, fut):
1297db96d56Sopenharmony_ci        # The wait was unregistered: it's not safe to destroy the Overlapped
1307db96d56Sopenharmony_ci        # object
1317db96d56Sopenharmony_ci        self._ov = None
1327db96d56Sopenharmony_ci
1337db96d56Sopenharmony_ci    def _unregister_wait(self):
1347db96d56Sopenharmony_ci        if not self._registered:
1357db96d56Sopenharmony_ci            return
1367db96d56Sopenharmony_ci        self._registered = False
1377db96d56Sopenharmony_ci
1387db96d56Sopenharmony_ci        wait_handle = self._wait_handle
1397db96d56Sopenharmony_ci        self._wait_handle = None
1407db96d56Sopenharmony_ci        try:
1417db96d56Sopenharmony_ci            _overlapped.UnregisterWait(wait_handle)
1427db96d56Sopenharmony_ci        except OSError as exc:
1437db96d56Sopenharmony_ci            if exc.winerror != _overlapped.ERROR_IO_PENDING:
1447db96d56Sopenharmony_ci                context = {
1457db96d56Sopenharmony_ci                    'message': 'Failed to unregister the wait handle',
1467db96d56Sopenharmony_ci                    'exception': exc,
1477db96d56Sopenharmony_ci                    'future': self,
1487db96d56Sopenharmony_ci                }
1497db96d56Sopenharmony_ci                if self._source_traceback:
1507db96d56Sopenharmony_ci                    context['source_traceback'] = self._source_traceback
1517db96d56Sopenharmony_ci                self._loop.call_exception_handler(context)
1527db96d56Sopenharmony_ci                return
1537db96d56Sopenharmony_ci            # ERROR_IO_PENDING means that the unregister is pending
1547db96d56Sopenharmony_ci
1557db96d56Sopenharmony_ci        self._unregister_wait_cb(None)
1567db96d56Sopenharmony_ci
1577db96d56Sopenharmony_ci    def cancel(self, msg=None):
1587db96d56Sopenharmony_ci        self._unregister_wait()
1597db96d56Sopenharmony_ci        return super().cancel(msg=msg)
1607db96d56Sopenharmony_ci
1617db96d56Sopenharmony_ci    def set_exception(self, exception):
1627db96d56Sopenharmony_ci        self._unregister_wait()
1637db96d56Sopenharmony_ci        super().set_exception(exception)
1647db96d56Sopenharmony_ci
1657db96d56Sopenharmony_ci    def set_result(self, result):
1667db96d56Sopenharmony_ci        self._unregister_wait()
1677db96d56Sopenharmony_ci        super().set_result(result)
1687db96d56Sopenharmony_ci
1697db96d56Sopenharmony_ci
1707db96d56Sopenharmony_ciclass _WaitCancelFuture(_BaseWaitHandleFuture):
1717db96d56Sopenharmony_ci    """Subclass of Future which represents a wait for the cancellation of a
1727db96d56Sopenharmony_ci    _WaitHandleFuture using an event.
1737db96d56Sopenharmony_ci    """
1747db96d56Sopenharmony_ci
1757db96d56Sopenharmony_ci    def __init__(self, ov, event, wait_handle, *, loop=None):
1767db96d56Sopenharmony_ci        super().__init__(ov, event, wait_handle, loop=loop)
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci        self._done_callback = None
1797db96d56Sopenharmony_ci
1807db96d56Sopenharmony_ci    def cancel(self):
1817db96d56Sopenharmony_ci        raise RuntimeError("_WaitCancelFuture must not be cancelled")
1827db96d56Sopenharmony_ci
1837db96d56Sopenharmony_ci    def set_result(self, result):
1847db96d56Sopenharmony_ci        super().set_result(result)
1857db96d56Sopenharmony_ci        if self._done_callback is not None:
1867db96d56Sopenharmony_ci            self._done_callback(self)
1877db96d56Sopenharmony_ci
1887db96d56Sopenharmony_ci    def set_exception(self, exception):
1897db96d56Sopenharmony_ci        super().set_exception(exception)
1907db96d56Sopenharmony_ci        if self._done_callback is not None:
1917db96d56Sopenharmony_ci            self._done_callback(self)
1927db96d56Sopenharmony_ci
1937db96d56Sopenharmony_ci
1947db96d56Sopenharmony_ciclass _WaitHandleFuture(_BaseWaitHandleFuture):
1957db96d56Sopenharmony_ci    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
1967db96d56Sopenharmony_ci        super().__init__(ov, handle, wait_handle, loop=loop)
1977db96d56Sopenharmony_ci        self._proactor = proactor
1987db96d56Sopenharmony_ci        self._unregister_proactor = True
1997db96d56Sopenharmony_ci        self._event = _overlapped.CreateEvent(None, True, False, None)
2007db96d56Sopenharmony_ci        self._event_fut = None
2017db96d56Sopenharmony_ci
2027db96d56Sopenharmony_ci    def _unregister_wait_cb(self, fut):
2037db96d56Sopenharmony_ci        if self._event is not None:
2047db96d56Sopenharmony_ci            _winapi.CloseHandle(self._event)
2057db96d56Sopenharmony_ci            self._event = None
2067db96d56Sopenharmony_ci            self._event_fut = None
2077db96d56Sopenharmony_ci
2087db96d56Sopenharmony_ci        # If the wait was cancelled, the wait may never be signalled, so
2097db96d56Sopenharmony_ci        # it's required to unregister it. Otherwise, IocpProactor.close() will
2107db96d56Sopenharmony_ci        # wait forever for an event which will never come.
2117db96d56Sopenharmony_ci        #
2127db96d56Sopenharmony_ci        # If the IocpProactor already received the event, it's safe to call
2137db96d56Sopenharmony_ci        # _unregister() because we kept a reference to the Overlapped object
2147db96d56Sopenharmony_ci        # which is used as a unique key.
2157db96d56Sopenharmony_ci        self._proactor._unregister(self._ov)
2167db96d56Sopenharmony_ci        self._proactor = None
2177db96d56Sopenharmony_ci
2187db96d56Sopenharmony_ci        super()._unregister_wait_cb(fut)
2197db96d56Sopenharmony_ci
2207db96d56Sopenharmony_ci    def _unregister_wait(self):
2217db96d56Sopenharmony_ci        if not self._registered:
2227db96d56Sopenharmony_ci            return
2237db96d56Sopenharmony_ci        self._registered = False
2247db96d56Sopenharmony_ci
2257db96d56Sopenharmony_ci        wait_handle = self._wait_handle
2267db96d56Sopenharmony_ci        self._wait_handle = None
2277db96d56Sopenharmony_ci        try:
2287db96d56Sopenharmony_ci            _overlapped.UnregisterWaitEx(wait_handle, self._event)
2297db96d56Sopenharmony_ci        except OSError as exc:
2307db96d56Sopenharmony_ci            if exc.winerror != _overlapped.ERROR_IO_PENDING:
2317db96d56Sopenharmony_ci                context = {
2327db96d56Sopenharmony_ci                    'message': 'Failed to unregister the wait handle',
2337db96d56Sopenharmony_ci                    'exception': exc,
2347db96d56Sopenharmony_ci                    'future': self,
2357db96d56Sopenharmony_ci                }
2367db96d56Sopenharmony_ci                if self._source_traceback:
2377db96d56Sopenharmony_ci                    context['source_traceback'] = self._source_traceback
2387db96d56Sopenharmony_ci                self._loop.call_exception_handler(context)
2397db96d56Sopenharmony_ci                return
2407db96d56Sopenharmony_ci            # ERROR_IO_PENDING is not an error, the wait was unregistered
2417db96d56Sopenharmony_ci
2427db96d56Sopenharmony_ci        self._event_fut = self._proactor._wait_cancel(self._event,
2437db96d56Sopenharmony_ci                                                      self._unregister_wait_cb)
2447db96d56Sopenharmony_ci
2457db96d56Sopenharmony_ci
2467db96d56Sopenharmony_ciclass PipeServer(object):
2477db96d56Sopenharmony_ci    """Class representing a pipe server.
2487db96d56Sopenharmony_ci
2497db96d56Sopenharmony_ci    This is much like a bound, listening socket.
2507db96d56Sopenharmony_ci    """
2517db96d56Sopenharmony_ci    def __init__(self, address):
2527db96d56Sopenharmony_ci        self._address = address
2537db96d56Sopenharmony_ci        self._free_instances = weakref.WeakSet()
2547db96d56Sopenharmony_ci        # initialize the pipe attribute before calling _server_pipe_handle()
2557db96d56Sopenharmony_ci        # because this function can raise an exception and the destructor calls
2567db96d56Sopenharmony_ci        # the close() method
2577db96d56Sopenharmony_ci        self._pipe = None
2587db96d56Sopenharmony_ci        self._accept_pipe_future = None
2597db96d56Sopenharmony_ci        self._pipe = self._server_pipe_handle(True)
2607db96d56Sopenharmony_ci
2617db96d56Sopenharmony_ci    def _get_unconnected_pipe(self):
2627db96d56Sopenharmony_ci        # Create new instance and return previous one.  This ensures
2637db96d56Sopenharmony_ci        # that (until the server is closed) there is always at least
2647db96d56Sopenharmony_ci        # one pipe handle for address.  Therefore if a client attempt
2657db96d56Sopenharmony_ci        # to connect it will not fail with FileNotFoundError.
2667db96d56Sopenharmony_ci        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
2677db96d56Sopenharmony_ci        return tmp
2687db96d56Sopenharmony_ci
2697db96d56Sopenharmony_ci    def _server_pipe_handle(self, first):
2707db96d56Sopenharmony_ci        # Return a wrapper for a new pipe handle.
2717db96d56Sopenharmony_ci        if self.closed():
2727db96d56Sopenharmony_ci            return None
2737db96d56Sopenharmony_ci        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
2747db96d56Sopenharmony_ci        if first:
2757db96d56Sopenharmony_ci            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
2767db96d56Sopenharmony_ci        h = _winapi.CreateNamedPipe(
2777db96d56Sopenharmony_ci            self._address, flags,
2787db96d56Sopenharmony_ci            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
2797db96d56Sopenharmony_ci            _winapi.PIPE_WAIT,
2807db96d56Sopenharmony_ci            _winapi.PIPE_UNLIMITED_INSTANCES,
2817db96d56Sopenharmony_ci            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
2827db96d56Sopenharmony_ci            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
2837db96d56Sopenharmony_ci        pipe = windows_utils.PipeHandle(h)
2847db96d56Sopenharmony_ci        self._free_instances.add(pipe)
2857db96d56Sopenharmony_ci        return pipe
2867db96d56Sopenharmony_ci
2877db96d56Sopenharmony_ci    def closed(self):
2887db96d56Sopenharmony_ci        return (self._address is None)
2897db96d56Sopenharmony_ci
2907db96d56Sopenharmony_ci    def close(self):
2917db96d56Sopenharmony_ci        if self._accept_pipe_future is not None:
2927db96d56Sopenharmony_ci            self._accept_pipe_future.cancel()
2937db96d56Sopenharmony_ci            self._accept_pipe_future = None
2947db96d56Sopenharmony_ci        # Close all instances which have not been connected to by a client.
2957db96d56Sopenharmony_ci        if self._address is not None:
2967db96d56Sopenharmony_ci            for pipe in self._free_instances:
2977db96d56Sopenharmony_ci                pipe.close()
2987db96d56Sopenharmony_ci            self._pipe = None
2997db96d56Sopenharmony_ci            self._address = None
3007db96d56Sopenharmony_ci            self._free_instances.clear()
3017db96d56Sopenharmony_ci
3027db96d56Sopenharmony_ci    __del__ = close
3037db96d56Sopenharmony_ci
3047db96d56Sopenharmony_ci
3057db96d56Sopenharmony_ciclass _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
3067db96d56Sopenharmony_ci    """Windows version of selector event loop."""
3077db96d56Sopenharmony_ci
3087db96d56Sopenharmony_ci
3097db96d56Sopenharmony_ciclass ProactorEventLoop(proactor_events.BaseProactorEventLoop):
3107db96d56Sopenharmony_ci    """Windows version of proactor event loop using IOCP."""
3117db96d56Sopenharmony_ci
3127db96d56Sopenharmony_ci    def __init__(self, proactor=None):
3137db96d56Sopenharmony_ci        if proactor is None:
3147db96d56Sopenharmony_ci            proactor = IocpProactor()
3157db96d56Sopenharmony_ci        super().__init__(proactor)
3167db96d56Sopenharmony_ci
3177db96d56Sopenharmony_ci    def run_forever(self):
3187db96d56Sopenharmony_ci        try:
3197db96d56Sopenharmony_ci            assert self._self_reading_future is None
3207db96d56Sopenharmony_ci            self.call_soon(self._loop_self_reading)
3217db96d56Sopenharmony_ci            super().run_forever()
3227db96d56Sopenharmony_ci        finally:
3237db96d56Sopenharmony_ci            if self._self_reading_future is not None:
3247db96d56Sopenharmony_ci                ov = self._self_reading_future._ov
3257db96d56Sopenharmony_ci                self._self_reading_future.cancel()
3267db96d56Sopenharmony_ci                # self_reading_future was just cancelled so if it hasn't been
3277db96d56Sopenharmony_ci                # finished yet, it never will be (it's possible that it has
3287db96d56Sopenharmony_ci                # already finished and its callback is waiting in the queue,
3297db96d56Sopenharmony_ci                # where it could still happen if the event loop is restarted).
3307db96d56Sopenharmony_ci                # Unregister it otherwise IocpProactor.close will wait for it
3317db96d56Sopenharmony_ci                # forever
3327db96d56Sopenharmony_ci                if ov is not None:
3337db96d56Sopenharmony_ci                    self._proactor._unregister(ov)
3347db96d56Sopenharmony_ci                self._self_reading_future = None
3357db96d56Sopenharmony_ci
3367db96d56Sopenharmony_ci    async def create_pipe_connection(self, protocol_factory, address):
3377db96d56Sopenharmony_ci        f = self._proactor.connect_pipe(address)
3387db96d56Sopenharmony_ci        pipe = await f
3397db96d56Sopenharmony_ci        protocol = protocol_factory()
3407db96d56Sopenharmony_ci        trans = self._make_duplex_pipe_transport(pipe, protocol,
3417db96d56Sopenharmony_ci                                                 extra={'addr': address})
3427db96d56Sopenharmony_ci        return trans, protocol
3437db96d56Sopenharmony_ci
3447db96d56Sopenharmony_ci    async def start_serving_pipe(self, protocol_factory, address):
3457db96d56Sopenharmony_ci        server = PipeServer(address)
3467db96d56Sopenharmony_ci
3477db96d56Sopenharmony_ci        def loop_accept_pipe(f=None):
3487db96d56Sopenharmony_ci            pipe = None
3497db96d56Sopenharmony_ci            try:
3507db96d56Sopenharmony_ci                if f:
3517db96d56Sopenharmony_ci                    pipe = f.result()
3527db96d56Sopenharmony_ci                    server._free_instances.discard(pipe)
3537db96d56Sopenharmony_ci
3547db96d56Sopenharmony_ci                    if server.closed():
3557db96d56Sopenharmony_ci                        # A client connected before the server was closed:
3567db96d56Sopenharmony_ci                        # drop the client (close the pipe) and exit
3577db96d56Sopenharmony_ci                        pipe.close()
3587db96d56Sopenharmony_ci                        return
3597db96d56Sopenharmony_ci
3607db96d56Sopenharmony_ci                    protocol = protocol_factory()
3617db96d56Sopenharmony_ci                    self._make_duplex_pipe_transport(
3627db96d56Sopenharmony_ci                        pipe, protocol, extra={'addr': address})
3637db96d56Sopenharmony_ci
3647db96d56Sopenharmony_ci                pipe = server._get_unconnected_pipe()
3657db96d56Sopenharmony_ci                if pipe is None:
3667db96d56Sopenharmony_ci                    return
3677db96d56Sopenharmony_ci
3687db96d56Sopenharmony_ci                f = self._proactor.accept_pipe(pipe)
3697db96d56Sopenharmony_ci            except BrokenPipeError:
3707db96d56Sopenharmony_ci                if pipe and pipe.fileno() != -1:
3717db96d56Sopenharmony_ci                    pipe.close()
3727db96d56Sopenharmony_ci                self.call_soon(loop_accept_pipe)
3737db96d56Sopenharmony_ci            except OSError as exc:
3747db96d56Sopenharmony_ci                if pipe and pipe.fileno() != -1:
3757db96d56Sopenharmony_ci                    self.call_exception_handler({
3767db96d56Sopenharmony_ci                        'message': 'Pipe accept failed',
3777db96d56Sopenharmony_ci                        'exception': exc,
3787db96d56Sopenharmony_ci                        'pipe': pipe,
3797db96d56Sopenharmony_ci                    })
3807db96d56Sopenharmony_ci                    pipe.close()
3817db96d56Sopenharmony_ci                elif self._debug:
3827db96d56Sopenharmony_ci                    logger.warning("Accept pipe failed on pipe %r",
3837db96d56Sopenharmony_ci                                   pipe, exc_info=True)
3847db96d56Sopenharmony_ci                self.call_soon(loop_accept_pipe)
3857db96d56Sopenharmony_ci            except exceptions.CancelledError:
3867db96d56Sopenharmony_ci                if pipe:
3877db96d56Sopenharmony_ci                    pipe.close()
3887db96d56Sopenharmony_ci            else:
3897db96d56Sopenharmony_ci                server._accept_pipe_future = f
3907db96d56Sopenharmony_ci                f.add_done_callback(loop_accept_pipe)
3917db96d56Sopenharmony_ci
3927db96d56Sopenharmony_ci        self.call_soon(loop_accept_pipe)
3937db96d56Sopenharmony_ci        return [server]
3947db96d56Sopenharmony_ci
3957db96d56Sopenharmony_ci    async def _make_subprocess_transport(self, protocol, args, shell,
3967db96d56Sopenharmony_ci                                         stdin, stdout, stderr, bufsize,
3977db96d56Sopenharmony_ci                                         extra=None, **kwargs):
3987db96d56Sopenharmony_ci        waiter = self.create_future()
3997db96d56Sopenharmony_ci        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
4007db96d56Sopenharmony_ci                                             stdin, stdout, stderr, bufsize,
4017db96d56Sopenharmony_ci                                             waiter=waiter, extra=extra,
4027db96d56Sopenharmony_ci                                             **kwargs)
4037db96d56Sopenharmony_ci        try:
4047db96d56Sopenharmony_ci            await waiter
4057db96d56Sopenharmony_ci        except (SystemExit, KeyboardInterrupt):
4067db96d56Sopenharmony_ci            raise
4077db96d56Sopenharmony_ci        except BaseException:
4087db96d56Sopenharmony_ci            transp.close()
4097db96d56Sopenharmony_ci            await transp._wait()
4107db96d56Sopenharmony_ci            raise
4117db96d56Sopenharmony_ci
4127db96d56Sopenharmony_ci        return transp
4137db96d56Sopenharmony_ci
4147db96d56Sopenharmony_ci
4157db96d56Sopenharmony_ciclass IocpProactor:
4167db96d56Sopenharmony_ci    """Proactor implementation using IOCP."""
4177db96d56Sopenharmony_ci
4187db96d56Sopenharmony_ci    def __init__(self, concurrency=INFINITE):
4197db96d56Sopenharmony_ci        self._loop = None
4207db96d56Sopenharmony_ci        self._results = []
4217db96d56Sopenharmony_ci        self._iocp = _overlapped.CreateIoCompletionPort(
4227db96d56Sopenharmony_ci            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
4237db96d56Sopenharmony_ci        self._cache = {}
4247db96d56Sopenharmony_ci        self._registered = weakref.WeakSet()
4257db96d56Sopenharmony_ci        self._unregistered = []
4267db96d56Sopenharmony_ci        self._stopped_serving = weakref.WeakSet()
4277db96d56Sopenharmony_ci
4287db96d56Sopenharmony_ci    def _check_closed(self):
4297db96d56Sopenharmony_ci        if self._iocp is None:
4307db96d56Sopenharmony_ci            raise RuntimeError('IocpProactor is closed')
4317db96d56Sopenharmony_ci
4327db96d56Sopenharmony_ci    def __repr__(self):
4337db96d56Sopenharmony_ci        info = ['overlapped#=%s' % len(self._cache),
4347db96d56Sopenharmony_ci                'result#=%s' % len(self._results)]
4357db96d56Sopenharmony_ci        if self._iocp is None:
4367db96d56Sopenharmony_ci            info.append('closed')
4377db96d56Sopenharmony_ci        return '<%s %s>' % (self.__class__.__name__, " ".join(info))
4387db96d56Sopenharmony_ci
4397db96d56Sopenharmony_ci    def set_loop(self, loop):
4407db96d56Sopenharmony_ci        self._loop = loop
4417db96d56Sopenharmony_ci
4427db96d56Sopenharmony_ci    def select(self, timeout=None):
4437db96d56Sopenharmony_ci        if not self._results:
4447db96d56Sopenharmony_ci            self._poll(timeout)
4457db96d56Sopenharmony_ci        tmp = self._results
4467db96d56Sopenharmony_ci        self._results = []
4477db96d56Sopenharmony_ci        try:
4487db96d56Sopenharmony_ci            return tmp
4497db96d56Sopenharmony_ci        finally:
4507db96d56Sopenharmony_ci            # Needed to break cycles when an exception occurs.
4517db96d56Sopenharmony_ci            tmp = None
4527db96d56Sopenharmony_ci
4537db96d56Sopenharmony_ci    def _result(self, value):
4547db96d56Sopenharmony_ci        fut = self._loop.create_future()
4557db96d56Sopenharmony_ci        fut.set_result(value)
4567db96d56Sopenharmony_ci        return fut
4577db96d56Sopenharmony_ci
4587db96d56Sopenharmony_ci    def recv(self, conn, nbytes, flags=0):
4597db96d56Sopenharmony_ci        self._register_with_iocp(conn)
4607db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
4617db96d56Sopenharmony_ci        try:
4627db96d56Sopenharmony_ci            if isinstance(conn, socket.socket):
4637db96d56Sopenharmony_ci                ov.WSARecv(conn.fileno(), nbytes, flags)
4647db96d56Sopenharmony_ci            else:
4657db96d56Sopenharmony_ci                ov.ReadFile(conn.fileno(), nbytes)
4667db96d56Sopenharmony_ci        except BrokenPipeError:
4677db96d56Sopenharmony_ci            return self._result(b'')
4687db96d56Sopenharmony_ci
4697db96d56Sopenharmony_ci        def finish_recv(trans, key, ov):
4707db96d56Sopenharmony_ci            try:
4717db96d56Sopenharmony_ci                return ov.getresult()
4727db96d56Sopenharmony_ci            except OSError as exc:
4737db96d56Sopenharmony_ci                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
4747db96d56Sopenharmony_ci                                    _overlapped.ERROR_OPERATION_ABORTED):
4757db96d56Sopenharmony_ci                    raise ConnectionResetError(*exc.args)
4767db96d56Sopenharmony_ci                else:
4777db96d56Sopenharmony_ci                    raise
4787db96d56Sopenharmony_ci
4797db96d56Sopenharmony_ci        return self._register(ov, conn, finish_recv)
4807db96d56Sopenharmony_ci
4817db96d56Sopenharmony_ci    def recv_into(self, conn, buf, flags=0):
4827db96d56Sopenharmony_ci        self._register_with_iocp(conn)
4837db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
4847db96d56Sopenharmony_ci        try:
4857db96d56Sopenharmony_ci            if isinstance(conn, socket.socket):
4867db96d56Sopenharmony_ci                ov.WSARecvInto(conn.fileno(), buf, flags)
4877db96d56Sopenharmony_ci            else:
4887db96d56Sopenharmony_ci                ov.ReadFileInto(conn.fileno(), buf)
4897db96d56Sopenharmony_ci        except BrokenPipeError:
4907db96d56Sopenharmony_ci            return self._result(0)
4917db96d56Sopenharmony_ci
4927db96d56Sopenharmony_ci        def finish_recv(trans, key, ov):
4937db96d56Sopenharmony_ci            try:
4947db96d56Sopenharmony_ci                return ov.getresult()
4957db96d56Sopenharmony_ci            except OSError as exc:
4967db96d56Sopenharmony_ci                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
4977db96d56Sopenharmony_ci                                    _overlapped.ERROR_OPERATION_ABORTED):
4987db96d56Sopenharmony_ci                    raise ConnectionResetError(*exc.args)
4997db96d56Sopenharmony_ci                else:
5007db96d56Sopenharmony_ci                    raise
5017db96d56Sopenharmony_ci
5027db96d56Sopenharmony_ci        return self._register(ov, conn, finish_recv)
5037db96d56Sopenharmony_ci
5047db96d56Sopenharmony_ci    def recvfrom(self, conn, nbytes, flags=0):
5057db96d56Sopenharmony_ci        self._register_with_iocp(conn)
5067db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
5077db96d56Sopenharmony_ci        try:
5087db96d56Sopenharmony_ci            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
5097db96d56Sopenharmony_ci        except BrokenPipeError:
5107db96d56Sopenharmony_ci            return self._result((b'', None))
5117db96d56Sopenharmony_ci
5127db96d56Sopenharmony_ci        def finish_recv(trans, key, ov):
5137db96d56Sopenharmony_ci            try:
5147db96d56Sopenharmony_ci                return ov.getresult()
5157db96d56Sopenharmony_ci            except OSError as exc:
5167db96d56Sopenharmony_ci                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
5177db96d56Sopenharmony_ci                                    _overlapped.ERROR_OPERATION_ABORTED):
5187db96d56Sopenharmony_ci                    raise ConnectionResetError(*exc.args)
5197db96d56Sopenharmony_ci                else:
5207db96d56Sopenharmony_ci                    raise
5217db96d56Sopenharmony_ci
5227db96d56Sopenharmony_ci        return self._register(ov, conn, finish_recv)
5237db96d56Sopenharmony_ci
5247db96d56Sopenharmony_ci    def recvfrom_into(self, conn, buf, flags=0):
5257db96d56Sopenharmony_ci        self._register_with_iocp(conn)
5267db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
5277db96d56Sopenharmony_ci        try:
5287db96d56Sopenharmony_ci            ov.WSARecvFromInto(conn.fileno(), buf, flags)
5297db96d56Sopenharmony_ci        except BrokenPipeError:
5307db96d56Sopenharmony_ci            return self._result((0, None))
5317db96d56Sopenharmony_ci
5327db96d56Sopenharmony_ci        def finish_recv(trans, key, ov):
5337db96d56Sopenharmony_ci            try:
5347db96d56Sopenharmony_ci                return ov.getresult()
5357db96d56Sopenharmony_ci            except OSError as exc:
5367db96d56Sopenharmony_ci                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
5377db96d56Sopenharmony_ci                                    _overlapped.ERROR_OPERATION_ABORTED):
5387db96d56Sopenharmony_ci                    raise ConnectionResetError(*exc.args)
5397db96d56Sopenharmony_ci                else:
5407db96d56Sopenharmony_ci                    raise
5417db96d56Sopenharmony_ci
5427db96d56Sopenharmony_ci        return self._register(ov, conn, finish_recv)
5437db96d56Sopenharmony_ci
5447db96d56Sopenharmony_ci    def sendto(self, conn, buf, flags=0, addr=None):
5457db96d56Sopenharmony_ci        self._register_with_iocp(conn)
5467db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
5477db96d56Sopenharmony_ci
5487db96d56Sopenharmony_ci        ov.WSASendTo(conn.fileno(), buf, flags, addr)
5497db96d56Sopenharmony_ci
5507db96d56Sopenharmony_ci        def finish_send(trans, key, ov):
5517db96d56Sopenharmony_ci            try:
5527db96d56Sopenharmony_ci                return ov.getresult()
5537db96d56Sopenharmony_ci            except OSError as exc:
5547db96d56Sopenharmony_ci                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
5557db96d56Sopenharmony_ci                                    _overlapped.ERROR_OPERATION_ABORTED):
5567db96d56Sopenharmony_ci                    raise ConnectionResetError(*exc.args)
5577db96d56Sopenharmony_ci                else:
5587db96d56Sopenharmony_ci                    raise
5597db96d56Sopenharmony_ci
5607db96d56Sopenharmony_ci        return self._register(ov, conn, finish_send)
5617db96d56Sopenharmony_ci
5627db96d56Sopenharmony_ci    def send(self, conn, buf, flags=0):
5637db96d56Sopenharmony_ci        self._register_with_iocp(conn)
5647db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
5657db96d56Sopenharmony_ci        if isinstance(conn, socket.socket):
5667db96d56Sopenharmony_ci            ov.WSASend(conn.fileno(), buf, flags)
5677db96d56Sopenharmony_ci        else:
5687db96d56Sopenharmony_ci            ov.WriteFile(conn.fileno(), buf)
5697db96d56Sopenharmony_ci
5707db96d56Sopenharmony_ci        def finish_send(trans, key, ov):
5717db96d56Sopenharmony_ci            try:
5727db96d56Sopenharmony_ci                return ov.getresult()
5737db96d56Sopenharmony_ci            except OSError as exc:
5747db96d56Sopenharmony_ci                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
5757db96d56Sopenharmony_ci                                    _overlapped.ERROR_OPERATION_ABORTED):
5767db96d56Sopenharmony_ci                    raise ConnectionResetError(*exc.args)
5777db96d56Sopenharmony_ci                else:
5787db96d56Sopenharmony_ci                    raise
5797db96d56Sopenharmony_ci
5807db96d56Sopenharmony_ci        return self._register(ov, conn, finish_send)
5817db96d56Sopenharmony_ci
5827db96d56Sopenharmony_ci    def accept(self, listener):
5837db96d56Sopenharmony_ci        self._register_with_iocp(listener)
5847db96d56Sopenharmony_ci        conn = self._get_accept_socket(listener.family)
5857db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
5867db96d56Sopenharmony_ci        ov.AcceptEx(listener.fileno(), conn.fileno())
5877db96d56Sopenharmony_ci
5887db96d56Sopenharmony_ci        def finish_accept(trans, key, ov):
5897db96d56Sopenharmony_ci            ov.getresult()
5907db96d56Sopenharmony_ci            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
5917db96d56Sopenharmony_ci            buf = struct.pack('@P', listener.fileno())
5927db96d56Sopenharmony_ci            conn.setsockopt(socket.SOL_SOCKET,
5937db96d56Sopenharmony_ci                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
5947db96d56Sopenharmony_ci            conn.settimeout(listener.gettimeout())
5957db96d56Sopenharmony_ci            return conn, conn.getpeername()
5967db96d56Sopenharmony_ci
5977db96d56Sopenharmony_ci        async def accept_coro(future, conn):
5987db96d56Sopenharmony_ci            # Coroutine closing the accept socket if the future is cancelled
5997db96d56Sopenharmony_ci            try:
6007db96d56Sopenharmony_ci                await future
6017db96d56Sopenharmony_ci            except exceptions.CancelledError:
6027db96d56Sopenharmony_ci                conn.close()
6037db96d56Sopenharmony_ci                raise
6047db96d56Sopenharmony_ci
6057db96d56Sopenharmony_ci        future = self._register(ov, listener, finish_accept)
6067db96d56Sopenharmony_ci        coro = accept_coro(future, conn)
6077db96d56Sopenharmony_ci        tasks.ensure_future(coro, loop=self._loop)
6087db96d56Sopenharmony_ci        return future
6097db96d56Sopenharmony_ci
6107db96d56Sopenharmony_ci    def connect(self, conn, address):
6117db96d56Sopenharmony_ci        if conn.type == socket.SOCK_DGRAM:
6127db96d56Sopenharmony_ci            # WSAConnect will complete immediately for UDP sockets so we don't
6137db96d56Sopenharmony_ci            # need to register any IOCP operation
6147db96d56Sopenharmony_ci            _overlapped.WSAConnect(conn.fileno(), address)
6157db96d56Sopenharmony_ci            fut = self._loop.create_future()
6167db96d56Sopenharmony_ci            fut.set_result(None)
6177db96d56Sopenharmony_ci            return fut
6187db96d56Sopenharmony_ci
6197db96d56Sopenharmony_ci        self._register_with_iocp(conn)
6207db96d56Sopenharmony_ci        # The socket needs to be locally bound before we call ConnectEx().
6217db96d56Sopenharmony_ci        try:
6227db96d56Sopenharmony_ci            _overlapped.BindLocal(conn.fileno(), conn.family)
6237db96d56Sopenharmony_ci        except OSError as e:
6247db96d56Sopenharmony_ci            if e.winerror != errno.WSAEINVAL:
6257db96d56Sopenharmony_ci                raise
6267db96d56Sopenharmony_ci            # Probably already locally bound; check using getsockname().
6277db96d56Sopenharmony_ci            if conn.getsockname()[1] == 0:
6287db96d56Sopenharmony_ci                raise
6297db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
6307db96d56Sopenharmony_ci        ov.ConnectEx(conn.fileno(), address)
6317db96d56Sopenharmony_ci
6327db96d56Sopenharmony_ci        def finish_connect(trans, key, ov):
6337db96d56Sopenharmony_ci            ov.getresult()
6347db96d56Sopenharmony_ci            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
6357db96d56Sopenharmony_ci            conn.setsockopt(socket.SOL_SOCKET,
6367db96d56Sopenharmony_ci                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
6377db96d56Sopenharmony_ci            return conn
6387db96d56Sopenharmony_ci
6397db96d56Sopenharmony_ci        return self._register(ov, conn, finish_connect)
6407db96d56Sopenharmony_ci
6417db96d56Sopenharmony_ci    def sendfile(self, sock, file, offset, count):
6427db96d56Sopenharmony_ci        self._register_with_iocp(sock)
6437db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
6447db96d56Sopenharmony_ci        offset_low = offset & 0xffff_ffff
6457db96d56Sopenharmony_ci        offset_high = (offset >> 32) & 0xffff_ffff
6467db96d56Sopenharmony_ci        ov.TransmitFile(sock.fileno(),
6477db96d56Sopenharmony_ci                        msvcrt.get_osfhandle(file.fileno()),
6487db96d56Sopenharmony_ci                        offset_low, offset_high,
6497db96d56Sopenharmony_ci                        count, 0, 0)
6507db96d56Sopenharmony_ci
6517db96d56Sopenharmony_ci        def finish_sendfile(trans, key, ov):
6527db96d56Sopenharmony_ci            try:
6537db96d56Sopenharmony_ci                return ov.getresult()
6547db96d56Sopenharmony_ci            except OSError as exc:
6557db96d56Sopenharmony_ci                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
6567db96d56Sopenharmony_ci                                    _overlapped.ERROR_OPERATION_ABORTED):
6577db96d56Sopenharmony_ci                    raise ConnectionResetError(*exc.args)
6587db96d56Sopenharmony_ci                else:
6597db96d56Sopenharmony_ci                    raise
6607db96d56Sopenharmony_ci        return self._register(ov, sock, finish_sendfile)
6617db96d56Sopenharmony_ci
6627db96d56Sopenharmony_ci    def accept_pipe(self, pipe):
6637db96d56Sopenharmony_ci        self._register_with_iocp(pipe)
6647db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
6657db96d56Sopenharmony_ci        connected = ov.ConnectNamedPipe(pipe.fileno())
6667db96d56Sopenharmony_ci
6677db96d56Sopenharmony_ci        if connected:
6687db96d56Sopenharmony_ci            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
6697db96d56Sopenharmony_ci            # that the pipe is connected. There is no need to wait for the
6707db96d56Sopenharmony_ci            # completion of the connection.
6717db96d56Sopenharmony_ci            return self._result(pipe)
6727db96d56Sopenharmony_ci
6737db96d56Sopenharmony_ci        def finish_accept_pipe(trans, key, ov):
6747db96d56Sopenharmony_ci            ov.getresult()
6757db96d56Sopenharmony_ci            return pipe
6767db96d56Sopenharmony_ci
6777db96d56Sopenharmony_ci        return self._register(ov, pipe, finish_accept_pipe)
6787db96d56Sopenharmony_ci
6797db96d56Sopenharmony_ci    async def connect_pipe(self, address):
6807db96d56Sopenharmony_ci        delay = CONNECT_PIPE_INIT_DELAY
6817db96d56Sopenharmony_ci        while True:
6827db96d56Sopenharmony_ci            # Unfortunately there is no way to do an overlapped connect to
6837db96d56Sopenharmony_ci            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
6847db96d56Sopenharmony_ci            # ERROR_PIPE_BUSY.
6857db96d56Sopenharmony_ci            try:
6867db96d56Sopenharmony_ci                handle = _overlapped.ConnectPipe(address)
6877db96d56Sopenharmony_ci                break
6887db96d56Sopenharmony_ci            except OSError as exc:
6897db96d56Sopenharmony_ci                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
6907db96d56Sopenharmony_ci                    raise
6917db96d56Sopenharmony_ci
6927db96d56Sopenharmony_ci            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
6937db96d56Sopenharmony_ci            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
6947db96d56Sopenharmony_ci            await tasks.sleep(delay)
6957db96d56Sopenharmony_ci
6967db96d56Sopenharmony_ci        return windows_utils.PipeHandle(handle)
6977db96d56Sopenharmony_ci
6987db96d56Sopenharmony_ci    def wait_for_handle(self, handle, timeout=None):
6997db96d56Sopenharmony_ci        """Wait for a handle.
7007db96d56Sopenharmony_ci
7017db96d56Sopenharmony_ci        Return a Future object. The result of the future is True if the wait
7027db96d56Sopenharmony_ci        completed, or False if the wait did not complete (on timeout).
7037db96d56Sopenharmony_ci        """
7047db96d56Sopenharmony_ci        return self._wait_for_handle(handle, timeout, False)
7057db96d56Sopenharmony_ci
7067db96d56Sopenharmony_ci    def _wait_cancel(self, event, done_callback):
7077db96d56Sopenharmony_ci        fut = self._wait_for_handle(event, None, True)
7087db96d56Sopenharmony_ci        # add_done_callback() cannot be used because the wait may only complete
7097db96d56Sopenharmony_ci        # in IocpProactor.close(), while the event loop is not running.
7107db96d56Sopenharmony_ci        fut._done_callback = done_callback
7117db96d56Sopenharmony_ci        return fut
7127db96d56Sopenharmony_ci
7137db96d56Sopenharmony_ci    def _wait_for_handle(self, handle, timeout, _is_cancel):
7147db96d56Sopenharmony_ci        self._check_closed()
7157db96d56Sopenharmony_ci
7167db96d56Sopenharmony_ci        if timeout is None:
7177db96d56Sopenharmony_ci            ms = _winapi.INFINITE
7187db96d56Sopenharmony_ci        else:
7197db96d56Sopenharmony_ci            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
7207db96d56Sopenharmony_ci            # round away from zero to wait *at least* timeout seconds.
7217db96d56Sopenharmony_ci            ms = math.ceil(timeout * 1e3)
7227db96d56Sopenharmony_ci
7237db96d56Sopenharmony_ci        # We only create ov so we can use ov.address as a key for the cache.
7247db96d56Sopenharmony_ci        ov = _overlapped.Overlapped(NULL)
7257db96d56Sopenharmony_ci        wait_handle = _overlapped.RegisterWaitWithQueue(
7267db96d56Sopenharmony_ci            handle, self._iocp, ov.address, ms)
7277db96d56Sopenharmony_ci        if _is_cancel:
7287db96d56Sopenharmony_ci            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
7297db96d56Sopenharmony_ci        else:
7307db96d56Sopenharmony_ci            f = _WaitHandleFuture(ov, handle, wait_handle, self,
7317db96d56Sopenharmony_ci                                  loop=self._loop)
7327db96d56Sopenharmony_ci        if f._source_traceback:
7337db96d56Sopenharmony_ci            del f._source_traceback[-1]
7347db96d56Sopenharmony_ci
7357db96d56Sopenharmony_ci        def finish_wait_for_handle(trans, key, ov):
7367db96d56Sopenharmony_ci            # Note that this second wait means that we should only use
7377db96d56Sopenharmony_ci            # this with handles types where a successful wait has no
7387db96d56Sopenharmony_ci            # effect.  So events or processes are all right, but locks
7397db96d56Sopenharmony_ci            # or semaphores are not.  Also note if the handle is
7407db96d56Sopenharmony_ci            # signalled and then quickly reset, then we may return
7417db96d56Sopenharmony_ci            # False even though we have not timed out.
7427db96d56Sopenharmony_ci            return f._poll()
7437db96d56Sopenharmony_ci
7447db96d56Sopenharmony_ci        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
7457db96d56Sopenharmony_ci        return f
7467db96d56Sopenharmony_ci
7477db96d56Sopenharmony_ci    def _register_with_iocp(self, obj):
7487db96d56Sopenharmony_ci        # To get notifications of finished ops on this objects sent to the
7497db96d56Sopenharmony_ci        # completion port, were must register the handle.
7507db96d56Sopenharmony_ci        if obj not in self._registered:
7517db96d56Sopenharmony_ci            self._registered.add(obj)
7527db96d56Sopenharmony_ci            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
7537db96d56Sopenharmony_ci            # XXX We could also use SetFileCompletionNotificationModes()
7547db96d56Sopenharmony_ci            # to avoid sending notifications to completion port of ops
7557db96d56Sopenharmony_ci            # that succeed immediately.
7567db96d56Sopenharmony_ci
7577db96d56Sopenharmony_ci    def _register(self, ov, obj, callback):
7587db96d56Sopenharmony_ci        self._check_closed()
7597db96d56Sopenharmony_ci
7607db96d56Sopenharmony_ci        # Return a future which will be set with the result of the
7617db96d56Sopenharmony_ci        # operation when it completes.  The future's value is actually
7627db96d56Sopenharmony_ci        # the value returned by callback().
7637db96d56Sopenharmony_ci        f = _OverlappedFuture(ov, loop=self._loop)
7647db96d56Sopenharmony_ci        if f._source_traceback:
7657db96d56Sopenharmony_ci            del f._source_traceback[-1]
7667db96d56Sopenharmony_ci        if not ov.pending:
7677db96d56Sopenharmony_ci            # The operation has completed, so no need to postpone the
7687db96d56Sopenharmony_ci            # work.  We cannot take this short cut if we need the
7697db96d56Sopenharmony_ci            # NumberOfBytes, CompletionKey values returned by
7707db96d56Sopenharmony_ci            # PostQueuedCompletionStatus().
7717db96d56Sopenharmony_ci            try:
7727db96d56Sopenharmony_ci                value = callback(None, None, ov)
7737db96d56Sopenharmony_ci            except OSError as e:
7747db96d56Sopenharmony_ci                f.set_exception(e)
7757db96d56Sopenharmony_ci            else:
7767db96d56Sopenharmony_ci                f.set_result(value)
7777db96d56Sopenharmony_ci            # Even if GetOverlappedResult() was called, we have to wait for the
7787db96d56Sopenharmony_ci            # notification of the completion in GetQueuedCompletionStatus().
7797db96d56Sopenharmony_ci            # Register the overlapped operation to keep a reference to the
7807db96d56Sopenharmony_ci            # OVERLAPPED object, otherwise the memory is freed and Windows may
7817db96d56Sopenharmony_ci            # read uninitialized memory.
7827db96d56Sopenharmony_ci
7837db96d56Sopenharmony_ci        # Register the overlapped operation for later.  Note that
7847db96d56Sopenharmony_ci        # we only store obj to prevent it from being garbage
7857db96d56Sopenharmony_ci        # collected too early.
7867db96d56Sopenharmony_ci        self._cache[ov.address] = (f, ov, obj, callback)
7877db96d56Sopenharmony_ci        return f
7887db96d56Sopenharmony_ci
7897db96d56Sopenharmony_ci    def _unregister(self, ov):
7907db96d56Sopenharmony_ci        """Unregister an overlapped object.
7917db96d56Sopenharmony_ci
7927db96d56Sopenharmony_ci        Call this method when its future has been cancelled. The event can
7937db96d56Sopenharmony_ci        already be signalled (pending in the proactor event queue). It is also
7947db96d56Sopenharmony_ci        safe if the event is never signalled (because it was cancelled).
7957db96d56Sopenharmony_ci        """
7967db96d56Sopenharmony_ci        self._check_closed()
7977db96d56Sopenharmony_ci        self._unregistered.append(ov)
7987db96d56Sopenharmony_ci
7997db96d56Sopenharmony_ci    def _get_accept_socket(self, family):
8007db96d56Sopenharmony_ci        s = socket.socket(family)
8017db96d56Sopenharmony_ci        s.settimeout(0)
8027db96d56Sopenharmony_ci        return s
8037db96d56Sopenharmony_ci
8047db96d56Sopenharmony_ci    def _poll(self, timeout=None):
8057db96d56Sopenharmony_ci        if timeout is None:
8067db96d56Sopenharmony_ci            ms = INFINITE
8077db96d56Sopenharmony_ci        elif timeout < 0:
8087db96d56Sopenharmony_ci            raise ValueError("negative timeout")
8097db96d56Sopenharmony_ci        else:
8107db96d56Sopenharmony_ci            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
8117db96d56Sopenharmony_ci            # round away from zero to wait *at least* timeout seconds.
8127db96d56Sopenharmony_ci            ms = math.ceil(timeout * 1e3)
8137db96d56Sopenharmony_ci            if ms >= INFINITE:
8147db96d56Sopenharmony_ci                raise ValueError("timeout too big")
8157db96d56Sopenharmony_ci
8167db96d56Sopenharmony_ci        while True:
8177db96d56Sopenharmony_ci            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
8187db96d56Sopenharmony_ci            if status is None:
8197db96d56Sopenharmony_ci                break
8207db96d56Sopenharmony_ci            ms = 0
8217db96d56Sopenharmony_ci
8227db96d56Sopenharmony_ci            err, transferred, key, address = status
8237db96d56Sopenharmony_ci            try:
8247db96d56Sopenharmony_ci                f, ov, obj, callback = self._cache.pop(address)
8257db96d56Sopenharmony_ci            except KeyError:
8267db96d56Sopenharmony_ci                if self._loop.get_debug():
8277db96d56Sopenharmony_ci                    self._loop.call_exception_handler({
8287db96d56Sopenharmony_ci                        'message': ('GetQueuedCompletionStatus() returned an '
8297db96d56Sopenharmony_ci                                    'unexpected event'),
8307db96d56Sopenharmony_ci                        'status': ('err=%s transferred=%s key=%#x address=%#x'
8317db96d56Sopenharmony_ci                                   % (err, transferred, key, address)),
8327db96d56Sopenharmony_ci                    })
8337db96d56Sopenharmony_ci
8347db96d56Sopenharmony_ci                # key is either zero, or it is used to return a pipe
8357db96d56Sopenharmony_ci                # handle which should be closed to avoid a leak.
8367db96d56Sopenharmony_ci                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
8377db96d56Sopenharmony_ci                    _winapi.CloseHandle(key)
8387db96d56Sopenharmony_ci                continue
8397db96d56Sopenharmony_ci
8407db96d56Sopenharmony_ci            if obj in self._stopped_serving:
8417db96d56Sopenharmony_ci                f.cancel()
8427db96d56Sopenharmony_ci            # Don't call the callback if _register() already read the result or
8437db96d56Sopenharmony_ci            # if the overlapped has been cancelled
8447db96d56Sopenharmony_ci            elif not f.done():
8457db96d56Sopenharmony_ci                try:
8467db96d56Sopenharmony_ci                    value = callback(transferred, key, ov)
8477db96d56Sopenharmony_ci                except OSError as e:
8487db96d56Sopenharmony_ci                    f.set_exception(e)
8497db96d56Sopenharmony_ci                    self._results.append(f)
8507db96d56Sopenharmony_ci                else:
8517db96d56Sopenharmony_ci                    f.set_result(value)
8527db96d56Sopenharmony_ci                    self._results.append(f)
8537db96d56Sopenharmony_ci                finally:
8547db96d56Sopenharmony_ci                    f = None
8557db96d56Sopenharmony_ci
8567db96d56Sopenharmony_ci        # Remove unregistered futures
8577db96d56Sopenharmony_ci        for ov in self._unregistered:
8587db96d56Sopenharmony_ci            self._cache.pop(ov.address, None)
8597db96d56Sopenharmony_ci        self._unregistered.clear()
8607db96d56Sopenharmony_ci
8617db96d56Sopenharmony_ci    def _stop_serving(self, obj):
8627db96d56Sopenharmony_ci        # obj is a socket or pipe handle.  It will be closed in
8637db96d56Sopenharmony_ci        # BaseProactorEventLoop._stop_serving() which will make any
8647db96d56Sopenharmony_ci        # pending operations fail quickly.
8657db96d56Sopenharmony_ci        self._stopped_serving.add(obj)
8667db96d56Sopenharmony_ci
8677db96d56Sopenharmony_ci    def close(self):
8687db96d56Sopenharmony_ci        if self._iocp is None:
8697db96d56Sopenharmony_ci            # already closed
8707db96d56Sopenharmony_ci            return
8717db96d56Sopenharmony_ci
8727db96d56Sopenharmony_ci        # Cancel remaining registered operations.
8737db96d56Sopenharmony_ci        for fut, ov, obj, callback in list(self._cache.values()):
8747db96d56Sopenharmony_ci            if fut.cancelled():
8757db96d56Sopenharmony_ci                # Nothing to do with cancelled futures
8767db96d56Sopenharmony_ci                pass
8777db96d56Sopenharmony_ci            elif isinstance(fut, _WaitCancelFuture):
8787db96d56Sopenharmony_ci                # _WaitCancelFuture must not be cancelled
8797db96d56Sopenharmony_ci                pass
8807db96d56Sopenharmony_ci            else:
8817db96d56Sopenharmony_ci                try:
8827db96d56Sopenharmony_ci                    fut.cancel()
8837db96d56Sopenharmony_ci                except OSError as exc:
8847db96d56Sopenharmony_ci                    if self._loop is not None:
8857db96d56Sopenharmony_ci                        context = {
8867db96d56Sopenharmony_ci                            'message': 'Cancelling a future failed',
8877db96d56Sopenharmony_ci                            'exception': exc,
8887db96d56Sopenharmony_ci                            'future': fut,
8897db96d56Sopenharmony_ci                        }
8907db96d56Sopenharmony_ci                        if fut._source_traceback:
8917db96d56Sopenharmony_ci                            context['source_traceback'] = fut._source_traceback
8927db96d56Sopenharmony_ci                        self._loop.call_exception_handler(context)
8937db96d56Sopenharmony_ci
8947db96d56Sopenharmony_ci        # Wait until all cancelled overlapped complete: don't exit with running
8957db96d56Sopenharmony_ci        # overlapped to prevent a crash. Display progress every second if the
8967db96d56Sopenharmony_ci        # loop is still running.
8977db96d56Sopenharmony_ci        msg_update = 1.0
8987db96d56Sopenharmony_ci        start_time = time.monotonic()
8997db96d56Sopenharmony_ci        next_msg = start_time + msg_update
9007db96d56Sopenharmony_ci        while self._cache:
9017db96d56Sopenharmony_ci            if next_msg <= time.monotonic():
9027db96d56Sopenharmony_ci                logger.debug('%r is running after closing for %.1f seconds',
9037db96d56Sopenharmony_ci                             self, time.monotonic() - start_time)
9047db96d56Sopenharmony_ci                next_msg = time.monotonic() + msg_update
9057db96d56Sopenharmony_ci
9067db96d56Sopenharmony_ci            # handle a few events, or timeout
9077db96d56Sopenharmony_ci            self._poll(msg_update)
9087db96d56Sopenharmony_ci
9097db96d56Sopenharmony_ci        self._results = []
9107db96d56Sopenharmony_ci
9117db96d56Sopenharmony_ci        _winapi.CloseHandle(self._iocp)
9127db96d56Sopenharmony_ci        self._iocp = None
9137db96d56Sopenharmony_ci
9147db96d56Sopenharmony_ci    def __del__(self):
9157db96d56Sopenharmony_ci        self.close()
9167db96d56Sopenharmony_ci
9177db96d56Sopenharmony_ci
9187db96d56Sopenharmony_ciclass _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
9197db96d56Sopenharmony_ci
9207db96d56Sopenharmony_ci    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
9217db96d56Sopenharmony_ci        self._proc = windows_utils.Popen(
9227db96d56Sopenharmony_ci            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
9237db96d56Sopenharmony_ci            bufsize=bufsize, **kwargs)
9247db96d56Sopenharmony_ci
9257db96d56Sopenharmony_ci        def callback(f):
9267db96d56Sopenharmony_ci            returncode = self._proc.poll()
9277db96d56Sopenharmony_ci            self._process_exited(returncode)
9287db96d56Sopenharmony_ci
9297db96d56Sopenharmony_ci        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
9307db96d56Sopenharmony_ci        f.add_done_callback(callback)
9317db96d56Sopenharmony_ci
9327db96d56Sopenharmony_ci
9337db96d56Sopenharmony_ciSelectorEventLoop = _WindowsSelectorEventLoop
9347db96d56Sopenharmony_ci
9357db96d56Sopenharmony_ci
9367db96d56Sopenharmony_ciclass WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
9377db96d56Sopenharmony_ci    _loop_factory = SelectorEventLoop
9387db96d56Sopenharmony_ci
9397db96d56Sopenharmony_ci
9407db96d56Sopenharmony_ciclass WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
9417db96d56Sopenharmony_ci    _loop_factory = ProactorEventLoop
9427db96d56Sopenharmony_ci
9437db96d56Sopenharmony_ci
9447db96d56Sopenharmony_ciDefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
945