Lines Matching refs:self
49 def __init__(self, selector=None):
55 self._selector = selector
56 self._make_self_pipe()
57 self._transports = weakref.WeakValueDictionary()
59 def _make_socket_transport(self, sock, protocol, waiter=None, *,
61 return _SelectorSocketTransport(self, sock, protocol, waiter,
65 self, rawsock, protocol, sslcontext, waiter=None,
72 self, protocol, sslcontext, waiter,
77 _SelectorSocketTransport(self, rawsock, ssl_protocol,
81 def _make_datagram_transport(self, sock, protocol,
83 return _SelectorDatagramTransport(self, sock, protocol,
86 def close(self):
87 if self.is_running():
89 if self.is_closed():
91 self._close_self_pipe()
93 if self._selector is not None:
94 self._selector.close()
95 self._selector = None
97 def _close_self_pipe(self):
98 self._remove_reader(self._ssock.fileno())
99 self._ssock.close()
100 self._ssock = None
101 self._csock.close()
102 self._csock = None
103 self._internal_fds -= 1
105 def _make_self_pipe(self):
106 # A self-socket, really. :-)
107 self._ssock, self._csock = socket.socketpair()
108 self._ssock.setblocking(False)
109 self._csock.setblocking(False)
110 self._internal_fds += 1
111 self._add_reader(self._ssock.fileno(), self._read_from_self)
113 def _process_self_data(self, data):
116 def _read_from_self(self):
119 data = self._ssock.recv(4096)
122 self._process_self_data(data)
128 def _write_to_self(self):
131 # running. Guard for self._csock being None or closed. When
134 csock = self._csock
141 if self._debug:
143 "self-pipe socket",
146 def _start_serving(self, protocol_factory, sock,
150 self._add_reader(sock.fileno(), self._accept_connection,
155 self, protocol_factory, sock,
166 if self._debug:
180 self.call_exception_handler({
185 self._remove_reader(sock.fileno())
186 self.call_later(constants.ACCEPT_RETRY_DELAY,
187 self._start_serving,
195 accept = self._accept_connection2(
198 self.create_task(accept)
201 self, protocol_factory, conn, extra,
209 waiter = self.create_future()
211 transport = self._make_ssl_transport(
217 transport = self._make_socket_transport(
231 if self._debug:
241 self.call_exception_handler(context)
243 def _ensure_fd_no_transport(self, fd):
252 transport = self._transports[fileno]
261 def _add_reader(self, fd, callback, *args):
262 self._check_closed()
263 handle = events.Handle(callback, args, self, None)
265 key = self._selector.get_key(fd)
267 self._selector.register(fd, selectors.EVENT_READ,
271 self._selector.modify(fd, mask | selectors.EVENT_READ,
277 def _remove_reader(self, fd):
278 if self.is_closed():
281 key = self._selector.get_key(fd)
288 self._selector.unregister(fd)
290 self._selector.modify(fd, mask, (None, writer))
298 def _add_writer(self, fd, callback, *args):
299 self._check_closed()
300 handle = events.Handle(callback, args, self, None)
302 key = self._selector.get_key(fd)
304 self._selector.register(fd, selectors.EVENT_WRITE,
308 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
314 def _remove_writer(self, fd):
316 if self.is_closed():
319 key = self._selector.get_key(fd)
327 self._selector.unregister(fd)
329 self._selector.modify(fd, mask, (reader, None))
337 def add_reader(self, fd, callback, *args):
339 self._ensure_fd_no_transport(fd)
340 self._add_reader(fd, callback, *args)
342 def remove_reader(self, fd):
344 self._ensure_fd_no_transport(fd)
345 return self._remove_reader(fd)
347 def add_writer(self, fd, callback, *args):
349 self._ensure_fd_no_transport(fd)
350 self._add_writer(fd, callback, *args)
352 def remove_writer(self, fd):
354 self._ensure_fd_no_transport(fd)
355 return self._remove_writer(fd)
357 async def sock_recv(self, sock, n):
365 if self._debug and sock.gettimeout() != 0:
371 fut = self.create_future()
373 self._ensure_fd_no_transport(fd)
374 handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
376 functools.partial(self._sock_read_done, fd, handle=handle))
379 def _sock_read_done(self, fd, fut, handle=None):
381 self.remove_reader(fd)
383 def _sock_recv(self, fut, sock, n):
399 async def sock_recv_into(self, sock, buf):
406 if self._debug and sock.gettimeout() != 0:
412 fut = self.create_future()
414 self._ensure_fd_no_transport(fd)
415 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
417 functools.partial(self._sock_read_done, fd, handle=handle))
420 def _sock_recv_into(self, fut, sock, buf):
437 async def sock_recvfrom(self, sock, bufsize):
446 if self._debug and sock.gettimeout() != 0:
452 fut = self.create_future()
454 self._ensure_fd_no_transport(fd)
455 handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
457 functools.partial(self._sock_read_done, fd, handle=handle))
460 def _sock_recvfrom(self, fut, sock, bufsize):
477 async def sock_recvfrom_into(self, sock, buf, nbytes=0):
484 if self._debug and sock.gettimeout() != 0:
493 fut = self.create_future()
495 self._ensure_fd_no_transport(fd)
496 handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
499 functools.partial(self._sock_read_done, fd, handle=handle))
502 def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
519 async def sock_sendall(self, sock, data):
529 if self._debug and sock.gettimeout() != 0:
540 fut = self.create_future()
542 self._ensure_fd_no_transport(fd)
544 handle = self._add_writer(fd, self._sock_sendall, fut, sock,
547 functools.partial(self._sock_write_done, fd, handle=handle))
550 def _sock_sendall(self, fut, sock, view, pos):
572 async def sock_sendto(self, sock, data, address):
582 if self._debug and sock.gettimeout() != 0:
589 fut = self.create_future()
591 self._ensure_fd_no_transport(fd)
593 handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
596 functools.partial(self._sock_write_done, fd, handle=handle))
599 def _sock_sendto(self, fut, sock, data, address):
614 async def sock_connect(self, sock, address):
620 if self._debug and sock.gettimeout() != 0:
625 resolved = await self._ensure_resolved(
627 loop=self,
631 fut = self.create_future()
632 self._sock_connect(fut, sock, address)
639 def _sock_connect(self, fut, sock, address):
648 self._ensure_fd_no_transport(fd)
649 handle = self._add_writer(
650 fd, self._sock_connect_cb, fut, sock, address)
652 functools.partial(self._sock_write_done, fd, handle=handle))
662 def _sock_write_done(self, fd, fut, handle=None):
664 self.remove_writer(fd)
666 def _sock_connect_cb(self, fut, sock, address):
687 async def sock_accept(self, sock):
696 if self._debug and sock.gettimeout() != 0:
698 fut = self.create_future()
699 self._sock_accept(fut, sock)
702 def _sock_accept(self, fut, sock):
708 self._ensure_fd_no_transport(fd)
709 handle = self._add_reader(fd, self._sock_accept, fut, sock)
711 functools.partial(self._sock_read_done, fd, handle=handle))
719 async def _sendfile_native(self, transp, file, offset, count):
720 del self._transports[transp._sock_fd]
725 return await self.sock_sendfile(transp._sock, file, offset, count,
731 self._transports[transp._sock_fd] = transp
733 def _process_events(self, event_list):
738 self._remove_reader(fileobj)
740 self._add_callback(reader)
743 self._remove_writer(fileobj)
745 self._add_callback(writer)
747 def _stop_serving(self, sock):
748 self._remove_reader(sock.fileno())
757 _buffer_factory = bytearray # Constructs initial value for self._buffer.
764 def __init__(self, loop, sock, protocol, extra=None, server=None):
766 self._extra['socket'] = trsock.TransportSocket(sock)
768 self._extra['sockname'] = sock.getsockname()
770 self._extra['sockname'] = None
771 if 'peername' not in self._extra:
773 self._extra['peername'] = sock.getpeername()
775 self._extra['peername'] = None
776 self._sock = sock
777 self._sock_fd = sock.fileno()
779 self._protocol_connected = False
780 self.set_protocol(protocol)
782 self._server = server
783 self._buffer = self._buffer_factory()
784 self._conn_lost = 0 # Set when call to connection_lost scheduled.
785 self._closing = False # Set when close() called.
786 self._paused = False # Set when pause_reading() called
788 if self._server is not None:
789 self._server._attach()
790 loop._transports[self._sock_fd] = self
792 def __repr__(self):
793 info = [self.__class__.__name__]
794 if self._sock is None:
796 elif self._closing:
798 info.append(f'fd={self._sock_fd}')
800 if self._loop is not None and not self._loop.is_closed():
801 polling = _test_selector_event(self._loop._selector,
802 self._sock_fd, selectors.EVENT_READ)
808 polling = _test_selector_event(self._loop._selector,
809 self._sock_fd,
816 bufsize = self.get_write_buffer_size()
820 def abort(self):
821 self._force_close(None)
823 def set_protocol(self, protocol):
824 self._protocol = protocol
825 self._protocol_connected = True
827 def get_protocol(self):
828 return self._protocol
830 def is_closing(self):
831 return self._closing
833 def is_reading(self):
834 return not self.is_closing() and not self._paused
836 def pause_reading(self):
837 if not self.is_reading():
839 self._paused = True
840 self._loop._remove_reader(self._sock_fd)
841 if self._loop.get_debug():
842 logger.debug("%r pauses reading", self)
844 def resume_reading(self):
845 if self._closing or not self._paused:
847 self._paused = False
848 self._add_reader(self._sock_fd, self._read_ready)
849 if self._loop.get_debug():
850 logger.debug("%r resumes reading", self)
852 def close(self):
853 if self._closing:
855 self._closing = True
856 self._loop._remove_reader(self._sock_fd)
857 if not self._buffer:
858 self._conn_lost += 1
859 self._loop._remove_writer(self._sock_fd)
860 self._loop.call_soon(self._call_connection_lost, None)
862 def __del__(self, _warn=warnings.warn):
863 if self._sock is not None:
864 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
865 self._sock.close()
867 def _fatal_error(self, exc, message='Fatal error on transport'):
870 if self._loop.get_debug():
871 logger.debug("%r: %s", self, message, exc_info=True)
873 self._loop.call_exception_handler({
876 'transport': self,
877 'protocol': self._protocol,
879 self._force_close(exc)
881 def _force_close(self, exc):
882 if self._conn_lost:
884 if self._buffer:
885 self._buffer.clear()
886 self._loop._remove_writer(self._sock_fd)
887 if not self._closing:
888 self._closing = True
889 self._loop._remove_reader(self._sock_fd)
890 self._conn_lost += 1
891 self._loop.call_soon(self._call_connection_lost, exc)
893 def _call_connection_lost(self, exc):
895 if self._protocol_connected:
896 self._protocol.connection_lost(exc)
898 self._sock.close()
899 self._sock = None
900 self._protocol = None
901 self._loop = None
902 server = self._server
905 self._server = None
907 def get_write_buffer_size(self):
908 return len(self._buffer)
910 def _add_reader(self, fd, callback, *args):
911 if not self.is_reading():
913 self._loop._add_reader(fd, callback, *args)
921 def __init__(self, loop, sock, protocol, waiter=None,
924 self._read_ready_cb = None
926 self._eof = False
927 self._empty_waiter = None
932 base_events._set_nodelay(self._sock)
934 self._loop.call_soon(self._protocol.connection_made, self)
936 self._loop.call_soon(self._add_reader,
937 self._sock_fd, self._read_ready)
940 self._loop.call_soon(futures._set_result_unless_cancelled,
943 def set_protocol(self, protocol):
945 self._read_ready_cb = self._read_ready__get_buffer
947 self._read_ready_cb = self._read_ready__data_received
951 def _read_ready(self):
952 self._read_ready_cb()
954 def _read_ready__get_buffer(self):
955 if self._conn_lost:
959 buf = self._protocol.get_buffer(-1)
965 self._fatal_error(
970 nbytes = self._sock.recv_into(buf)
976 self._fatal_error(exc, 'Fatal read error on socket transport')
980 self._read_ready__on_eof()
984 self._protocol.buffer_updated(nbytes)
988 self._fatal_error(
991 def _read_ready__data_received(self):
992 if self._conn_lost:
995 data = self._sock.recv(self.max_size)
1001 self._fatal_error(exc, 'Fatal read error on socket transport')
1005 self._read_ready__on_eof()
1009 self._protocol.data_received(data)
1013 self._fatal_error(
1016 def _read_ready__on_eof(self):
1017 if self._loop.get_debug():
1018 logger.debug("%r received EOF", self)
1021 keep_open = self._protocol.eof_received()
1025 self._fatal_error(
1033 self._loop._remove_reader(self._sock_fd)
1035 self.close()
1037 def write(self, data):
1041 if self._eof:
1043 if self._empty_waiter is not None:
1048 if self._conn_lost:
1049 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1051 self._conn_lost += 1
1054 if not self._buffer:
1057 n = self._sock.send(data)
1063 self._fatal_error(exc, 'Fatal write error on socket transport')
1070 self._loop._add_writer(self._sock_fd, self._write_ready)
1073 self._buffer.extend(data)
1074 self._maybe_pause_protocol()
1076 def _write_ready(self):
1077 assert self._buffer, 'Data should not be empty'
1079 if self._conn_lost:
1082 n = self._sock.send(self._buffer)
1088 self._loop._remove_writer(self._sock_fd)
1089 self._buffer.clear()
1090 self._fatal_error(exc, 'Fatal write error on socket transport')
1091 if self._empty_waiter is not None:
1092 self._empty_waiter.set_exception(exc)
1095 del self._buffer[:n]
1096 self._maybe_resume_protocol() # May append to buffer.
1097 if not self._buffer:
1098 self._loop._remove_writer(self._sock_fd)
1099 if self._empty_waiter is not None:
1100 self._empty_waiter.set_result(None)
1101 if self._closing:
1102 self._call_connection_lost(None)
1103 elif self._eof:
1104 self._sock.shutdown(socket.SHUT_WR)
1106 def write_eof(self):
1107 if self._closing or self._eof:
1109 self._eof = True
1110 if not self._buffer:
1111 self._sock.shutdown(socket.SHUT_WR)
1113 def can_write_eof(self):
1116 def _call_connection_lost(self, exc):
1118 if self._empty_waiter is not None:
1119 self._empty_waiter.set_exception(
1122 def _make_empty_waiter(self):
1123 if self._empty_waiter is not None:
1125 self._empty_waiter = self._loop.create_future()
1126 if not self._buffer:
1127 self._empty_waiter.set_result(None)
1128 return self._empty_waiter
1130 def _reset_empty_waiter(self):
1131 self._empty_waiter = None
1138 def __init__(self, loop, sock, protocol, address=None,
1141 self._address = address
1142 self._buffer_size = 0
1143 self._loop.call_soon(self._protocol.connection_made, self)
1145 self._loop.call_soon(self._add_reader,
1146 self._sock_fd, self._read_ready)
1149 self._loop.call_soon(futures._set_result_unless_cancelled,
1152 def get_write_buffer_size(self):
1153 return self._buffer_size
1155 def _read_ready(self):
1156 if self._conn_lost:
1159 data, addr = self._sock.recvfrom(self.max_size)
1163 self._protocol.error_received(exc)
1167 self._fatal_error(exc, 'Fatal read error on datagram transport')
1169 self._protocol.datagram_received(data, addr)
1171 def sendto(self, data, addr=None):
1178 if self._address:
1179 if addr not in (None, self._address):
1181 f'Invalid address: must be None or {self._address}')
1182 addr = self._address
1184 if self._conn_lost and self._address:
1185 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1187 self._conn_lost += 1
1190 if not self._buffer:
1193 if self._extra['peername']:
1194 self._sock.send(data)
1196 self._sock.sendto(data, addr)
1199 self._loop._add_writer(self._sock_fd, self._sendto_ready)
1201 self._protocol.error_received(exc)
1206 self._fatal_error(
1211 self._buffer.append((bytes(data), addr))
1212 self._buffer_size += len(data)
1213 self._maybe_pause_protocol()
1215 def _sendto_ready(self):
1216 while self._buffer:
1217 data, addr = self._buffer.popleft()
1218 self._buffer_size -= len(data)
1220 if self._extra['peername']:
1221 self._sock.send(data)
1223 self._sock.sendto(data, addr)
1225 self._buffer.appendleft((data, addr)) # Try again later.
1226 self._buffer_size += len(data)
1229 self._protocol.error_received(exc)
1234 self._fatal_error(
1238 self._maybe_resume_protocol() # May append to buffer.
1239 if not self._buffer:
1240 self._loop._remove_writer(self._sock_fd)
1241 if self._closing:
1242 self._call_connection_lost(None)