Lines Matching refs:self
63 def __init__(self, selector=None):
65 self._signal_handlers = {}
67 def close(self):
70 for sig in list(self._signal_handlers):
71 self.remove_signal_handler(sig)
73 if self._signal_handlers:
74 warnings.warn(f"Closing the loop {self!r} "
78 source=self)
79 self._signal_handlers.clear()
81 def _process_self_data(self, data):
86 self._handle_signal(signum)
88 def add_signal_handler(self, sig, callback, *args):
98 self._check_signal(sig)
99 self._check_closed()
105 signal.set_wakeup_fd(self._csock.fileno())
109 handle = events.Handle(callback, args, self, None)
110 self._signal_handlers[sig] = handle
121 del self._signal_handlers[sig]
122 if not self._signal_handlers:
133 def _handle_signal(self, sig):
135 handle = self._signal_handlers.get(sig)
139 self.remove_signal_handler(sig) # Remove it properly.
141 self._add_callback_signalsafe(handle)
143 def remove_signal_handler(self, sig):
148 self._check_signal(sig)
150 del self._signal_handlers[sig]
167 if not self._signal_handlers:
175 def _check_signal(self, sig):
187 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
189 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
191 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
193 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
195 async def _make_subprocess_transport(self, protocol, args, shell,
206 waiter = self.create_future()
207 transp = _UnixSubprocessTransport(self, protocol, args, shell,
213 self._child_watcher_callback, transp)
225 def _child_watcher_callback(self, pid, returncode, transp):
227 self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
230 self, protocol_factory, path=None, *,
259 await self.sock_connect(sock, path)
273 transport, protocol = await self._create_connection_transport(
280 self, protocol_factory, path=None, *,
341 server = base_events.Server(self, [sock], protocol_factory,
352 async def _sock_sendfile_native(self, sock, file, offset, count):
370 fut = self.create_future()
371 self._sock_sendfile_native_impl(fut, None, sock, fileno,
375 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
383 self.remove_writer(registered_fd)
385 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
390 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
398 self._sock_add_cancellation_callback(fut, sock)
399 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
421 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
424 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
429 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
434 self._sock_sendfile_update_filepos(fileno, offset, total_sent)
440 self._sock_add_cancellation_callback(fut, sock)
441 self.add_writer(fd, self._sock_sendfile_native_impl, fut,
445 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
449 def _sock_add_cancellation_callback(self, fut, sock):
454 self.remove_writer(fd)
462 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
464 self._extra['pipe'] = pipe
465 self._loop = loop
466 self._pipe = pipe
467 self._fileno = pipe.fileno()
468 self._protocol = protocol
469 self._closing = False
470 self._paused = False
472 mode = os.fstat(self._fileno).st_mode
476 self._pipe = None
477 self._fileno = None
478 self._protocol = None
481 os.set_blocking(self._fileno, False)
483 self._loop.call_soon(self._protocol.connection_made, self)
485 self._loop.call_soon(self._add_reader,
486 self._fileno, self._read_ready)
489 self._loop.call_soon(futures._set_result_unless_cancelled,
492 def _add_reader(self, fd, callback):
493 if not self.is_reading():
495 self._loop._add_reader(fd, callback)
497 def is_reading(self):
498 return not self._paused and not self._closing
500 def __repr__(self):
501 info = [self.__class__.__name__]
502 if self._pipe is None:
504 elif self._closing:
506 info.append(f'fd={self._fileno}')
507 selector = getattr(self._loop, '_selector', None)
508 if self._pipe is not None and selector is not None:
510 selector, self._fileno, selectors.EVENT_READ)
515 elif self._pipe is not None:
521 def _read_ready(self):
523 data = os.read(self._fileno, self.max_size)
527 self._fatal_error(exc, 'Fatal read error on pipe transport')
530 self._protocol.data_received(data)
532 if self._loop.get_debug():
533 logger.info("%r was closed by peer", self)
534 self._closing = True
535 self._loop._remove_reader(self._fileno)
536 self._loop.call_soon(self._protocol.eof_received)
537 self._loop.call_soon(self._call_connection_lost, None)
539 def pause_reading(self):
540 if not self.is_reading():
542 self._paused = True
543 self._loop._remove_reader(self._fileno)
544 if self._loop.get_debug():
545 logger.debug("%r pauses reading", self)
547 def resume_reading(self):
548 if self._closing or not self._paused:
550 self._paused = False
551 self._loop._add_reader(self._fileno, self._read_ready)
552 if self._loop.get_debug():
553 logger.debug("%r resumes reading", self)
555 def set_protocol(self, protocol):
556 self._protocol = protocol
558 def get_protocol(self):
559 return self._protocol
561 def is_closing(self):
562 return self._closing
564 def close(self):
565 if not self._closing:
566 self._close(None)
568 def __del__(self, _warn=warnings.warn):
569 if self._pipe is not None:
570 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
571 self._pipe.close()
573 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
576 if self._loop.get_debug():
577 logger.debug("%r: %s", self, message, exc_info=True)
579 self._loop.call_exception_handler({
582 'transport': self,
583 'protocol': self._protocol,
585 self._close(exc)
587 def _close(self, exc):
588 self._closing = True
589 self._loop._remove_reader(self._fileno)
590 self._loop.call_soon(self._call_connection_lost, exc)
592 def _call_connection_lost(self, exc):
594 self._protocol.connection_lost(exc)
596 self._pipe.close()
597 self._pipe = None
598 self._protocol = None
599 self._loop = None
605 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
607 self._extra['pipe'] = pipe
608 self._pipe = pipe
609 self._fileno = pipe.fileno()
610 self._protocol = protocol
611 self._buffer = bytearray()
612 self._conn_lost = 0
613 self._closing = False # Set when close() or write_eof() called.
615 mode = os.fstat(self._fileno).st_mode
620 self._pipe = None
621 self._fileno = None
622 self._protocol = None
626 os.set_blocking(self._fileno, False)
627 self._loop.call_soon(self._protocol.connection_made, self)
634 self._loop.call_soon(self._loop._add_reader,
635 self._fileno, self._read_ready)
639 self._loop.call_soon(futures._set_result_unless_cancelled,
642 def __repr__(self):
643 info = [self.__class__.__name__]
644 if self._pipe is None:
646 elif self._closing:
648 info.append(f'fd={self._fileno}')
649 selector = getattr(self._loop, '_selector', None)
650 if self._pipe is not None and selector is not None:
652 selector, self._fileno, selectors.EVENT_WRITE)
658 bufsize = self.get_write_buffer_size()
660 elif self._pipe is not None:
666 def get_write_buffer_size(self):
667 return len(self._buffer)
669 def _read_ready(self):
671 if self._loop.get_debug():
672 logger.info("%r was closed by peer", self)
673 if self._buffer:
674 self._close(BrokenPipeError())
676 self._close()
678 def write(self, data):
685 if self._conn_lost or self._closing:
686 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
689 self._conn_lost += 1
692 if not self._buffer:
695 n = os.write(self._fileno, data)
701 self._conn_lost += 1
702 self._fatal_error(exc, 'Fatal write error on pipe transport')
708 self._loop._add_writer(self._fileno, self._write_ready)
710 self._buffer += data
711 self._maybe_pause_protocol()
713 def _write_ready(self):
714 assert self._buffer, 'Data should not be empty'
717 n = os.write(self._fileno, self._buffer)
723 self._buffer.clear()
724 self._conn_lost += 1
727 self._loop._remove_writer(self._fileno)
728 self._fatal_error(exc, 'Fatal write error on pipe transport')
730 if n == len(self._buffer):
731 self._buffer.clear()
732 self._loop._remove_writer(self._fileno)
733 self._maybe_resume_protocol() # May append to buffer.
734 if self._closing:
735 self._loop._remove_reader(self._fileno)
736 self._call_connection_lost(None)
739 del self._buffer[:n]
741 def can_write_eof(self):
744 def write_eof(self):
745 if self._closing:
747 assert self._pipe
748 self._closing = True
749 if not self._buffer:
750 self._loop._remove_reader(self._fileno)
751 self._loop.call_soon(self._call_connection_lost, None)
753 def set_protocol(self, protocol):
754 self._protocol = protocol
756 def get_protocol(self):
757 return self._protocol
759 def is_closing(self):
760 return self._closing
762 def close(self):
763 if self._pipe is not None and not self._closing:
765 self.write_eof()
767 def __del__(self, _warn=warnings.warn):
768 if self._pipe is not None:
769 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
770 self._pipe.close()
772 def abort(self):
773 self._close(None)
775 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
778 if self._loop.get_debug():
779 logger.debug("%r: %s", self, message, exc_info=True)
781 self._loop.call_exception_handler({
784 'transport': self,
785 'protocol': self._protocol,
787 self._close(exc)
789 def _close(self, exc=None):
790 self._closing = True
791 if self._buffer:
792 self._loop._remove_writer(self._fileno)
793 self._buffer.clear()
794 self._loop._remove_reader(self._fileno)
795 self._loop.call_soon(self._call_connection_lost, exc)
797 def _call_connection_lost(self, exc):
799 self._protocol.connection_lost(exc)
801 self._pipe.close()
802 self._pipe = None
803 self._protocol = None
804 self._loop = None
809 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
818 self._proc = subprocess.Popen(
823 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
854 def add_child_handler(self, pid, callback, *args):
865 def remove_child_handler(self, pid):
873 def attach_loop(self, loop):
883 def close(self):
890 def is_active(self):
899 def __enter__(self):
902 This function must return self"""
905 def __exit__(self, a, b, c):
922 def __init__(self):
923 self._loop = None
924 self._callbacks = {}
926 def __enter__(self):
927 return self
929 def __exit__(self, exc_type, exc_value, exc_traceback):
932 def is_active(self):
933 return self._loop is not None and self._loop.is_running()
935 def close(self):
936 self.attach_loop(None)
938 def attach_loop(self, loop):
939 if self._loop is not None and loop is None and self._callbacks:
944 for pidfd, _, _ in self._callbacks.values():
945 self._loop._remove_reader(pidfd)
947 self._callbacks.clear()
948 self._loop = loop
950 def add_child_handler(self, pid, callback, *args):
951 existing = self._callbacks.get(pid)
953 self._callbacks[pid] = existing[0], callback, args
956 self._loop._add_reader(pidfd, self._do_wait, pid)
957 self._callbacks[pid] = pidfd, callback, args
959 def _do_wait(self, pid):
960 pidfd, callback, args = self._callbacks.pop(pid)
961 self._loop._remove_reader(pidfd)
978 def remove_child_handler(self, pid):
980 pidfd, _, _ = self._callbacks.pop(pid)
983 self._loop._remove_reader(pidfd)
990 def __init__(self):
991 self._loop = None
992 self._callbacks = {}
994 def close(self):
995 self.attach_loop(None)
997 def is_active(self):
998 return self._loop is not None and self._loop.is_running()
1000 def _do_waitpid(self, expected_pid):
1003 def _do_waitpid_all(self):
1006 def attach_loop(self, loop):
1009 if self._loop is not None and loop is None and self._callbacks:
1015 if self._loop is not None:
1016 self._loop.remove_signal_handler(signal.SIGCHLD)
1018 self._loop = loop
1020 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
1024 self._do_waitpid_all()
1026 def _sig_chld(self):
1028 self._do_waitpid_all()
1032 # self._loop should always be available here
1035 self._loop.call_exception_handler({
1052 def close(self):
1053 self._callbacks.clear()
1056 def __enter__(self):
1057 return self
1059 def __exit__(self, a, b, c):
1062 def add_child_handler(self, pid, callback, *args):
1063 self._callbacks[pid] = (callback, args)
1066 self._do_waitpid(pid)
1068 def remove_child_handler(self, pid):
1070 del self._callbacks[pid]
1075 def _do_waitpid_all(self):
1077 for pid in list(self._callbacks):
1078 self._do_waitpid(pid)
1080 def _do_waitpid(self, expected_pid):
1099 if self._loop.get_debug():
1104 callback, args = self._callbacks.pop(pid)
1108 if self._loop.get_debug():
1125 def __init__(self):
1127 self._lock = threading.Lock()
1128 self._zombies = {}
1129 self._forks = 0
1131 def close(self):
1132 self._callbacks.clear()
1133 self._zombies.clear()
1136 def __enter__(self):
1137 with self._lock:
1138 self._forks += 1
1140 return self
1142 def __exit__(self, a, b, c):
1143 with self._lock:
1144 self._forks -= 1
1146 if self._forks or not self._zombies:
1149 collateral_victims = str(self._zombies)
1150 self._zombies.clear()
1156 def add_child_handler(self, pid, callback, *args):
1157 assert self._forks, "Must use the context manager"
1159 with self._lock:
1161 returncode = self._zombies.pop(pid)
1164 self._callbacks[pid] = callback, args
1170 def remove_child_handler(self, pid):
1172 del self._callbacks[pid]
1177 def _do_waitpid_all(self):
1193 with self._lock:
1195 callback, args = self._callbacks.pop(pid)
1198 if self._forks:
1200 self._zombies[pid] = returncode
1201 if self._loop.get_debug():
1208 if self._loop.get_debug():
1239 def __init__(self):
1240 self._callbacks = {}
1241 self._saved_sighandler = None
1243 def is_active(self):
1244 return self._saved_sighandler is not None
1246 def close(self):
1247 self._callbacks.clear()
1248 if self._saved_sighandler is None:
1252 if handler != self._sig_chld:
1255 signal.signal(signal.SIGCHLD, self._saved_sighandler)
1256 self._saved_sighandler = None
1258 def __enter__(self):
1259 return self
1261 def __exit__(self, exc_type, exc_val, exc_tb):
1264 def add_child_handler(self, pid, callback, *args):
1266 self._callbacks[pid] = (loop, callback, args)
1269 self._do_waitpid(pid)
1271 def remove_child_handler(self, pid):
1273 del self._callbacks[pid]
1278 def attach_loop(self, loop):
1283 if self._saved_sighandler is not None:
1286 self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
1287 if self._saved_sighandler is None:
1290 self._saved_sighandler = signal.SIG_DFL
1295 def _do_waitpid_all(self):
1296 for pid in list(self._callbacks):
1297 self._do_waitpid(pid)
1299 def _do_waitpid(self, expected_pid):
1321 loop, callback, args = self._callbacks.pop(pid)
1336 def _sig_chld(self, signum, frame):
1338 self._do_waitpid_all()
1358 def __init__(self):
1359 self._pid_counter = itertools.count(0)
1360 self._threads = {}
1362 def is_active(self):
1365 def close(self):
1366 self._join_threads()
1368 def _join_threads(self):
1370 threads = [thread for thread in list(self._threads.values())
1375 def __enter__(self):
1376 return self
1378 def __exit__(self, exc_type, exc_val, exc_tb):
1381 def __del__(self, _warn=warnings.warn):
1382 threads = [thread for thread in list(self._threads.values())
1385 _warn(f"{self.__class__} has registered but not finished child processes",
1387 source=self)
1389 def add_child_handler(self, pid, callback, *args):
1391 thread = threading.Thread(target=self._do_waitpid,
1392 name=f"waitpid-{next(self._pid_counter)}",
1395 self._threads[pid] = thread
1398 def remove_child_handler(self, pid):
1404 def attach_loop(self, loop):
1407 def _do_waitpid(self, loop, expected_pid, callback, args):
1431 self._threads.pop(expected_pid)
1438 def __init__(self):
1440 self._watcher = None
1442 def _init_watcher(self):
1444 if self._watcher is None: # pragma: no branch
1445 self._watcher = ThreadedChildWatcher()
1447 self._watcher.attach_loop(self._local._loop)
1449 def set_event_loop(self, loop):
1459 if (self._watcher is not None and
1461 self._watcher.attach_loop(loop)
1463 def get_child_watcher(self):
1468 if self._watcher is None:
1469 self._init_watcher()
1471 return self._watcher
1473 def set_child_watcher(self, watcher):
1478 if self._watcher is not None:
1479 self._watcher.close()
1481 self._watcher = watcher