Lines Matching refs:self
207 def __init__(self, transp):
210 self._transport = transp
211 self._proto = transp.get_protocol()
212 self._should_resume_reading = transp.is_reading()
213 self._should_resume_writing = transp._protocol_paused
215 transp.set_protocol(self)
216 if self._should_resume_writing:
217 self._write_ready_fut = self._transport._loop.create_future()
219 self._write_ready_fut = None
221 async def drain(self):
222 if self._transport.is_closing():
224 fut = self._write_ready_fut
229 def connection_made(self, transport):
233 def connection_lost(self, exc):
234 if self._write_ready_fut is not None:
238 self._write_ready_fut.set_exception(
241 self._write_ready_fut.set_exception(exc)
242 self._proto.connection_lost(exc)
244 def pause_writing(self):
245 if self._write_ready_fut is not None:
247 self._write_ready_fut = self._transport._loop.create_future()
249 def resume_writing(self):
250 if self._write_ready_fut is None:
252 self._write_ready_fut.set_result(False)
253 self._write_ready_fut = None
255 def data_received(self, data):
258 def eof_received(self):
261 async def restore(self):
262 self._transport.set_protocol(self._proto)
263 if self._should_resume_reading:
264 self._transport.resume_reading()
265 if self._write_ready_fut is not None:
269 self._write_ready_fut.cancel()
270 if self._should_resume_writing:
271 self._proto.resume_writing()
276 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
278 self._loop = loop
279 self._sockets = sockets
280 self._active_count = 0
281 self._waiters = []
282 self._protocol_factory = protocol_factory
283 self._backlog = backlog
284 self._ssl_context = ssl_context
285 self._ssl_handshake_timeout = ssl_handshake_timeout
286 self._ssl_shutdown_timeout = ssl_shutdown_timeout
287 self._serving = False
288 self._serving_forever_fut = None
290 def __repr__(self):
291 return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
293 def _attach(self):
294 assert self._sockets is not None
295 self._active_count += 1
297 def _detach(self):
298 assert self._active_count > 0
299 self._active_count -= 1
300 if self._active_count == 0 and self._sockets is None:
301 self._wakeup()
303 def _wakeup(self):
304 waiters = self._waiters
305 self._waiters = None
310 def _start_serving(self):
311 if self._serving:
313 self._serving = True
314 for sock in self._sockets:
315 sock.listen(self._backlog)
316 self._loop._start_serving(
317 self._protocol_factory, sock, self._ssl_context,
318 self, self._backlog, self._ssl_handshake_timeout,
319 self._ssl_shutdown_timeout)
321 def get_loop(self):
322 return self._loop
324 def is_serving(self):
325 return self._serving
328 def sockets(self):
329 if self._sockets is None:
331 return tuple(trsock.TransportSocket(s) for s in self._sockets)
333 def close(self):
334 sockets = self._sockets
337 self._sockets = None
340 self._loop._stop_serving(sock)
342 self._serving = False
344 if (self._serving_forever_fut is not None and
345 not self._serving_forever_fut.done()):
346 self._serving_forever_fut.cancel()
347 self._serving_forever_fut = None
349 if self._active_count == 0:
350 self._wakeup()
352 async def start_serving(self):
353 self._start_serving()
358 async def serve_forever(self):
359 if self._serving_forever_fut is not None:
361 f'server {self!r} is already being awaited on serve_forever()')
362 if self._sockets is None:
363 raise RuntimeError(f'server {self!r} is closed')
365 self._start_serving()
366 self._serving_forever_fut = self._loop.create_future()
369 await self._serving_forever_fut
372 self.close()
373 await self.wait_closed()
377 self._serving_forever_fut = None
379 async def wait_closed(self):
380 if self._sockets is None or self._waiters is None:
382 waiter = self._loop.create_future()
383 self._waiters.append(waiter)
389 def __init__(self):
390 self._timer_cancelled_count = 0
391 self._closed = False
392 self._stopping = False
393 self._ready = collections.deque()
394 self._scheduled = []
395 self._default_executor = None
396 self._internal_fds = 0
399 self._thread_id = None
400 self._clock_resolution = time.get_clock_info('monotonic').resolution
401 self._exception_handler = None
402 self.set_debug(coroutines._is_debug_mode())
405 self.slow_callback_duration = 0.1
406 self._current_handle = None
407 self._task_factory = None
408 self._coroutine_origin_tracking_enabled = False
409 self._coroutine_origin_tracking_saved_depth = None
413 self._asyncgens = weakref.WeakSet()
415 self._asyncgens_shutdown_called = False
417 self._executor_shutdown_called = False
419 def __repr__(self):
421 f'<{self.__class__.__name__} running={self.is_running()} '
422 f'closed={self.is_closed()} debug={self.get_debug()}>'
425 def create_future(self):
427 return futures.Future(loop=self)
429 def create_task(self, coro, *, name=None, context=None):
434 self._check_closed()
435 if self._task_factory is None:
436 task = tasks.Task(coro, loop=self, name=name, context=context)
442 task = self._task_factory(self, coro)
444 task = self._task_factory(self, coro, context=context)
450 def set_task_factory(self, factory):
462 self._task_factory = factory
464 def get_task_factory(self):
466 return self._task_factory
468 def _make_socket_transport(self, sock, protocol, waiter=None, *,
474 self, rawsock, protocol, sslcontext, waiter=None,
483 def _make_datagram_transport(self, sock, protocol,
488 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
493 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
498 async def _make_subprocess_transport(self, protocol, args, shell,
504 def _write_to_self(self):
505 """Write a byte to self-pipe, to wake up the event loop.
509 The subclass is responsible for implementing the self-pipe.
513 def _process_events(self, event_list):
517 def _check_closed(self):
518 if self._closed:
521 def _check_default_executor(self):
522 if self._executor_shutdown_called:
525 def _asyncgen_finalizer_hook(self, agen):
526 self._asyncgens.discard(agen)
527 if not self.is_closed():
528 self.call_soon_threadsafe(self.create_task, agen.aclose())
530 def _asyncgen_firstiter_hook(self, agen):
531 if self._asyncgens_shutdown_called:
535 ResourceWarning, source=self)
537 self._asyncgens.add(agen)
539 async def shutdown_asyncgens(self):
541 self._asyncgens_shutdown_called = True
543 if not len(self._asyncgens):
548 closing_agens = list(self._asyncgens)
549 self._asyncgens.clear()
557 self.call_exception_handler({
564 async def shutdown_default_executor(self):
566 self._executor_shutdown_called = True
567 if self._default_executor is None:
569 future = self.create_future()
570 thread = threading.Thread(target=self._do_shutdown, args=(future,))
577 def _do_shutdown(self, future):
579 self._default_executor.shutdown(wait=True)
580 if not self.is_closed():
581 self.call_soon_threadsafe(future.set_result, None)
583 if not self.is_closed():
584 self.call_soon_threadsafe(future.set_exception, ex)
586 def _check_running(self):
587 if self.is_running():
593 def run_forever(self):
595 self._check_closed()
596 self._check_running()
597 self._set_coroutine_origin_tracking(self._debug)
601 self._thread_id = threading.get_ident()
602 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
603 finalizer=self._asyncgen_finalizer_hook)
605 events._set_running_loop(self)
607 self._run_once()
608 if self._stopping:
611 self._stopping = False
612 self._thread_id = None
614 self._set_coroutine_origin_tracking(False)
617 def run_until_complete(self, future):
628 self._check_closed()
629 self._check_running()
632 future = tasks.ensure_future(future, loop=self)
640 self.run_forever()
655 def stop(self):
661 self._stopping = True
663 def close(self):
671 if self.is_running():
673 if self._closed:
675 if self._debug:
676 logger.debug("Close %r", self)
677 self._closed = True
678 self._ready.clear()
679 self._scheduled.clear()
680 self._executor_shutdown_called = True
681 executor = self._default_executor
683 self._default_executor = None
686 def is_closed(self):
688 return self._closed
690 def __del__(self, _warn=warnings.warn):
691 if not self.is_closed():
692 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
693 if not self.is_running():
694 self.close()
696 def is_running(self):
698 return (self._thread_id is not None)
700 def time(self):
709 def call_later(self, delay, callback, *args, context=None):
727 timer = self.call_at(self.time() + delay, callback, *args,
733 def call_at(self, when, callback, *args, context=None):
740 self._check_closed()
741 if self._debug:
742 self._check_thread()
743 self._check_callback(callback, 'call_at')
744 timer = events.TimerHandle(when, callback, args, self, context)
747 heapq.heappush(self._scheduled, timer)
751 def call_soon(self, callback, *args, context=None):
761 self._check_closed()
762 if self._debug:
763 self._check_thread()
764 self._check_callback(callback, 'call_soon')
765 handle = self._call_soon(callback, args, context)
770 def _check_callback(self, callback, method):
780 def _call_soon(self, callback, args, context):
781 handle = events.Handle(callback, args, self, context)
784 self._ready.append(handle)
787 def _check_thread(self):
793 Should only be called when (self._debug == True). The caller is
796 if self._thread_id is None:
799 if thread_id != self._thread_id:
804 def call_soon_threadsafe(self, callback, *args, context=None):
806 self._check_closed()
807 if self._debug:
808 self._check_callback(callback, 'call_soon_threadsafe')
809 handle = self._call_soon(callback, args, context)
812 self._write_to_self()
815 def run_in_executor(self, executor, func, *args):
816 self._check_closed()
817 if self._debug:
818 self._check_callback(func, 'run_in_executor')
820 executor = self._default_executor
822 self._check_default_executor()
827 self._default_executor = executor
829 executor.submit(func, *args), loop=self)
831 def set_default_executor(self, executor):
834 self._default_executor = executor
836 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
849 t0 = self.time()
851 dt = self.time() - t0
854 if dt >= self.slow_callback_duration:
860 async def getaddrinfo(self, host, port, *,
862 if self._debug:
863 getaddr_func = self._getaddrinfo_debug
867 return await self.run_in_executor(
870 async def getnameinfo(self, sockaddr, flags=0):
871 return await self.run_in_executor(
874 async def sock_sendfile(self, sock, file, offset=0, count=None,
876 if self._debug and sock.gettimeout() != 0:
879 self._check_sendfile_params(sock, file, offset, count)
881 return await self._sock_sendfile_native(sock, file,
886 return await self._sock_sendfile_fallback(sock, file,
889 async def _sock_sendfile_native(self, sock, file, offset, count):
896 async def _sock_sendfile_fallback(self, sock, file, offset, count):
912 read = await self.run_in_executor(None, file.readinto, view)
915 await self.sock_sendall(sock, view[:read])
922 def _check_sendfile_params(self, sock, file, offset, count):
943 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
973 await self.sock_connect(sock, address)
988 self, protocol_factory, host=None, port=None,
1045 infos = await self._ensure_resolved(
1047 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
1052 laddr_infos = await self._ensure_resolved(
1055 flags=flags, loop=self)
1069 sock = await self._connect_sock(
1076 (functools.partial(self._connect_sock,
1079 happy_eyeballs_delay, loop=self)
1112 transport, protocol = await self._create_connection_transport(
1116 if self._debug:
1125 self, sock, protocol_factory, ssl,
1133 waiter = self.create_future()
1136 transport = self._make_ssl_transport(
1142 transport = self._make_socket_transport(sock, protocol, waiter)
1152 async def sendfile(self, transport, file, offset=0, count=None,
1185 return await self._sendfile_native(transport, file,
1196 return await self._sendfile_fallback(transport, file,
1199 async def _sendfile_native(self, transp, file, offset, count):
1203 async def _sendfile_fallback(self, transp, file, offset, count):
1217 read = await self.run_in_executor(None, file.readinto, view)
1228 async def start_tls(self, transport, protocol, sslcontext, *,
1250 waiter = self.create_future()
1252 self, protocol, sslcontext, waiter,
1263 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1264 resume_cb = self.call_soon(transport.resume_reading)
1276 async def create_datagram_endpoint(self, protocol_factory,
1332 infos = await self._ensure_resolved(
1334 proto=proto, flags=flags, loop=self)
1373 await self.sock_connect(sock, remote_address)
1389 waiter = self.create_future()
1390 transport = self._make_datagram_transport(
1392 if self._debug:
1410 async def _ensure_resolved(self, address, *,
1422 async def _create_server_getaddrinfo(self, host, port, family, flags):
1423 infos = await self._ensure_resolved((host, port), family=family,
1425 flags=flags, loop=self)
1431 self, protocol_factory, host=None, port=None,
1488 fs = [self._create_server_getaddrinfo(host, port, family=family,
1502 if self._debug:
1543 server = Server(self, sockets, protocol_factory,
1552 if self._debug:
1557 self, protocol_factory, sock,
1575 transport, protocol = await self._create_connection_transport(
1579 if self._debug:
1586 async def connect_read_pipe(self, protocol_factory, pipe):
1588 waiter = self.create_future()
1589 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1597 if self._debug:
1602 async def connect_write_pipe(self, protocol_factory, pipe):
1604 waiter = self.create_future()
1605 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1613 if self._debug:
1618 def _log_subprocess(self, msg, stdin, stdout, stderr):
1631 async def subprocess_shell(self, protocol_factory, cmd, *,
1656 if self._debug:
1660 self._log_subprocess(debug_log, stdin, stdout, stderr)
1661 transport = await self._make_subprocess_transport(
1663 if self._debug and debug_log is not None:
1667 async def subprocess_exec(self, protocol_factory, program, *args,
1689 if self._debug:
1693 self._log_subprocess(debug_log, stdin, stdout, stderr)
1694 transport = await self._make_subprocess_transport(
1697 if self._debug and debug_log is not None:
1701 def get_exception_handler(self):
1704 return self._exception_handler
1706 def set_exception_handler(self, handler):
1721 self._exception_handler = handler
1723 def default_exception_handler(self, context):
1749 self._current_handle is not None and
1750 self._current_handle._source_traceback):
1752 self._current_handle._source_traceback
1773 def call_exception_handler(self, context):
1795 if self._exception_handler is None:
1797 self.default_exception_handler(context)
1808 self._exception_handler(self, context)
1815 self.default_exception_handler({
1830 def _add_callback(self, handle):
1833 self._ready.append(handle)
1835 def _add_callback_signalsafe(self, handle):
1837 self._add_callback(handle)
1838 self._write_to_self()
1840 def _timer_handle_cancelled(self, handle):
1843 self._timer_cancelled_count += 1
1845 def _run_once(self):
1853 sched_count = len(self._scheduled)
1855 self._timer_cancelled_count / sched_count >
1860 for handle in self._scheduled:
1867 self._scheduled = new_scheduled
1868 self._timer_cancelled_count = 0
1871 while self._scheduled and self._scheduled[0]._cancelled:
1872 self._timer_cancelled_count -= 1
1873 handle = heapq.heappop(self._scheduled)
1877 if self._ready or self._stopping:
1879 elif self._scheduled:
1881 when = self._scheduled[0]._when
1882 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
1884 event_list = self._selector.select(timeout)
1885 self._process_events(event_list)
1890 end_time = self.time() + self._clock_resolution
1891 while self._scheduled:
1892 handle = self._scheduled[0]
1895 handle = heapq.heappop(self._scheduled)
1897 self._ready.append(handle)
1905 ntodo = len(self._ready)
1907 handle = self._ready.popleft()
1910 if self._debug:
1912 self._current_handle = handle
1913 t0 = self.time()
1915 dt = self.time() - t0
1916 if dt >= self.slow_callback_duration:
1920 self._current_handle = None
1925 def _set_coroutine_origin_tracking(self, enabled):
1926 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1930 self._coroutine_origin_tracking_saved_depth = (
1936 self._coroutine_origin_tracking_saved_depth)
1938 self._coroutine_origin_tracking_enabled = enabled
1940 def get_debug(self):
1941 return self._debug
1943 def set_debug(self, enabled):
1944 self._debug = enabled
1946 if self.is_running():
1947 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)