Lines Matching refs:self

84     def __init__(self, loop, ssl_protocol):
85 self._loop = loop
86 self._ssl_protocol = ssl_protocol
87 self._closed = False
89 def get_extra_info(self, name, default=None):
91 return self._ssl_protocol._get_extra_info(name, default)
93 def set_protocol(self, protocol):
94 self._ssl_protocol._set_app_protocol(protocol)
96 def get_protocol(self):
97 return self._ssl_protocol._app_protocol
99 def is_closing(self):
100 return self._closed
102 def close(self):
110 if not self._closed:
111 self._closed = True
112 self._ssl_protocol._start_shutdown()
114 self._ssl_protocol = None
116 def __del__(self, _warnings=warnings):
117 if not self._closed:
118 self._closed = True
123 def is_reading(self):
124 return not self._ssl_protocol._app_reading_paused
126 def pause_reading(self):
132 self._ssl_protocol._pause_reading()
134 def resume_reading(self):
140 self._ssl_protocol._resume_reading()
142 def set_write_buffer_limits(self, high=None, low=None):
161 self._ssl_protocol._set_write_buffer_limits(high, low)
162 self._ssl_protocol._control_app_writing()
164 def get_write_buffer_limits(self):
165 return (self._ssl_protocol._outgoing_low_water,
166 self._ssl_protocol._outgoing_high_water)
168 def get_write_buffer_size(self):
170 return self._ssl_protocol._get_write_buffer_size()
172 def set_read_buffer_limits(self, high=None, low=None):
191 self._ssl_protocol._set_read_buffer_limits(high, low)
192 self._ssl_protocol._control_ssl_reading()
194 def get_read_buffer_limits(self):
195 return (self._ssl_protocol._incoming_low_water,
196 self._ssl_protocol._incoming_high_water)
198 def get_read_buffer_size(self):
200 return self._ssl_protocol._get_read_buffer_size()
203 def _protocol_paused(self):
205 return self._ssl_protocol._app_writing_paused
207 def write(self, data):
218 self._ssl_protocol._write_appdata((data,))
220 def writelines(self, list_of_data):
226 self._ssl_protocol._write_appdata(list_of_data)
228 def write_eof(self):
235 def can_write_eof(self):
239 def abort(self):
246 self._closed = True
247 if self._ssl_protocol is not None:
248 self._ssl_protocol._abort()
250 def _force_close(self, exc):
251 self._closed = True
252 self._ssl_protocol._abort(exc)
254 def _test__append_write_backlog(self, data):
256 self._ssl_protocol._write_backlog.append(data)
257 self._ssl_protocol._write_buffer_size += len(data)
267 def __init__(self, loop, app_protocol, sslcontext, waiter,
275 self._ssl_buffer = bytearray(self.max_size)
276 self._ssl_buffer_view = memoryview(self._ssl_buffer)
295 self._server_side = server_side
297 self._server_hostname = server_hostname
299 self._server_hostname = None
300 self._sslcontext = sslcontext
303 self._extra = dict(sslcontext=sslcontext)
306 self._write_backlog = collections.deque()
307 self._write_buffer_size = 0
309 self._waiter = waiter
310 self._loop = loop
311 self._set_app_protocol(app_protocol)
312 self._app_transport = None
313 self._app_transport_created = False
315 self._transport = None
316 self._ssl_handshake_timeout = ssl_handshake_timeout
317 self._ssl_shutdown_timeout = ssl_shutdown_timeout
319 self._incoming = ssl.MemoryBIO()
320 self._outgoing = ssl.MemoryBIO()
321 self._state = SSLProtocolState.UNWRAPPED
322 self._conn_lost = 0 # Set when connection_lost called
324 self._app_state = AppProtocolState.STATE_INIT
326 self._app_state = AppProtocolState.STATE_CON_MADE
327 self._sslobj = self._sslcontext.wrap_bio(
328 self._incoming, self._outgoing,
329 server_side=self._server_side,
330 server_hostname=self._server_hostname)
334 self._ssl_writing_paused = False
336 self._app_reading_paused = False
338 self._ssl_reading_paused = False
339 self._incoming_high_water = 0
340 self._incoming_low_water = 0
341 self._set_read_buffer_limits()
342 self._eof_received = False
344 self._app_writing_paused = False
345 self._outgoing_high_water = 0
346 self._outgoing_low_water = 0
347 self._set_write_buffer_limits()
348 self._get_app_transport()
350 def _set_app_protocol(self, app_protocol):
351 self._app_protocol = app_protocol
355 self._app_protocol_get_buffer = app_protocol.get_buffer
356 self._app_protocol_buffer_updated = app_protocol.buffer_updated
357 self._app_protocol_is_buffer = True
359 self._app_protocol_is_buffer = False
361 def _wakeup_waiter(self, exc=None):
362 if self._waiter is None:
364 if not self._waiter.cancelled():
366 self._waiter.set_exception(exc)
368 self._waiter.set_result(None)
369 self._waiter = None
371 def _get_app_transport(self):
372 if self._app_transport is None:
373 if self._app_transport_created:
375 self._app_transport = _SSLProtocolTransport(self._loop, self)
376 self._app_transport_created = True
377 return self._app_transport
379 def connection_made(self, transport):
384 self._transport = transport
385 self._start_handshake()
387 def connection_lost(self, exc):
394 self._write_backlog.clear()
395 self._outgoing.read()
396 self._conn_lost += 1
400 if self._app_transport is not None:
401 self._app_transport._closed = True
403 if self._state != SSLProtocolState.DO_HANDSHAKE:
405 self._app_state == AppProtocolState.STATE_CON_MADE or
406 self._app_state == AppProtocolState.STATE_EOF
408 self._app_state = AppProtocolState.STATE_CON_LOST
409 self._loop.call_soon(self._app_protocol.connection_lost, exc)
410 self._set_state(SSLProtocolState.UNWRAPPED)
411 self._transport = None
412 self._app_transport = None
413 self._app_protocol = None
414 self._wakeup_waiter(exc)
416 if self._shutdown_timeout_handle:
417 self._shutdown_timeout_handle.cancel()
418 self._shutdown_timeout_handle = None
419 if self._handshake_timeout_handle:
420 self._handshake_timeout_handle.cancel()
421 self._handshake_timeout_handle = None
423 def get_buffer(self, n):
425 if want <= 0 or want > self.max_size:
426 want = self.max_size
427 if len(self._ssl_buffer) < want:
428 self._ssl_buffer = bytearray(want)
429 self._ssl_buffer_view = memoryview(self._ssl_buffer)
430 return self._ssl_buffer_view
432 def buffer_updated(self, nbytes):
433 self._incoming.write(self._ssl_buffer_view[:nbytes])
435 if self._state == SSLProtocolState.DO_HANDSHAKE:
436 self._do_handshake()
438 elif self._state == SSLProtocolState.WRAPPED:
439 self._do_read()
441 elif self._state == SSLProtocolState.FLUSHING:
442 self._do_flush()
444 elif self._state == SSLProtocolState.SHUTDOWN:
445 self._do_shutdown()
447 def eof_received(self):
455 self._eof_received = True
457 if self._loop.get_debug():
458 logger.debug("%r received EOF", self)
460 if self._state == SSLProtocolState.DO_HANDSHAKE:
461 self._on_handshake_complete(ConnectionResetError)
463 elif self._state == SSLProtocolState.WRAPPED:
464 self._set_state(SSLProtocolState.FLUSHING)
465 if self._app_reading_paused:
468 self._do_flush()
470 elif self._state == SSLProtocolState.FLUSHING:
471 self._do_write()
472 self._set_state(SSLProtocolState.SHUTDOWN)
473 self._do_shutdown()
475 elif self._state == SSLProtocolState.SHUTDOWN:
476 self._do_shutdown()
479 self._transport.close()
482 def _get_extra_info(self, name, default=None):
483 if name in self._extra:
484 return self._extra[name]
485 elif self._transport is not None:
486 return self._transport.get_extra_info(name, default)
490 def _set_state(self, new_state):
497 self._state == SSLProtocolState.UNWRAPPED and
503 self._state == SSLProtocolState.DO_HANDSHAKE and
509 self._state == SSLProtocolState.WRAPPED and
515 self._state == SSLProtocolState.FLUSHING and
521 self._state = new_state
526 self._state, new_state))
530 def _start_handshake(self):
531 if self._loop.get_debug():
532 logger.debug("%r starts SSL handshake", self)
533 self._handshake_start_time = self._loop.time()
535 self._handshake_start_time = None
537 self._set_state(SSLProtocolState.DO_HANDSHAKE)
540 self._handshake_timeout_handle = \
541 self._loop.call_later(self._ssl_handshake_timeout,
542 lambda: self._check_handshake_timeout())
544 self._do_handshake()
546 def _check_handshake_timeout(self):
547 if self._state == SSLProtocolState.DO_HANDSHAKE:
550 f"{self._ssl_handshake_timeout} seconds: "
553 self._fatal_error(ConnectionAbortedError(msg))
555 def _do_handshake(self):
557 self._sslobj.do_handshake()
559 self._process_outgoing()
561 self._on_handshake_complete(exc)
563 self._on_handshake_complete(None)
565 def _on_handshake_complete(self, handshake_exc):
566 if self._handshake_timeout_handle is not None:
567 self._handshake_timeout_handle.cancel()
568 self._handshake_timeout_handle = None
570 sslobj = self._sslobj
573 self._set_state(SSLProtocolState.WRAPPED)
579 self._set_state(SSLProtocolState.UNWRAPPED)
584 self._fatal_error(exc, msg)
585 self._wakeup_waiter(exc)
588 if self._loop.get_debug():
589 dt = self._loop.time() - self._handshake_start_time
590 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
593 self._extra.update(peercert=peercert,
597 if self._app_state == AppProtocolState.STATE_INIT:
598 self._app_state = AppProtocolState.STATE_CON_MADE
599 self._app_protocol.connection_made(self._get_app_transport())
600 self._wakeup_waiter()
601 self._do_read()
605 def _start_shutdown(self):
607 self._state in (
614 if self._app_transport is not None:
615 self._app_transport._closed = True
616 if self._state == SSLProtocolState.DO_HANDSHAKE:
617 self._abort()
619 self._set_state(SSLProtocolState.FLUSHING)
620 self._shutdown_timeout_handle = self._loop.call_later(
621 self._ssl_shutdown_timeout,
622 lambda: self._check_shutdown_timeout()
624 self._do_flush()
626 def _check_shutdown_timeout(self):
628 self._state in (
633 self._transport._force_close(
636 def _do_flush(self):
637 self._do_read()
638 self._set_state(SSLProtocolState.SHUTDOWN)
639 self._do_shutdown()
641 def _do_shutdown(self):
643 if not self._eof_received:
644 self._sslobj.unwrap()
646 self._process_outgoing()
648 self._on_shutdown_complete(exc)
650 self._process_outgoing()
651 self._call_eof_received()
652 self._on_shutdown_complete(None)
654 def _on_shutdown_complete(self, shutdown_exc):
655 if self._shutdown_timeout_handle is not None:
656 self._shutdown_timeout_handle.cancel()
657 self._shutdown_timeout_handle = None
660 self._fatal_error(shutdown_exc)
662 self._loop.call_soon(self._transport.close)
664 def _abort(self):
665 self._set_state(SSLProtocolState.UNWRAPPED)
666 if self._transport is not None:
667 self._transport.abort()
671 def _write_appdata(self, list_of_data):
673 self._state in (
679 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
681 self._conn_lost += 1
685 self._write_backlog.append(data)
686 self._write_buffer_size += len(data)
689 if self._state == SSLProtocolState.WRAPPED:
690 self._do_write()
693 self._fatal_error(ex, 'Fatal error on SSL protocol')
695 def _do_write(self):
697 while self._write_backlog:
698 data = self._write_backlog[0]
699 count = self._sslobj.write(data)
702 self._write_backlog[0] = data[count:]
703 self._write_buffer_size -= count
705 del self._write_backlog[0]
706 self._write_buffer_size -= data_len
709 self._process_outgoing()
711 def _process_outgoing(self):
712 if not self._ssl_writing_paused:
713 data = self._outgoing.read()
715 self._transport.write(data)
716 self._control_app_writing()
720 def _do_read(self):
722 self._state not in (
729 if not self._app_reading_paused:
730 if self._app_protocol_is_buffer:
731 self._do_read__buffered()
733 self._do_read__copied()
734 if self._write_backlog:
735 self._do_write()
737 self._process_outgoing()
738 self._control_ssl_reading()
740 self._fatal_error(ex, 'Fatal error on SSL protocol')
742 def _do_read__buffered(self):
746 buf = self._app_protocol_get_buffer(self._get_read_buffer_size())
750 count = self._sslobj.read(wants, buf)
755 count = self._sslobj.read(wants - offset, buf[offset:])
761 self._loop.call_soon(lambda: self._do_read())
765 self._app_protocol_buffer_updated(offset)
768 self._call_eof_received()
769 self._start_shutdown()
771 def _do_read__copied(self):
778 chunk = self._sslobj.read(self.max_size)
793 self._app_protocol.data_received(first)
795 self._app_protocol.data_received(b''.join(data))
798 self._call_eof_received()
799 self._start_shutdown()
801 def _call_eof_received(self):
803 if self._app_state == AppProtocolState.STATE_CON_MADE:
804 self._app_state = AppProtocolState.STATE_EOF
805 keep_open = self._app_protocol.eof_received()
812 self._fatal_error(ex, 'Error calling eof_received()')
816 def _control_app_writing(self):
817 size = self._get_write_buffer_size()
818 if size >= self._outgoing_high_water and not self._app_writing_paused:
819 self._app_writing_paused = True
821 self._app_protocol.pause_writing()
825 self._loop.call_exception_handler({
828 'transport': self._app_transport,
829 'protocol': self,
831 elif size <= self._outgoing_low_water and self._app_writing_paused:
832 self._app_writing_paused = False
834 self._app_protocol.resume_writing()
838 self._loop.call_exception_handler({
841 'transport': self._app_transport,
842 'protocol': self,
845 def _get_write_buffer_size(self):
846 return self._outgoing.pending + self._write_buffer_size
848 def _set_write_buffer_limits(self, high=None, low=None):
851 self._outgoing_high_water = high
852 self._outgoing_low_water = low
856 def _pause_reading(self):
857 self._app_reading_paused = True
859 def _resume_reading(self):
860 if self._app_reading_paused:
861 self._app_reading_paused = False
864 if self._state == SSLProtocolState.WRAPPED:
865 self._do_read()
866 elif self._state == SSLProtocolState.FLUSHING:
867 self._do_flush()
868 elif self._state == SSLProtocolState.SHUTDOWN:
869 self._do_shutdown()
870 self._loop.call_soon(resume)
874 def _control_ssl_reading(self):
875 size = self._get_read_buffer_size()
876 if size >= self._incoming_high_water and not self._ssl_reading_paused:
877 self._ssl_reading_paused = True
878 self._transport.pause_reading()
879 elif size <= self._incoming_low_water and self._ssl_reading_paused:
880 self._ssl_reading_paused = False
881 self._transport.resume_reading()
883 def _set_read_buffer_limits(self, high=None, low=None):
886 self._incoming_high_water = high
887 self._incoming_low_water = low
889 def _get_read_buffer_size(self):
890 return self._incoming.pending
894 def pause_writing(self):
898 assert not self._ssl_writing_paused
899 self._ssl_writing_paused = True
901 def resume_writing(self):
905 assert self._ssl_writing_paused
906 self._ssl_writing_paused = False
907 self._process_outgoing()
909 def _fatal_error(self, exc, message='Fatal error on transport'):
910 if self._transport:
911 self._transport._force_close(exc)
914 if self._loop.get_debug():
915 logger.debug("%r: %s", self, message, exc_info=True)
917 self._loop.call_exception_handler({
920 'transport': self._transport,
921 'protocol': self,