import collections
import subprocess
import warnings

from . import protocols
from . import transports
from .log import logger


class BaseSubprocessTransport(transports.SubprocessTransport):
    """Common machinery for subprocess transports.

    Subclasses implement ``_start()`` to create the child process and are
    expected to call ``_process_exited()`` once its return code is known.
    This class tracks the pipes requested with ``subprocess.PIPE``, forwards
    pipe and exit events to the user protocol, and wakes up ``_wait()``
    callers when the connection is finally lost.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 waiter=None, extra=None, **kwargs):
        """Start the child process and schedule the pipe connection.

        ``args``, ``shell``, ``stdin``, ``stdout``, ``stderr``, ``bufsize``
        and ``**kwargs`` are forwarded unchanged to ``_start()``.  If
        ``waiter`` is given, it is resolved by ``_connect_pipes()`` once the
        pipes are connected (or set to the exception raised while
        connecting).  Any exception raised by ``_start()`` closes the
        transport and is re-raised.
        """
        super().__init__(extra)
        self._closed = False
        self._protocol = protocol
        self._loop = loop
        self._proc = None
        self._pid = None
        self._returncode = None
        # Futures created by _wait(); resolved in _call_connection_lost().
        self._exit_waiters = []
        # Protocol callbacks queued until connection_made() is scheduled;
        # set to None afterwards (see _call()).
        self._pending_calls = collections.deque()
        # Maps fd (0, 1, 2) to its pipe protocol.  The value is None from
        # the moment the pipe is requested until _connect_pipes() fills it.
        self._pipes = {}
        self._finished = False

        if stdin == subprocess.PIPE:
            self._pipes[0] = None
        if stdout == subprocess.PIPE:
            self._pipes[1] = None
        if stderr == subprocess.PIPE:
            self._pipes[2] = None

        # Create the child process: set the _proc attribute
        try:
            self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                        stderr=stderr, bufsize=bufsize, **kwargs)
        except:
            # Deliberately broad: release whatever was set up, then
            # propagate the original exception unchanged.
            self.close()
            raise

        self._pid = self._proc.pid
        self._extra['subprocess'] = self._proc

        if self._loop.get_debug():
            if isinstance(args, (bytes, str)):
                program = args
            else:
                program = args[0]
            logger.debug('process %r created: pid %s',
                         program, self._pid)

        self._loop.create_task(self._connect_pipes(waiter))

    def __repr__(self):
        """Return a debug string with state, pid, return code and pipes."""
        info = [self.__class__.__name__]
        if self._closed:
            info.append('closed')
        if self._pid is not None:
            info.append(f'pid={self._pid}')
        if self._returncode is not None:
            info.append(f'returncode={self._returncode}')
        elif self._pid is not None:
            info.append('running')
        else:
            info.append('not started')

        stdin = self._pipes.get(0)
        if stdin is not None:
            info.append(f'stdin={stdin.pipe}')

        stdout = self._pipes.get(1)
        stderr = self._pipes.get(2)
        if stdout is not None and stderr is stdout:
            info.append(f'stdout=stderr={stdout.pipe}')
        else:
            if stdout is not None:
                info.append(f'stdout={stdout.pipe}')
            if stderr is not None:
                info.append(f'stderr={stderr.pipe}')

        return '<{}>'.format(' '.join(info))

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Create the child process and store it in ``self._proc``.

        Must be overridden by subclasses.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Replace the protocol that receives subprocess events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() has been called."""
        return self._closed

    def close(self):
        """Close all connected pipes and kill the child if still running.

        Calling close() more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        for proto in self._pipes.values():
            # None means the pipe was requested but never connected.
            if proto is None:
                continue
            proto.pipe.close()

        if (self._proc is not None and
                # has the child process finished?
                self._returncode is None and
                # the child process has finished, but the
                # transport hasn't been notified yet?
                self._proc.poll() is None):

            if self._loop.get_debug():
                logger.warning('Close running child process: kill %r', self)

            try:
                self._proc.kill()
            except (ProcessLookupError, PermissionError):
                # Best effort: the process may already be gone, or we may
                # not be permitted to signal it.  close() is also called
                # from __del__, so it must not raise for either reason.
                pass

            # Don't clear the _proc reference yet: _post_init() may still run

    def __del__(self, _warn=warnings.warn):
        """Warn about, then close, a transport that was never closed."""
        # warnings.warn is bound as a default argument so that it is still
        # reachable if module globals are cleared at interpreter shutdown.
        if not self._closed:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self.close()

    def get_pid(self):
        """Return the child's process id, or None if it was not started."""
        return self._pid

    def get_returncode(self):
        """Return the child's return code, or None if not yet reported."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the pipe transport for *fd* (0, 1 or 2).

        Returns None when no pipe was requested for *fd*, or when the pipe
        was requested but has not been connected yet.
        """
        proto = self._pipes.get(fd)
        if proto is None:
            # Either no pipe for this fd, or still the placeholder that
            # __init__ stores before _connect_pipes() has run.
            return None
        return proto.pipe

    def _check_proc(self):
        """Raise ProcessLookupError if there is no process object."""
        if self._proc is None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Send *signal* to the child process.

        Raises ProcessLookupError if the process reference is gone.
        """
        self._check_proc()
        self._proc.send_signal(signal)

    def terminate(self):
        """Ask the child process to terminate.

        Raises ProcessLookupError if the process reference is gone.
        """
        self._check_proc()
        self._proc.terminate()

    def kill(self):
        """Kill the child process.

        Raises ProcessLookupError if the process reference is gone.
        """
        self._check_proc()
        self._proc.kill()

    async def _connect_pipes(self, waiter):
        """Connect the child's pipes to the loop and report via *waiter*.

        On success, ``connection_made()`` and any queued protocol callbacks
        are scheduled and *waiter* (if given and not cancelled) gets the
        result None.  On failure *waiter* gets the exception instead;
        SystemExit and KeyboardInterrupt are re-raised.
        """
        try:
            proc = self._proc
            loop = self._loop

            if proc.stdin is not None:
                _, pipe = await loop.connect_write_pipe(
                    lambda: WriteSubprocessPipeProto(self, 0),
                    proc.stdin)
                self._pipes[0] = pipe

            if proc.stdout is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 1),
                    proc.stdout)
                self._pipes[1] = pipe

            if proc.stderr is not None:
                _, pipe = await loop.connect_read_pipe(
                    lambda: ReadSubprocessPipeProto(self, 2),
                    proc.stderr)
                self._pipes[2] = pipe

            assert self._pending_calls is not None

            # Schedule connection_made() first so that the queued callbacks
            # are delivered to the protocol after it.
            loop.call_soon(self._protocol.connection_made, self)
            for callback, data in self._pending_calls:
                loop.call_soon(callback, *data)
            # From now on _call() schedules callbacks directly.
            self._pending_calls = None
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if waiter is not None and not waiter.cancelled():
                waiter.set_exception(exc)
        else:
            if waiter is not None and not waiter.cancelled():
                waiter.set_result(None)

    def _call(self, cb, *data):
        """Schedule ``cb(*data)``, queueing it until the pipes are connected."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _pipe_connection_lost(self, fd, exc):
        """Forward a lost pipe to the protocol and check for completion."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Forward data read from pipe *fd* to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the child's return code and notify the protocol.

        Must be called at most once, with a non-None *returncode*.
        """
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        if self._loop.get_debug():
            logger.info('%r exited with return code %r', self, returncode)
        self._returncode = returncode
        if self._proc.returncode is None:
            # asyncio uses a child watcher: copy the status into the Popen
            # object. On Python 3.6, it is required to avoid a ResourceWarning.
            self._proc.returncode = returncode
        self._call(self._protocol.process_exited)

        self._try_finish()

    async def _wait(self):
        """Wait until the process exits and return its return code.

        This method is a coroutine."""
        if self._returncode is not None:
            return self._returncode

        waiter = self._loop.create_future()
        self._exit_waiters.append(waiter)
        return await waiter

    def _try_finish(self):
        """Schedule connection_lost() once the process and all pipes are done.

        Nothing happens until the return code is known and every requested
        pipe has been connected and subsequently disconnected.
        """
        assert not self._finished
        if self._returncode is None:
            return
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._call(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Notify the protocol, wake up _wait() callers and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # wake up futures waiting for wait()
            for waiter in self._exit_waiters:
                if not waiter.cancelled():
                    waiter.set_result(self._returncode)
            self._exit_waiters = None
            self._loop = None
            self._proc = None
            self._protocol = None


class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol attached to one of the child's pipes (write side).

    Relays pipe events to the owning subprocess transport, which is stored
    in ``proc``.
    """

    def __init__(self, proc, fd):
        """Remember the owning transport *proc* and the pipe's *fd*."""
        self.proc = proc
        self.fd = fd
        self.pipe = None
        self.disconnected = False

    def connection_made(self, transport):
        """Store the pipe transport once it is connected."""
        self.pipe = transport

    def __repr__(self):
        return f'<{self.__class__.__name__} fd={self.fd} pipe={self.pipe!r}>'

    def connection_lost(self, exc):
        """Mark the pipe as disconnected and tell the owning transport."""
        self.disconnected = True
        self.proc._pipe_connection_lost(self.fd, exc)
        # Drop the reference to the owning transport.
        self.proc = None

    def pause_writing(self):
        """Relay flow control to the user protocol."""
        self.proc._protocol.pause_writing()

    def resume_writing(self):
        """Relay flow control to the user protocol."""
        self.proc._protocol.resume_writing()


class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Protocol attached to one of the child's pipes (read side)."""

    def data_received(self, data):
        """Forward data read from the pipe to the owning transport."""
        self.proc._pipe_data_received(self.fd, data)