17db96d56Sopenharmony_ciimport collections
27db96d56Sopenharmony_ciimport subprocess
37db96d56Sopenharmony_ciimport warnings
47db96d56Sopenharmony_ci
57db96d56Sopenharmony_cifrom . import protocols
67db96d56Sopenharmony_cifrom . import transports
77db96d56Sopenharmony_cifrom .log import logger
87db96d56Sopenharmony_ci
97db96d56Sopenharmony_ci
class BaseSubprocessTransport(transports.SubprocessTransport):
    """Base class for subprocess transports.

    Holds the state shared by concrete transports: the child process
    object, its pid and return code, the per-fd pipe protocols and the
    protocol callbacks queued until the pipes are connected.

    Subclasses must override _start() to create the child process and
    store it in self._proc.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 waiter=None, extra=None, **kwargs):
        """Start the child process and schedule the connection of its pipes.

        loop is the event loop used for scheduling callbacks and for
        connecting the pipes.  protocol receives the subprocess events
        (connection_made, pipe_data_received, pipe_connection_lost,
        process_exited, connection_lost).  args, shell, stdin, stdout,
        stderr, bufsize and **kwargs are forwarded unchanged to _start().
        waiter, if given, is a future completed by _connect_pipes().
        extra is passed to the base transport constructor.

        If _start() raises, the transport is closed and the exception
        is re-raised.
        """
        super().__init__(extra)
        self._closed = False
        self._protocol = protocol
        self._loop = loop
        self._proc = None
        self._pid = None
        self._returncode = None
        # Futures created by _wait(); resolved in _call_connection_lost().
        self._exit_waiters = []
        # Protocol callbacks queued by _call() until _connect_pipes() is
        # done; set to None afterwards.
        self._pending_calls = collections.deque()
        # Maps fd (0, 1, 2) to its pipe protocol; the value stays None
        # until _connect_pipes() has connected that pipe.
        self._pipes = {}
        self._finished = False

        if stdin == subprocess.PIPE:
            self._pipes[0] = None
        if stdout == subprocess.PIPE:
            self._pipes[1] = None
        if stderr == subprocess.PIPE:
            self._pipes[2] = None

        # Create the child process: set the _proc attribute
        try:
            self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                        stderr=stderr, bufsize=bufsize, **kwargs)
        except BaseException:
            # Clean up on any failure, then let the error propagate.
            self.close()
            raise

        self._pid = self._proc.pid
        self._extra['subprocess'] = self._proc

        if self._loop.get_debug():
            if isinstance(args, (bytes, str)):
                program = args
            else:
                program = args[0]
            logger.debug('process %r created: pid %s',
                         program, self._pid)

        self._loop.create_task(self._connect_pipes(waiter))

    def __repr__(self):
        """Return a summary: class name, state, pid/returncode and pipes."""
        info = [self.__class__.__name__]
        if self._closed:
            info.append('closed')
        if self._pid is not None:
            info.append(f'pid={self._pid}')
        if self._returncode is not None:
            info.append(f'returncode={self._returncode}')
        elif self._pid is not None:
            info.append('running')
        else:
            info.append('not started')

        stdin = self._pipes.get(0)
        if stdin is not None:
            info.append(f'stdin={stdin.pipe}')

        stdout = self._pipes.get(1)
        stderr = self._pipes.get(2)
        if stdout is not None and stderr is stdout:
            # Both fds share one protocol object: report them together.
            info.append(f'stdout=stderr={stdout.pipe}')
        else:
            if stdout is not None:
                info.append(f'stdout={stdout.pipe}')
            if stderr is not None:
                info.append(f'stderr={stderr.pipe}')

        return '<{}>'.format(' '.join(info))

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Create the child process and set self._proc (subclass hook)."""
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Replace the protocol that receives the subprocess events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() has been called."""
        return self._closed

    def close(self):
        """Close the connected pipes and kill the child if still running.

        Calling close() more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        for proto in self._pipes.values():
            # None means the pipe was requested but never connected.
            if proto is None:
                continue
            proto.pipe.close()

        if (self._proc is not None and
                # has the child process finished?
                self._returncode is None and
                # the child process has finished, but the
                # transport hasn't been notified yet?
                self._proc.poll() is None):

            if self._loop.get_debug():
                logger.warning('Close running child process: kill %r', self)

            try:
                self._proc.kill()
            except ProcessLookupError:
                # The process is already gone: nothing to kill.
                pass

            # Don't clear the _proc reference yet: _connect_pipes() and
            # _process_exited() still read it.

    def __del__(self, _warn=warnings.warn):
        """Warn about, and close, a transport that was never closed."""
        if not self._closed:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self.close()

    def get_pid(self):
        """Return the child pid, or None if the process was not started."""
        return self._pid

    def get_returncode(self):
        """Return the exit status, or None if no exit was reported yet."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the pipe transport connected to *fd*, or None.

        None is returned both when no pipe was requested for *fd* and
        when the pipe was requested but is not connected yet (previously
        the latter case raised AttributeError).
        """
        proto = self._pipes.get(fd)
        if proto is None:
            return None
        return proto.pipe

    def _check_proc(self):
        """Raise ProcessLookupError if there is no process object."""
        if self._proc is None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Send *signal* to the child; ProcessLookupError if no process."""
        self._check_proc()
        self._proc.send_signal(signal)

    def terminate(self):
        """Terminate the child; ProcessLookupError if no process."""
        self._check_proc()
        self._proc.terminate()

    def kill(self):
        """Kill the child; ProcessLookupError if no process."""
        self._check_proc()
        self._proc.kill()

    async def _connect_pipes(self, waiter):
        """Connect the child's standard streams and start event delivery.

        For each of stdin/stdout/stderr exposed by the process object a
        pipe protocol is connected through the loop and stored in
        self._pipes.  Then connection_made() and every callback queued by
        _call() are scheduled, and queuing is switched off.

        The outcome is reported through *waiter* (when it is given and
        not cancelled): None on success, the exception on failure.
        SystemExit and KeyboardInterrupt are re-raised instead.
        """
        try:
            proc = self._proc
            loop = self._loop

            if proc.stdin is not None:
                # The second item of the returned pair is the protocol
                # instance created by the factory.
                _, pipe = await loop.connect_write_pipe(
                    lambda: WriteSubprocessPipeProto(self, 0),
                    proc.stdin)
                self._pipes[0] = pipe

            if proc.stdout is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 1),
                    proc.stdout)
                self._pipes[1] = pipe

            if proc.stderr is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 2),
                    proc.stderr)
                self._pipes[2] = pipe

            assert self._pending_calls is not None

            # connection_made() is scheduled first so the protocol sees
            # it before any event that was queued in the meantime.
            loop.call_soon(self._protocol.connection_made, self)
            for callback, data in self._pending_calls:
                loop.call_soon(callback, *data)
            self._pending_calls = None
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if waiter is not None and not waiter.cancelled():
                waiter.set_exception(exc)
        else:
            if waiter is not None and not waiter.cancelled():
                waiter.set_result(None)

    def _call(self, cb, *data):
        """Schedule cb(*data), queuing it while pipes are being connected."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _pipe_connection_lost(self, fd, exc):
        """Forward a pipe disconnection to the protocol, then try to finish."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Forward data read from pipe *fd* to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the exit status and notify the protocol.

        Must be called once, with a non-None *returncode*.
        """
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        if self._loop.get_debug():
            logger.info('%r exited with return code %r', self, returncode)
        self._returncode = returncode
        if self._proc.returncode is None:
            # asyncio uses a child watcher: copy the status into the Popen
            # object. On Python 3.6, it is required to avoid a ResourceWarning.
            self._proc.returncode = returncode
        self._call(self._protocol.process_exited)

        self._try_finish()

    async def _wait(self):
        """Wait until the process exits and return its return code.

        This method is a coroutine."""
        if self._returncode is not None:
            return self._returncode

        waiter = self._loop.create_future()
        self._exit_waiters.append(waiter)
        return await waiter

    def _try_finish(self):
        """Schedule connection_lost once the process and all pipes are done.

        Nothing happens until a return code is known and every entry of
        self._pipes is a disconnected protocol (a transport without
        pipes satisfies this immediately).
        """
        assert not self._finished
        if self._returncode is None:
            return
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._call(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Call protocol.connection_lost(exc) and release all references.

        The cleanup runs even if the protocol callback raises.
        """
        try:
            self._protocol.connection_lost(exc)
        finally:
            # wake up futures waiting for wait()
            for waiter in self._exit_waiters:
                if not waiter.cancelled():
                    waiter.set_result(self._returncode)
            self._exit_waiters = None
            self._loop = None
            self._proc = None
            self._protocol = None
2537db96d56Sopenharmony_ci
2547db96d56Sopenharmony_ci
class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol attached to one pipe of a subprocess transport.

    It remembers the pipe transport it is connected to and relays pipe
    events to the owning subprocess transport.
    """

    def __init__(self, proc, fd):
        """Bind the protocol to the subprocess transport *proc* and *fd*."""
        # Despite its name, proc is the owning transport: the methods
        # below call its _pipe_connection_lost() and read its _protocol.
        self.proc = proc
        self.fd = fd
        # Pipe transport; set by connection_made().
        self.pipe = None
        # Becomes True in connection_lost().
        self.disconnected = False

    def connection_made(self, transport):
        """Remember the pipe transport this protocol is connected to."""
        self.pipe = transport

    def __repr__(self):
        return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>'

    def connection_lost(self, exc):
        """Mark the pipe as disconnected and notify the owning transport."""
        self.disconnected = True
        self.proc._pipe_connection_lost(self.fd, exc)
        # Drop the reference to the owner once it has been notified.
        self.proc = None

    def pause_writing(self):
        """Relay pause_writing() to the subprocess protocol."""
        self.proc._protocol.pause_writing()

    def resume_writing(self):
        """Relay resume_writing() to the subprocess protocol."""
        self.proc._protocol.resume_writing()
2797db96d56Sopenharmony_ci
2807db96d56Sopenharmony_ci
class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Pipe protocol that also relays received data to its owner."""

    def data_received(self, data):
        """Pass *data* read from the pipe to the owning transport."""
        self.proc._pipe_data_received(self.fd, data)
286