Lines Matching refs:self
50 def __init__(self, loop, sock, protocol, waiter=None,
53 self._set_extra(sock)
54 self._sock = sock
55 self.set_protocol(protocol)
56 self._server = server
57 self._buffer = None # None or bytearray.
58 self._read_fut = None
59 self._write_fut = None
60 self._pending_write = 0
61 self._conn_lost = 0
62 self._closing = False # Set when close() called.
63 self._called_connection_lost = False
64 self._eof_written = False
65 if self._server is not None:
66 self._server._attach()
67 self._loop.call_soon(self._protocol.connection_made, self)
70 self._loop.call_soon(futures._set_result_unless_cancelled,
73 def __repr__(self):
74 info = [self.__class__.__name__]
75 if self._sock is None:
77 elif self._closing:
79 if self._sock is not None:
80 info.append(f'fd={self._sock.fileno()}')
81 if self._read_fut is not None:
82 info.append(f'read={self._read_fut!r}')
83 if self._write_fut is not None:
84 info.append(f'write={self._write_fut!r}')
85 if self._buffer:
86 info.append(f'write_bufsize={len(self._buffer)}')
87 if self._eof_written:
91 def _set_extra(self, sock):
92 self._extra['pipe'] = sock
94 def set_protocol(self, protocol):
95 self._protocol = protocol
97 def get_protocol(self):
98 return self._protocol
100 def is_closing(self):
101 return self._closing
103 def close(self):
104 if self._closing:
106 self._closing = True
107 self._conn_lost += 1
108 if not self._buffer and self._write_fut is None:
109 self._loop.call_soon(self._call_connection_lost, None)
110 if self._read_fut is not None:
111 self._read_fut.cancel()
112 self._read_fut = None
114 def __del__(self, _warn=warnings.warn):
115 if self._sock is not None:
116 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
117 self._sock.close()
119 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
122 if self._loop.get_debug():
123 logger.debug("%r: %s", self, message, exc_info=True)
125 self._loop.call_exception_handler({
128 'transport': self,
129 'protocol': self._protocol,
132 self._force_close(exc)
134 def _force_close(self, exc):
135 if self._empty_waiter is not None and not self._empty_waiter.done():
137 self._empty_waiter.set_result(None)
139 self._empty_waiter.set_exception(exc)
140 if self._closing and self._called_connection_lost:
142 self._closing = True
143 self._conn_lost += 1
144 if self._write_fut:
145 self._write_fut.cancel()
146 self._write_fut = None
147 if self._read_fut:
148 self._read_fut.cancel()
149 self._read_fut = None
150 self._pending_write = 0
151 self._buffer = None
152 self._loop.call_soon(self._call_connection_lost, exc)
154 def _call_connection_lost(self, exc):
155 if self._called_connection_lost:
158 self._protocol.connection_lost(exc)
164 if hasattr(self._sock, 'shutdown') and self._sock.fileno() != -1:
165 self._sock.shutdown(socket.SHUT_RDWR)
166 self._sock.close()
167 self._sock = None
168 server = self._server
171 self._server = None
172 self._called_connection_lost = True
174 def get_write_buffer_size(self):
175 size = self._pending_write
176 if self._buffer is not None:
177 size += len(self._buffer)
185 def __init__(self, loop, sock, protocol, waiter=None,
187 self._pending_data_length = -1
188 self._paused = True
191 self._data = bytearray(buffer_size)
192 self._loop.call_soon(self._loop_reading)
193 self._paused = False
195 def is_reading(self):
196 return not self._paused and not self._closing
198 def pause_reading(self):
199 if self._closing or self._paused:
201 self._paused = True
203 # bpo-33694: Don't cancel self._read_fut because cancelling an
214 if self._loop.get_debug():
215 logger.debug("%r pauses reading", self)
217 def resume_reading(self):
218 if self._closing or not self._paused:
221 self._paused = False
222 if self._read_fut is None:
223 self._loop.call_soon(self._loop_reading, None)
225 length = self._pending_data_length
226 self._pending_data_length = -1
230 self._loop.call_soon(self._data_received, self._data[:length], length)
232 if self._loop.get_debug():
233 logger.debug("%r resumes reading", self)
235 def _eof_received(self):
236 if self._loop.get_debug():
237 logger.debug("%r received EOF", self)
240 keep_open = self._protocol.eof_received()
244 self._fatal_error(
249 self.close()
251 def _data_received(self, data, length):
252 if self._paused:
255 assert self._pending_data_length == -1
256 self._pending_data_length = length
260 self._eof_received()
263 if isinstance(self._protocol, protocols.BufferedProtocol):
265 protocols._feed_data_to_buffered_proto(self._protocol, data)
269 self._fatal_error(exc,
274 self._protocol.data_received(data)
276 def _loop_reading(self, fut=None):
281 assert self._read_fut is fut or (self._read_fut is None and
282 self._closing)
283 self._read_fut = None
291 data = self._data[:length]
296 if self._closing:
303 if not self._paused:
305 self._read_fut = self._loop._proactor.recv_into(self._sock, self._data)
307 if not self._closing:
308 self._fatal_error(exc, 'Fatal read error on pipe transport')
309 elif self._loop.get_debug():
313 self._force_close(exc)
315 self._fatal_error(exc, 'Fatal read error on pipe transport')
317 if not self._closing:
320 if not self._paused:
321 self._read_fut.add_done_callback(self._loop_reading)
324 self._data_received(data, length)
333 def __init__(self, *args, **kw):
335 self._empty_waiter = None
337 def write(self, data):
342 if self._eof_written:
344 if self._empty_waiter is not None:
350 if self._conn_lost:
351 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
353 self._conn_lost += 1
362 if self._write_fut is None: # IDLE -> WRITING
363 assert self._buffer is None
365 self._loop_writing(data=bytes(data))
366 elif not self._buffer: # WRITING -> BACKED UP
368 self._buffer = bytearray(data)
369 self._maybe_pause_protocol()
372 self._buffer.extend(data)
373 self._maybe_pause_protocol()
375 def _loop_writing(self, f=None, data=None):
377 if f is not None and self._write_fut is None and self._closing:
378 # XXX most likely self._force_close() has been called, and
379 # it has set self._write_fut to None.
381 assert f is self._write_fut
382 self._write_fut = None
383 self._pending_write = 0
387 data = self._buffer
388 self._buffer = None
390 if self._closing:
391 self._loop.call_soon(self._call_connection_lost, None)
392 if self._eof_written:
393 self._sock.shutdown(socket.SHUT_WR)
399 self._maybe_resume_protocol()
401 self._write_fut = self._loop._proactor.send(self._sock, data)
402 if not self._write_fut.done():
403 assert self._pending_write == 0
404 self._pending_write = len(data)
405 self._write_fut.add_done_callback(self._loop_writing)
406 self._maybe_pause_protocol()
408 self._write_fut.add_done_callback(self._loop_writing)
409 if self._empty_waiter is not None and self._write_fut is None:
410 self._empty_waiter.set_result(None)
412 self._force_close(exc)
414 self._fatal_error(exc, 'Fatal write error on pipe transport')
416 def can_write_eof(self):
419 def write_eof(self):
420 self.close()
422 def abort(self):
423 self._force_close(None)
425 def _make_empty_waiter(self):
426 if self._empty_waiter is not None:
428 self._empty_waiter = self._loop.create_future()
429 if self._write_fut is None:
430 self._empty_waiter.set_result(None)
431 return self._empty_waiter
433 def _reset_empty_waiter(self):
434 self._empty_waiter = None
438 def __init__(self, *args, **kw):
440 self._read_fut = self._loop._proactor.recv(self._sock, 16)
441 self._read_fut.add_done_callback(self._pipe_closed)
443 def _pipe_closed(self, fut):
448 if self._closing:
449 assert self._read_fut is None
451 assert fut is self._read_fut, (fut, self._read_fut)
452 self._read_fut = None
453 if self._write_fut is not None:
454 self._force_close(BrokenPipeError())
456 self.close()
462 def __init__(self, loop, sock, protocol, address=None,
464 self._address = address
465 self._empty_waiter = None
466 self._buffer_size = 0
472 self._buffer = collections.deque()
473 self._loop.call_soon(self._loop_reading)
475 def _set_extra(self, sock):
476 _set_socket_extra(self, sock)
478 def get_write_buffer_size(self):
479 return self._buffer_size
481 def abort(self):
482 self._force_close(None)
484 def sendto(self, data, addr=None):
492 if self._address is not None and addr not in (None, self._address):
494 f'Invalid address: must be None or {self._address}')
496 if self._conn_lost and self._address:
497 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
499 self._conn_lost += 1
503 self._buffer.append((bytes(data), addr))
504 self._buffer_size += len(data)
506 if self._write_fut is None:
508 self._loop_writing()
511 self._maybe_pause_protocol()
513 def _loop_writing(self, fut=None):
515 if self._conn_lost:
518 assert fut is self._write_fut
519 self._write_fut = None
524 if not self._buffer or (self._conn_lost and self._address):
526 if self._closing:
527 self._loop.call_soon(self._call_connection_lost, None)
530 data, addr = self._buffer.popleft()
531 self._buffer_size -= len(data)
532 if self._address is not None:
533 self._write_fut = self._loop._proactor.send(self._sock,
536 self._write_fut = self._loop._proactor.sendto(self._sock,
540 self._protocol.error_received(exc)
542 self._fatal_error(exc, 'Fatal write error on datagram transport')
544 self._write_fut.add_done_callback(self._loop_writing)
545 self._maybe_resume_protocol()
547 def _loop_reading(self, fut=None):
550 if self._conn_lost:
553 assert self._read_fut is fut or (self._read_fut is None and
554 self._closing)
556 self._read_fut = None
560 if self._closing:
565 if self._address is not None:
566 data, addr = res, self._address
570 if self._conn_lost:
572 if self._address is not None:
573 self._read_fut = self._loop._proactor.recv(self._sock,
574 self.max_size)
576 self._read_fut = self._loop._proactor.recvfrom(self._sock,
577 self.max_size)
579 self._protocol.error_received(exc)
581 if not self._closing:
584 if self._read_fut is not None:
585 self._read_fut.add_done_callback(self._loop_reading)
588 self._protocol.datagram_received(data, addr)
596 def can_write_eof(self):
599 def write_eof(self):
610 def __init__(self, loop, sock, protocol, waiter=None,
615 def _set_extra(self, sock):
616 _set_socket_extra(self, sock)
618 def can_write_eof(self):
621 def write_eof(self):
622 if self._closing or self._eof_written:
624 self._eof_written = True
625 if self._write_fut is None:
626 self._sock.shutdown(socket.SHUT_WR)
631 def __init__(self, proactor):
634 self._proactor = proactor
635 self._selector = proactor # convenient alias
636 self._self_reading_future = None
637 self._accept_futures = {} # socket file descriptor => Future
638 proactor.set_loop(self)
639 self._make_self_pipe()
642 signal.set_wakeup_fd(self._csock.fileno())
644 def _make_socket_transport(self, sock, protocol, waiter=None,
646 return _ProactorSocketTransport(self, sock, protocol, waiter,
650 self, rawsock, protocol, sslcontext, waiter=None,
656 self, protocol, sslcontext, waiter,
660 _ProactorSocketTransport(self, rawsock, ssl_protocol,
664 def _make_datagram_transport(self, sock, protocol,
666 return _ProactorDatagramTransport(self, sock, protocol, address,
669 def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
671 return _ProactorDuplexPipeTransport(self,
674 def _make_read_pipe_transport(self, sock, protocol, waiter=None,
676 return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)
678 def _make_write_pipe_transport(self, sock, protocol, waiter=None,
681 return _ProactorWritePipeTransport(self,
684 def close(self):
685 if self.is_running():
687 if self.is_closed():
695 self._stop_accept_futures()
696 self._close_self_pipe()
697 self._proactor.close()
698 self._proactor = None
699 self._selector = None
704 async def sock_recv(self, sock, n):
705 return await self._proactor.recv(sock, n)
707 async def sock_recv_into(self, sock, buf):
708 return await self._proactor.recv_into(sock, buf)
710 async def sock_recvfrom(self, sock, bufsize):
711 return await self._proactor.recvfrom(sock, bufsize)
713 async def sock_recvfrom_into(self, sock, buf, nbytes=0):
717 return await self._proactor.recvfrom_into(sock, buf, nbytes)
719 async def sock_sendall(self, sock, data):
720 return await self._proactor.send(sock, data)
722 async def sock_sendto(self, sock, data, address):
723 return await self._proactor.sendto(sock, data, 0, address)
725 async def sock_connect(self, sock, address):
726 return await self._proactor.connect(sock, address)
728 async def sock_accept(self, sock):
729 return await self._proactor.accept(sock)
731 async def _sock_sendfile_native(self, sock, file, offset, count):
753 await self._proactor.sendfile(sock, file, offset, blocksize)
760 async def _sendfile_native(self, transp, file, offset, count):
765 return await self.sock_sendfile(transp._sock, file, offset, count,
772 def _close_self_pipe(self):
773 if self._self_reading_future is not None:
774 self._self_reading_future.cancel()
775 self._self_reading_future = None
776 self._ssock.close()
777 self._ssock = None
778 self._csock.close()
779 self._csock = None
780 self._internal_fds -= 1
782 def _make_self_pipe(self):
783 # A self-socket, really. :-)
784 self._ssock, self._csock = socket.socketpair()
785 self._ssock.setblocking(False)
786 self._csock.setblocking(False)
787 self._internal_fds += 1
789 def _loop_self_reading(self, f=None):
793 if self._self_reading_future is not f:
801 f = self._proactor.recv(self._ssock, 4096)
808 self.call_exception_handler({
809 'message': 'Error on reading from the event loop self pipe',
811 'loop': self,
814 self._self_reading_future = f
815 f.add_done_callback(self._loop_self_reading)
817 def _write_to_self(self):
820 # running. Guard for self._csock being None or closed. When
823 csock = self._csock
830 if self._debug:
832 "self-pipe socket",
835 def _start_serving(self, protocol_factory, sock,
844 if self._debug:
849 self._make_ssl_transport(
855 self._make_socket_transport(
858 if self.is_closed():
860 f = self._proactor.accept(sock)
863 self.call_exception_handler({
869 elif self._debug:
875 self._accept_futures[sock.fileno()] = f
878 self.call_soon(loop)
880 def _process_events(self, event_list):
884 def _stop_accept_futures(self):
885 for future in self._accept_futures.values():
887 self._accept_futures.clear()
889 def _stop_serving(self, sock):
890 future = self._accept_futures.pop(sock.fileno(), None)
893 self._proactor._stop_serving(sock)