Lines Matching refs:self
127 def __init__(self, loop=None):
129 self._loop = events._get_event_loop(stacklevel=4)
131 self._loop = loop
132 self._paused = False
133 self._drain_waiters = collections.deque()
134 self._connection_lost = False
136 def pause_writing(self):
137 assert not self._paused
138 self._paused = True
139 if self._loop.get_debug():
140 logger.debug("%r pauses writing", self)
142 def resume_writing(self):
143 assert self._paused
144 self._paused = False
145 if self._loop.get_debug():
146 logger.debug("%r resumes writing", self)
148 for waiter in self._drain_waiters:
152 def connection_lost(self, exc):
153 self._connection_lost = True
155 if not self._paused:
158 for waiter in self._drain_waiters:
165 async def _drain_helper(self):
166 if self._connection_lost:
168 if not self._paused:
170 waiter = self._loop.create_future()
171 self._drain_waiters.append(waiter)
175 self._drain_waiters.remove(waiter)
177 def _get_close_waiter(self, stream):
192 def __init__(self, stream_reader, client_connected_cb=None, loop=None):
195 self._stream_reader_wr = weakref.ref(stream_reader)
196 self._source_traceback = stream_reader._source_traceback
198 self._stream_reader_wr = None
203 self._strong_reader = stream_reader
204 self._reject_connection = False
205 self._stream_writer = None
206 self._task = None
207 self._transport = None
208 self._client_connected_cb = client_connected_cb
209 self._over_ssl = False
210 self._closed = self._loop.create_future()
213 def _stream_reader(self):
214 if self._stream_reader_wr is None:
216 return self._stream_reader_wr()
218 def _replace_writer(self, writer):
219 loop = self._loop
221 self._stream_writer = writer
222 self._transport = transport
223 self._over_ssl = transport.get_extra_info('sslcontext') is not None
225 def connection_made(self, transport):
226 if self._reject_connection:
232 if self._source_traceback:
233 context['source_traceback'] = self._source_traceback
234 self._loop.call_exception_handler(context)
237 self._transport = transport
238 reader = self._stream_reader
241 self._over_ssl = transport.get_extra_info('sslcontext') is not None
242 if self._client_connected_cb is not None:
243 self._stream_writer = StreamWriter(transport, self,
245 self._loop)
246 res = self._client_connected_cb(reader,
247 self._stream_writer)
249 self._task = self._loop.create_task(res)
250 self._strong_reader = None
252 def connection_lost(self, exc):
253 reader = self._stream_reader
259 if not self._closed.done():
261 self._closed.set_result(None)
263 self._closed.set_exception(exc)
265 self._stream_reader_wr = None
266 self._stream_writer = None
267 self._task = None
268 self._transport = None
270 def data_received(self, data):
271 reader = self._stream_reader
275 def eof_received(self):
276 reader = self._stream_reader
279 if self._over_ssl:
286 def _get_close_waiter(self, stream):
287 return self._closed
289 def __del__(self):
291 # Better than self._closed._log_traceback = False hack
293 closed = self._closed
311 def __init__(self, transport, protocol, reader, loop):
312 self._transport = transport
313 self._protocol = protocol
316 self._reader = reader
317 self._loop = loop
318 self._complete_fut = self._loop.create_future()
319 self._complete_fut.set_result(None)
321 def __repr__(self):
322 info = [self.__class__.__name__, f'transport={self._transport!r}']
323 if self._reader is not None:
324 info.append(f'reader={self._reader!r}')
328 def transport(self):
329 return self._transport
331 def write(self, data):
332 self._transport.write(data)
334 def writelines(self, data):
335 self._transport.writelines(data)
337 def write_eof(self):
338 return self._transport.write_eof()
340 def can_write_eof(self):
341 return self._transport.can_write_eof()
343 def close(self):
344 return self._transport.close()
346 def is_closing(self):
347 return self._transport.is_closing()
349 async def wait_closed(self):
350 await self._protocol._get_close_waiter(self)
352 def get_extra_info(self, name, default=None):
353 return self._transport.get_extra_info(name, default)
355 async def drain(self):
363 if self._reader is not None:
364 exc = self._reader.exception()
367 if self._transport.is_closing():
378 await self._protocol._drain_helper()
380 async def start_tls(self, sslcontext, *,
384 server_side = self._protocol._client_connected_cb is not None
385 protocol = self._protocol
386 await self.drain()
387 new_transport = await self._loop.start_tls( # type: ignore
388 self._transport, protocol, sslcontext,
391 self._transport = new_transport
392 protocol._replace_writer(self)
399 def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
406 self._limit = limit
408 self._loop = events._get_event_loop()
410 self._loop = loop
411 self._buffer = bytearray()
412 self._eof = False # Whether we're done.
413 self._waiter = None # A future used by _wait_for_data()
414 self._exception = None
415 self._transport = None
416 self._paused = False
417 if self._loop.get_debug():
418 self._source_traceback = format_helpers.extract_stack(
421 def __repr__(self):
423 if self._buffer:
424 info.append(f'{len(self._buffer)} bytes')
425 if self._eof:
427 if self._limit != _DEFAULT_LIMIT:
428 info.append(f'limit={self._limit}')
429 if self._waiter:
430 info.append(f'waiter={self._waiter!r}')
431 if self._exception:
432 info.append(f'exception={self._exception!r}')
433 if self._transport:
434 info.append(f'transport={self._transport!r}')
435 if self._paused:
439 def exception(self):
440 return self._exception
442 def set_exception(self, exc):
443 self._exception = exc
445 waiter = self._waiter
447 self._waiter = None
451 def _wakeup_waiter(self):
453 waiter = self._waiter
455 self._waiter = None
459 def set_transport(self, transport):
460 assert self._transport is None, 'Transport already set'
461 self._transport = transport
463 def _maybe_resume_transport(self):
464 if self._paused and len(self._buffer) <= self._limit:
465 self._paused = False
466 self._transport.resume_reading()
468 def feed_eof(self):
469 self._eof = True
470 self._wakeup_waiter()
472 def at_eof(self):
474 return self._eof and not self._buffer
476 def feed_data(self, data):
477 assert not self._eof, 'feed_data after feed_eof'
482 self._buffer.extend(data)
483 self._wakeup_waiter()
485 if (self._transport is not None and
486 not self._paused and
487 len(self._buffer) > 2 * self._limit):
489 self._transport.pause_reading()
494 self._transport = None
496 self._paused = True
498 async def _wait_for_data(self, func_name):
507 if self._waiter is not None:
512 assert not self._eof, '_wait_for_data after EOF'
515 # This is essential for readexactly(n) for case when n > self._limit.
516 if self._paused:
517 self._paused = False
518 self._transport.resume_reading()
520 self._waiter = self._loop.create_future()
522 await self._waiter
524 self._waiter = None
526 async def readline(self):
545 line = await self.readuntil(sep)
549 if self._buffer.startswith(sep, e.consumed):
550 del self._buffer[:e.consumed + seplen]
552 self._buffer.clear()
553 self._maybe_resume_transport()
557 async def readuntil(self, separator=b'\n'):
581 if self._exception is not None:
582 raise self._exception
608 buflen = len(self._buffer)
613 isep = self._buffer.find(separator, offset)
622 if offset > self._limit:
631 if self._eof:
632 chunk = bytes(self._buffer)
633 self._buffer.clear()
637 await self._wait_for_data('readuntil')
639 if isep > self._limit:
643 chunk = self._buffer[:isep + seplen]
644 del self._buffer[:isep + seplen]
645 self._maybe_resume_transport()
648 async def read(self, n=-1):
670 if self._exception is not None:
671 raise self._exception
678 # collect everything in self._buffer, but that would
679 # deadlock if the subprocess sends more than self.limit
680 # bytes. So just call self.read(self._limit) until EOF.
683 block = await self.read(self._limit)
689 if not self._buffer and not self._eof:
690 await self._wait_for_data('read')
693 data = bytes(self._buffer[:n])
694 del self._buffer[:n]
696 self._maybe_resume_transport()
699 async def readexactly(self, n):
717 if self._exception is not None:
718 raise self._exception
723 while len(self._buffer) < n:
724 if self._eof:
725 incomplete = bytes(self._buffer)
726 self._buffer.clear()
729 await self._wait_for_data('readexactly')
731 if len(self._buffer) == n:
732 data = bytes(self._buffer)
733 self._buffer.clear()
735 data = bytes(self._buffer[:n])
736 del self._buffer[:n]
737 self._maybe_resume_transport()
740 def __aiter__(self):
741 return self
743 async def __anext__(self):
744 val = await self.readline()