import collections
import enum
import warnings
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import constants
from . import exceptions
from . import protocols
from . import transports
from .log import logger

if ssl is not None:
    # Errors raised by the SSL object that are handled below by flushing
    # the outgoing BIO instead of failing the handshake/shutdown.
    SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError)


class SSLProtocolState(enum.Enum):
    """States of the SSL layer; transitions are enforced by _set_state()."""

    UNWRAPPED = "UNWRAPPED"
    DO_HANDSHAKE = "DO_HANDSHAKE"
    WRAPPED = "WRAPPED"
    FLUSHING = "FLUSHING"
    SHUTDOWN = "SHUTDOWN"


class AppProtocolState(enum.Enum):
    # This tracks the state of app protocol (https://git.io/fj59P):
    #
    #     INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST
    #
    # * cm: connection_made()
    # * dr: data_received()
    # * er: eof_received()
    # * cl: connection_lost()

    STATE_INIT = "STATE_INIT"
    STATE_CON_MADE = "STATE_CON_MADE"
    STATE_EOF = "STATE_EOF"
    STATE_CON_LOST = "STATE_CON_LOST"


def _create_transport_context(server_side, server_hostname):
    """Build a default client-side SSLContext.

    Raises ValueError when *server_side* is true: a server must be given
    an explicit context.  Hostname checking is switched off when
    *server_hostname* is empty or None.
    """
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # Client side may pass ssl=True to use a default
    # context; in that case the sslcontext passed is None.
    # The default is secure for client connections.
    # Python 3.4+: use up-to-date strong settings.
    sslcontext = ssl.create_default_context()
    if not server_hostname:
        sslcontext.check_hostname = False
    return sslcontext


def add_flowcontrol_defaults(high, low, kb):
    """Resolve flow-control water marks and return them as (high, low).

    * high is None and low is None: high defaults to ``kb * 1024``.
    * high is None and low is given: high defaults to ``4 * low``.
    * low is None: low defaults to ``high // 4``.

    Raises ValueError unless ``high >= low >= 0`` holds for the resolved
    values.
    """
    if high is None:
        if low is None:
            hi = kb * 1024
        else:
            lo = low
            hi = 4 * lo
    else:
        hi = high
    if low is None:
        lo = hi // 4
    else:
        lo = low

    if not hi >= lo >= 0:
        raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                         (hi, lo))

    return hi, lo


class _SSLProtocolTransport(transports._FlowControlMixin,
                            transports.Transport):
    """Transport handed to the application protocol.

    Nearly every method delegates to the owning SSLProtocol
    (``self._ssl_protocol``); this object itself only tracks whether it
    has been closed.
    """

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.FALLBACK

    def __init__(self, loop, ssl_protocol):
        self._loop = loop
        self._ssl_protocol = ssl_protocol
        self._closed = False

    def get_extra_info(self, name, default=None):
        """Get optional transport information."""
        return self._ssl_protocol._get_extra_info(name, default)

    def set_protocol(self, protocol):
        """Replace the application protocol served by the SSL protocol."""
        self._ssl_protocol._set_app_protocol(protocol)

    def get_protocol(self):
        """Return the current application protocol."""
        return self._ssl_protocol._app_protocol

    def is_closing(self):
        """Return True once close()/abort() was called or the link dropped."""
        return self._closed

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.
        """
        if not self._closed:
            self._closed = True
            self._ssl_protocol._start_shutdown()
        else:
            # Already closed: drop the reference to the SSL protocol.
            self._ssl_protocol = None

    def __del__(self, _warnings=warnings):
        # `warnings` is bound as a default argument so it is still
        # reachable here even if module globals are already torn down.
        if not self._closed:
            self._closed = True
            _warnings.warn(
                "unclosed transport <asyncio._SSLProtocolTransport "
                "object>", ResourceWarning)

    def is_reading(self):
        """Return True unless the application paused reading."""
        return not self._ssl_protocol._app_reading_paused

    def pause_reading(self):
        """Pause the receiving end.

        No data will be passed to the protocol's data_received()
        method until resume_reading() is called.
        """
        self._ssl_protocol._pause_reading()

    def resume_reading(self):
        """Resume the receiving end.

        Data received will once again be passed to the protocol's
        data_received() method.
        """
        self._ssl_protocol._resume_reading()

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        self._ssl_protocol._set_write_buffer_limits(high, low)
        self._ssl_protocol._control_app_writing()

    def get_write_buffer_limits(self):
        """Return the write water marks as a (low, high) tuple."""
        return (self._ssl_protocol._outgoing_low_water,
                self._ssl_protocol._outgoing_high_water)

    def get_write_buffer_size(self):
        """Return the current size of the write buffers."""
        return self._ssl_protocol._get_write_buffer_size()

    def set_read_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for read flow control.

        These two values control when to call the upstream transport's
        pause_reading() and resume_reading() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_reading() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_reading() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        self._ssl_protocol._set_read_buffer_limits(high, low)
        self._ssl_protocol._control_ssl_reading()

    def get_read_buffer_limits(self):
        """Return the read water marks as a (low, high) tuple."""
        return (self._ssl_protocol._incoming_low_water,
                self._ssl_protocol._incoming_high_water)

    def get_read_buffer_size(self):
        """Return the current size of the read buffer."""
        return self._ssl_protocol._get_read_buffer_size()

    @property
    def _protocol_paused(self):
        # Required for sendfile fallback pause_writing/resume_writing logic
        return self._ssl_protocol._app_writing_paused

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.

        Raises TypeError if *data* is not bytes, bytearray or memoryview.
        Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f"data: expecting a bytes-like instance, "
                            f"got {type(data).__name__}")
        if not data:
            return
        self._ssl_protocol._write_appdata((data,))

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The iterable is passed unchanged to the SSL protocol's
        _write_appdata(); unlike write(), the items are not type-checked
        here.
        """
        self._ssl_protocol._write_appdata(list_of_data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        This raises :exc:`NotImplementedError` right now.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        return False

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        self._closed = True
        if self._ssl_protocol is not None:
            self._ssl_protocol._abort()

    def _force_close(self, exc):
        """Abort the connection, reporting *exc* as the reason.

        SSLProtocol._abort() takes no argument in this file, so calling
        it with *exc* would raise TypeError.  The same steps are done
        here directly, with *exc* forwarded to the lower transport.
        """
        self._closed = True
        ssl_protocol = self._ssl_protocol
        if ssl_protocol is None:
            # close() was called twice and already dropped the reference.
            return
        ssl_protocol._set_state(SSLProtocolState.UNWRAPPED)
        if ssl_protocol._transport is not None:
            ssl_protocol._transport._force_close(exc)

    def _test__append_write_backlog(self, data):
        # for test only
        self._ssl_protocol._write_backlog.append(data)
        self._ssl_protocol._write_buffer_size += len(data)


class SSLProtocol(protocols.BufferedProtocol):
    """Buffered protocol that runs SSL over a lower-level transport.

    Bytes arriving from the lower transport are fed to an SSL object
    through a pair of ssl.MemoryBIO instances; the application protocol
    talks to an _SSLProtocolTransport created by this object.
    """

    max_size = 256 * 1024   # Buffer size passed to read()

    _handshake_start_time = None
    _handshake_timeout_handle = None
    _shutdown_timeout_handle = None

    def __init__(self, loop, app_protocol, sslcontext, waiter,
                 server_side=False, server_hostname=None,
                 call_connection_made=True,
                 ssl_handshake_timeout=None,
                 ssl_shutdown_timeout=None):
        """Set up buffers, the SSL object and flow-control state.

        Raises RuntimeError if the ssl module is unavailable, and
        ValueError if either timeout is given and is not positive.  A
        falsy *sslcontext* is replaced by a default client context
        (which itself raises ValueError for *server_side*).
        """
        if ssl is None:
            raise RuntimeError("stdlib ssl module not available")

        self._ssl_buffer = bytearray(self.max_size)
        self._ssl_buffer_view = memoryview(self._ssl_buffer)

        if ssl_handshake_timeout is None:
            ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT
        elif ssl_handshake_timeout <= 0:
            raise ValueError(
                f"ssl_handshake_timeout should be a positive number, "
                f"got {ssl_handshake_timeout}")
        if ssl_shutdown_timeout is None:
            ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT
        elif ssl_shutdown_timeout <= 0:
            raise ValueError(
                f"ssl_shutdown_timeout should be a positive number, "
                f"got {ssl_shutdown_timeout}")

        if not sslcontext:
            sslcontext = _create_transport_context(
                server_side, server_hostname)

        self._server_side = server_side
        if server_hostname and not server_side:
            self._server_hostname = server_hostname
        else:
            self._server_hostname = None
        self._sslcontext = sslcontext
        # SSL-specific extra info. More info is set when the handshake
        # completes.
        self._extra = dict(sslcontext=sslcontext)

        # App data write buffering
        self._write_backlog = collections.deque()
        self._write_buffer_size = 0

        self._waiter = waiter
        self._loop = loop
        self._set_app_protocol(app_protocol)
        self._app_transport = None
        self._app_transport_created = False
        # transport, ex: SelectorSocketTransport
        self._transport = None
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        # SSL and state machine
        self._incoming = ssl.MemoryBIO()
        self._outgoing = ssl.MemoryBIO()
        self._state = SSLProtocolState.UNWRAPPED
        self._conn_lost = 0  # Set when connection_lost called
        if call_connection_made:
            self._app_state = AppProtocolState.STATE_INIT
        else:
            # Skip STATE_INIT so that _on_handshake_complete() does not
            # call the app protocol's connection_made().
            self._app_state = AppProtocolState.STATE_CON_MADE
        self._sslobj = self._sslcontext.wrap_bio(
            self._incoming, self._outgoing,
            server_side=self._server_side,
            server_hostname=self._server_hostname)

        # Flow Control

        self._ssl_writing_paused = False

        self._app_reading_paused = False

        self._ssl_reading_paused = False
        self._incoming_high_water = 0
        self._incoming_low_water = 0
        self._set_read_buffer_limits()
        self._eof_received = False

        self._app_writing_paused = False
        self._outgoing_high_water = 0
        self._outgoing_low_water = 0
        self._set_write_buffer_limits()
        self._get_app_transport()

    def _set_app_protocol(self, app_protocol):
        """Store *app_protocol* and cache its buffered-protocol hooks."""
        self._app_protocol = app_protocol
        # Make fast hasattr check first
        if (hasattr(app_protocol, 'get_buffer') and
                isinstance(app_protocol, protocols.BufferedProtocol)):
            self._app_protocol_get_buffer = app_protocol.get_buffer
            self._app_protocol_buffer_updated = app_protocol.buffer_updated
            self._app_protocol_is_buffer = True
        else:
            self._app_protocol_is_buffer = False

    def _wakeup_waiter(self, exc=None):
        """Resolve the waiter future once, with *exc* or with None."""
        if self._waiter is None:
            return
        if not self._waiter.cancelled():
            if exc is not None:
                self._waiter.set_exception(exc)
            else:
                self._waiter.set_result(None)
        self._waiter = None

    def _get_app_transport(self):
        """Return the app transport, creating it on first use.

        Raises RuntimeError if the transport was created before and has
        since been cleared (see connection_lost()).
        """
        if self._app_transport is None:
            if self._app_transport_created:
                raise RuntimeError('Creating _SSLProtocolTransport twice')
            self._app_transport = _SSLProtocolTransport(self._loop, self)
            self._app_transport_created = True
        return self._app_transport

    def connection_made(self, transport):
        """Called when the low-level connection is made.

        Start the SSL handshake.
        """
        self._transport = transport
        self._start_handshake()

    def connection_lost(self, exc):
        """Called when the low-level connection is lost or closed.

        The argument is an exception object or None (the latter
        meaning a regular EOF is received or the connection was
        aborted or closed).
        """
        self._write_backlog.clear()
        self._outgoing.read()
        self._conn_lost += 1

        # Just mark the app transport as closed so that its __del__
        # doesn't complain.
        if self._app_transport is not None:
            self._app_transport._closed = True

        if self._state != SSLProtocolState.DO_HANDSHAKE:
            if (
                self._app_state == AppProtocolState.STATE_CON_MADE or
                self._app_state == AppProtocolState.STATE_EOF
            ):
                self._app_state = AppProtocolState.STATE_CON_LOST
                self._loop.call_soon(self._app_protocol.connection_lost, exc)
        self._set_state(SSLProtocolState.UNWRAPPED)
        self._transport = None
        self._app_transport = None
        self._app_protocol = None
        self._wakeup_waiter(exc)

        if self._shutdown_timeout_handle:
            self._shutdown_timeout_handle.cancel()
            self._shutdown_timeout_handle = None
        if self._handshake_timeout_handle:
            self._handshake_timeout_handle.cancel()
            self._handshake_timeout_handle = None

    def get_buffer(self, n):
        """Return a memoryview the lower transport can receive into.

        *n* is a size hint; non-positive or oversized hints are replaced
        by max_size.  The backing bytearray is reallocated only when it
        is smaller than the wanted size.
        """
        want = n
        if want <= 0 or want > self.max_size:
            want = self.max_size
        if len(self._ssl_buffer) < want:
            self._ssl_buffer = bytearray(want)
            self._ssl_buffer_view = memoryview(self._ssl_buffer)
        return self._ssl_buffer_view

    def buffer_updated(self, nbytes):
        """Feed *nbytes* received bytes to the SSL object and advance.

        What happens next depends on the current SSL state; nothing
        further is done in the UNWRAPPED state.
        """
        self._incoming.write(self._ssl_buffer_view[:nbytes])

        if self._state == SSLProtocolState.DO_HANDSHAKE:
            self._do_handshake()

        elif self._state == SSLProtocolState.WRAPPED:
            self._do_read()

        elif self._state == SSLProtocolState.FLUSHING:
            self._do_flush()

        elif self._state == SSLProtocolState.SHUTDOWN:
            self._do_shutdown()

    def eof_received(self):
        """Called when the other end of the low-level stream
        is half-closed.

        If this returns a false value (including None), the transport
        will close itself.  If it returns a true value, closing the
        transport is up to the protocol.
        """
        self._eof_received = True
        try:
            if self._loop.get_debug():
                logger.debug("%r received EOF", self)

            if self._state == SSLProtocolState.DO_HANDSHAKE:
                self._on_handshake_complete(ConnectionResetError)

            elif self._state == SSLProtocolState.WRAPPED:
                self._set_state(SSLProtocolState.FLUSHING)
                if self._app_reading_paused:
                    # Keep the lower transport open; flushing is not
                    # started while the application has reading paused.
                    return True
                else:
                    self._do_flush()

            elif self._state == SSLProtocolState.FLUSHING:
                self._do_write()
                self._set_state(SSLProtocolState.SHUTDOWN)
                self._do_shutdown()

            elif self._state == SSLProtocolState.SHUTDOWN:
                self._do_shutdown()

        except Exception:
            self._transport.close()
            raise

    def _get_extra_info(self, name, default=None):
        """Look *name* up in the SSL extras, then in the lower transport."""
        if name in self._extra:
            return self._extra[name]
        elif self._transport is not None:
            return self._transport.get_extra_info(name, default)
        else:
            return default

    def _set_state(self, new_state):
        """Move to *new_state*, enforcing the allowed transitions.

        Allowed: anything -> UNWRAPPED, and the forward chain
        UNWRAPPED -> DO_HANDSHAKE -> WRAPPED -> FLUSHING -> SHUTDOWN.
        Any other transition raises RuntimeError.
        """
        allowed = False

        if new_state == SSLProtocolState.UNWRAPPED:
            allowed = True

        elif (
            self._state == SSLProtocolState.UNWRAPPED and
            new_state == SSLProtocolState.DO_HANDSHAKE
        ):
            allowed = True

        elif (
            self._state == SSLProtocolState.DO_HANDSHAKE and
            new_state == SSLProtocolState.WRAPPED
        ):
            allowed = True

        elif (
            self._state == SSLProtocolState.WRAPPED and
            new_state == SSLProtocolState.FLUSHING
        ):
            allowed = True

        elif (
            self._state == SSLProtocolState.FLUSHING and
            new_state == SSLProtocolState.SHUTDOWN
        ):
            allowed = True

        if allowed:
            self._state = new_state

        else:
            raise RuntimeError(
                'cannot switch state from {} to {}'.format(
                    self._state, new_state))

    # Handshake flow

    def _start_handshake(self):
        """Enter DO_HANDSHAKE, arm the timeout and make the first attempt."""
        if self._loop.get_debug():
            logger.debug("%r starts SSL handshake", self)
            self._handshake_start_time = self._loop.time()
        else:
            self._handshake_start_time = None

        self._set_state(SSLProtocolState.DO_HANDSHAKE)

        # start handshake timeout count down
        self._handshake_timeout_handle = \
            self._loop.call_later(self._ssl_handshake_timeout,
                                  lambda: self._check_handshake_timeout())

        self._do_handshake()

    def _check_handshake_timeout(self):
        """Fail the connection if the handshake is still in progress."""
        if self._state == SSLProtocolState.DO_HANDSHAKE:
            msg = (
                f"SSL handshake is taking longer than "
                f"{self._ssl_handshake_timeout} seconds: "
                f"aborting the connection"
            )
            self._fatal_error(ConnectionAbortedError(msg))

    def _do_handshake(self):
        """Run one handshake step and dispatch on its outcome."""
        try:
            self._sslobj.do_handshake()
        except SSLAgainErrors:
            # Handshake not finished yet: process what the SSL object
            # has put in the outgoing BIO so far.
            self._process_outgoing()
        except ssl.SSLError as exc:
            self._on_handshake_complete(exc)
        else:
            self._on_handshake_complete(None)

    def _on_handshake_complete(self, handshake_exc):
        """Finish the handshake, successfully or with *handshake_exc*.

        On failure the state returns to UNWRAPPED, the error is reported
        through _fatal_error() and the waiter receives the exception.
        On success the SSL extras are published, the app protocol gets
        connection_made() (unless that was disabled), the waiter is
        resolved and reading starts.
        """
        if self._handshake_timeout_handle is not None:
            self._handshake_timeout_handle.cancel()
            self._handshake_timeout_handle = None

        sslobj = self._sslobj
        try:
            if handshake_exc is None:
                self._set_state(SSLProtocolState.WRAPPED)
            else:
                raise handshake_exc

            peercert = sslobj.getpeercert()
        except Exception as exc:
            self._set_state(SSLProtocolState.UNWRAPPED)
            if isinstance(exc, ssl.CertificateError):
                msg = 'SSL handshake failed on verifying the certificate'
            else:
                msg = 'SSL handshake failed'
            self._fatal_error(exc, msg)
            self._wakeup_waiter(exc)
            return

        # The start time is only recorded when debug mode was already on
        # in _start_handshake(); skip the timing log if debug was enabled
        # later, instead of failing on `float - None`.
        if (self._loop.get_debug() and
                self._handshake_start_time is not None):
            dt = self._loop.time() - self._handshake_start_time
            logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)

        # Add extra info that becomes available after handshake.
        self._extra.update(peercert=peercert,
                           cipher=sslobj.cipher(),
                           compression=sslobj.compression(),
                           ssl_object=sslobj)
        if self._app_state == AppProtocolState.STATE_INIT:
            self._app_state = AppProtocolState.STATE_CON_MADE
            self._app_protocol.connection_made(self._get_app_transport())
        self._wakeup_waiter()
        self._do_read()

    # Shutdown flow

    def _start_shutdown(self):
        """Begin closing: abort mid-handshake, otherwise flush and shut down.

        Does nothing if shutdown is already under way or the connection
        is unwrapped.
        """
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN,
                SSLProtocolState.UNWRAPPED
            )
        ):
            return
        if self._app_transport is not None:
            self._app_transport._closed = True
        if self._state == SSLProtocolState.DO_HANDSHAKE:
            self._abort()
        else:
            self._set_state(SSLProtocolState.FLUSHING)
            self._shutdown_timeout_handle = self._loop.call_later(
                self._ssl_shutdown_timeout,
                lambda: self._check_shutdown_timeout()
            )
            self._do_flush()

    def _check_shutdown_timeout(self):
        """Force-close the lower transport if shutdown has not finished."""
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN
            )
        ):
            self._transport._force_close(
                exceptions.TimeoutError('SSL shutdown timed out'))

    def _do_flush(self):
        """Read once more, then enter SHUTDOWN and start the SSL shutdown."""
        self._do_read()
        self._set_state(SSLProtocolState.SHUTDOWN)
        self._do_shutdown()

    def _do_shutdown(self):
        """Run one step of the SSL shutdown and dispatch on its outcome."""
        try:
            # unwrap() is skipped once EOF was received from the peer.
            if not self._eof_received:
                self._sslobj.unwrap()
        except SSLAgainErrors:
            self._process_outgoing()
        except ssl.SSLError as exc:
            self._on_shutdown_complete(exc)
        else:
            self._process_outgoing()
            self._call_eof_received()
            self._on_shutdown_complete(None)
6537db96d56Sopenharmony_ci 6547db96d56Sopenharmony_ci def _on_shutdown_complete(self, shutdown_exc): 6557db96d56Sopenharmony_ci if self._shutdown_timeout_handle is not None: 6567db96d56Sopenharmony_ci self._shutdown_timeout_handle.cancel() 6577db96d56Sopenharmony_ci self._shutdown_timeout_handle = None 6587db96d56Sopenharmony_ci 6597db96d56Sopenharmony_ci if shutdown_exc: 6607db96d56Sopenharmony_ci self._fatal_error(shutdown_exc) 6617db96d56Sopenharmony_ci else: 6627db96d56Sopenharmony_ci self._loop.call_soon(self._transport.close) 6637db96d56Sopenharmony_ci 6647db96d56Sopenharmony_ci def _abort(self): 6657db96d56Sopenharmony_ci self._set_state(SSLProtocolState.UNWRAPPED) 6667db96d56Sopenharmony_ci if self._transport is not None: 6677db96d56Sopenharmony_ci self._transport.abort() 6687db96d56Sopenharmony_ci 6697db96d56Sopenharmony_ci # Outgoing flow 6707db96d56Sopenharmony_ci 6717db96d56Sopenharmony_ci def _write_appdata(self, list_of_data): 6727db96d56Sopenharmony_ci if ( 6737db96d56Sopenharmony_ci self._state in ( 6747db96d56Sopenharmony_ci SSLProtocolState.FLUSHING, 6757db96d56Sopenharmony_ci SSLProtocolState.SHUTDOWN, 6767db96d56Sopenharmony_ci SSLProtocolState.UNWRAPPED 6777db96d56Sopenharmony_ci ) 6787db96d56Sopenharmony_ci ): 6797db96d56Sopenharmony_ci if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 6807db96d56Sopenharmony_ci logger.warning('SSL connection is closed') 6817db96d56Sopenharmony_ci self._conn_lost += 1 6827db96d56Sopenharmony_ci return 6837db96d56Sopenharmony_ci 6847db96d56Sopenharmony_ci for data in list_of_data: 6857db96d56Sopenharmony_ci self._write_backlog.append(data) 6867db96d56Sopenharmony_ci self._write_buffer_size += len(data) 6877db96d56Sopenharmony_ci 6887db96d56Sopenharmony_ci try: 6897db96d56Sopenharmony_ci if self._state == SSLProtocolState.WRAPPED: 6907db96d56Sopenharmony_ci self._do_write() 6917db96d56Sopenharmony_ci 6927db96d56Sopenharmony_ci except Exception as ex: 6937db96d56Sopenharmony_ci 
self._fatal_error(ex, 'Fatal error on SSL protocol') 6947db96d56Sopenharmony_ci 6957db96d56Sopenharmony_ci def _do_write(self): 6967db96d56Sopenharmony_ci try: 6977db96d56Sopenharmony_ci while self._write_backlog: 6987db96d56Sopenharmony_ci data = self._write_backlog[0] 6997db96d56Sopenharmony_ci count = self._sslobj.write(data) 7007db96d56Sopenharmony_ci data_len = len(data) 7017db96d56Sopenharmony_ci if count < data_len: 7027db96d56Sopenharmony_ci self._write_backlog[0] = data[count:] 7037db96d56Sopenharmony_ci self._write_buffer_size -= count 7047db96d56Sopenharmony_ci else: 7057db96d56Sopenharmony_ci del self._write_backlog[0] 7067db96d56Sopenharmony_ci self._write_buffer_size -= data_len 7077db96d56Sopenharmony_ci except SSLAgainErrors: 7087db96d56Sopenharmony_ci pass 7097db96d56Sopenharmony_ci self._process_outgoing() 7107db96d56Sopenharmony_ci 7117db96d56Sopenharmony_ci def _process_outgoing(self): 7127db96d56Sopenharmony_ci if not self._ssl_writing_paused: 7137db96d56Sopenharmony_ci data = self._outgoing.read() 7147db96d56Sopenharmony_ci if len(data): 7157db96d56Sopenharmony_ci self._transport.write(data) 7167db96d56Sopenharmony_ci self._control_app_writing() 7177db96d56Sopenharmony_ci 7187db96d56Sopenharmony_ci # Incoming flow 7197db96d56Sopenharmony_ci 7207db96d56Sopenharmony_ci def _do_read(self): 7217db96d56Sopenharmony_ci if ( 7227db96d56Sopenharmony_ci self._state not in ( 7237db96d56Sopenharmony_ci SSLProtocolState.WRAPPED, 7247db96d56Sopenharmony_ci SSLProtocolState.FLUSHING, 7257db96d56Sopenharmony_ci ) 7267db96d56Sopenharmony_ci ): 7277db96d56Sopenharmony_ci return 7287db96d56Sopenharmony_ci try: 7297db96d56Sopenharmony_ci if not self._app_reading_paused: 7307db96d56Sopenharmony_ci if self._app_protocol_is_buffer: 7317db96d56Sopenharmony_ci self._do_read__buffered() 7327db96d56Sopenharmony_ci else: 7337db96d56Sopenharmony_ci self._do_read__copied() 7347db96d56Sopenharmony_ci if self._write_backlog: 7357db96d56Sopenharmony_ci self._do_write() 
7367db96d56Sopenharmony_ci else: 7377db96d56Sopenharmony_ci self._process_outgoing() 7387db96d56Sopenharmony_ci self._control_ssl_reading() 7397db96d56Sopenharmony_ci except Exception as ex: 7407db96d56Sopenharmony_ci self._fatal_error(ex, 'Fatal error on SSL protocol') 7417db96d56Sopenharmony_ci 7427db96d56Sopenharmony_ci def _do_read__buffered(self): 7437db96d56Sopenharmony_ci offset = 0 7447db96d56Sopenharmony_ci count = 1 7457db96d56Sopenharmony_ci 7467db96d56Sopenharmony_ci buf = self._app_protocol_get_buffer(self._get_read_buffer_size()) 7477db96d56Sopenharmony_ci wants = len(buf) 7487db96d56Sopenharmony_ci 7497db96d56Sopenharmony_ci try: 7507db96d56Sopenharmony_ci count = self._sslobj.read(wants, buf) 7517db96d56Sopenharmony_ci 7527db96d56Sopenharmony_ci if count > 0: 7537db96d56Sopenharmony_ci offset = count 7547db96d56Sopenharmony_ci while offset < wants: 7557db96d56Sopenharmony_ci count = self._sslobj.read(wants - offset, buf[offset:]) 7567db96d56Sopenharmony_ci if count > 0: 7577db96d56Sopenharmony_ci offset += count 7587db96d56Sopenharmony_ci else: 7597db96d56Sopenharmony_ci break 7607db96d56Sopenharmony_ci else: 7617db96d56Sopenharmony_ci self._loop.call_soon(lambda: self._do_read()) 7627db96d56Sopenharmony_ci except SSLAgainErrors: 7637db96d56Sopenharmony_ci pass 7647db96d56Sopenharmony_ci if offset > 0: 7657db96d56Sopenharmony_ci self._app_protocol_buffer_updated(offset) 7667db96d56Sopenharmony_ci if not count: 7677db96d56Sopenharmony_ci # close_notify 7687db96d56Sopenharmony_ci self._call_eof_received() 7697db96d56Sopenharmony_ci self._start_shutdown() 7707db96d56Sopenharmony_ci 7717db96d56Sopenharmony_ci def _do_read__copied(self): 7727db96d56Sopenharmony_ci chunk = b'1' 7737db96d56Sopenharmony_ci zero = True 7747db96d56Sopenharmony_ci one = False 7757db96d56Sopenharmony_ci 7767db96d56Sopenharmony_ci try: 7777db96d56Sopenharmony_ci while True: 7787db96d56Sopenharmony_ci chunk = self._sslobj.read(self.max_size) 7797db96d56Sopenharmony_ci if not 
chunk: 7807db96d56Sopenharmony_ci break 7817db96d56Sopenharmony_ci if zero: 7827db96d56Sopenharmony_ci zero = False 7837db96d56Sopenharmony_ci one = True 7847db96d56Sopenharmony_ci first = chunk 7857db96d56Sopenharmony_ci elif one: 7867db96d56Sopenharmony_ci one = False 7877db96d56Sopenharmony_ci data = [first, chunk] 7887db96d56Sopenharmony_ci else: 7897db96d56Sopenharmony_ci data.append(chunk) 7907db96d56Sopenharmony_ci except SSLAgainErrors: 7917db96d56Sopenharmony_ci pass 7927db96d56Sopenharmony_ci if one: 7937db96d56Sopenharmony_ci self._app_protocol.data_received(first) 7947db96d56Sopenharmony_ci elif not zero: 7957db96d56Sopenharmony_ci self._app_protocol.data_received(b''.join(data)) 7967db96d56Sopenharmony_ci if not chunk: 7977db96d56Sopenharmony_ci # close_notify 7987db96d56Sopenharmony_ci self._call_eof_received() 7997db96d56Sopenharmony_ci self._start_shutdown() 8007db96d56Sopenharmony_ci 8017db96d56Sopenharmony_ci def _call_eof_received(self): 8027db96d56Sopenharmony_ci try: 8037db96d56Sopenharmony_ci if self._app_state == AppProtocolState.STATE_CON_MADE: 8047db96d56Sopenharmony_ci self._app_state = AppProtocolState.STATE_EOF 8057db96d56Sopenharmony_ci keep_open = self._app_protocol.eof_received() 8067db96d56Sopenharmony_ci if keep_open: 8077db96d56Sopenharmony_ci logger.warning('returning true from eof_received() ' 8087db96d56Sopenharmony_ci 'has no effect when using ssl') 8097db96d56Sopenharmony_ci except (KeyboardInterrupt, SystemExit): 8107db96d56Sopenharmony_ci raise 8117db96d56Sopenharmony_ci except BaseException as ex: 8127db96d56Sopenharmony_ci self._fatal_error(ex, 'Error calling eof_received()') 8137db96d56Sopenharmony_ci 8147db96d56Sopenharmony_ci # Flow control for writes from APP socket 8157db96d56Sopenharmony_ci 8167db96d56Sopenharmony_ci def _control_app_writing(self): 8177db96d56Sopenharmony_ci size = self._get_write_buffer_size() 8187db96d56Sopenharmony_ci if size >= self._outgoing_high_water and not self._app_writing_paused: 
8197db96d56Sopenharmony_ci self._app_writing_paused = True 8207db96d56Sopenharmony_ci try: 8217db96d56Sopenharmony_ci self._app_protocol.pause_writing() 8227db96d56Sopenharmony_ci except (KeyboardInterrupt, SystemExit): 8237db96d56Sopenharmony_ci raise 8247db96d56Sopenharmony_ci except BaseException as exc: 8257db96d56Sopenharmony_ci self._loop.call_exception_handler({ 8267db96d56Sopenharmony_ci 'message': 'protocol.pause_writing() failed', 8277db96d56Sopenharmony_ci 'exception': exc, 8287db96d56Sopenharmony_ci 'transport': self._app_transport, 8297db96d56Sopenharmony_ci 'protocol': self, 8307db96d56Sopenharmony_ci }) 8317db96d56Sopenharmony_ci elif size <= self._outgoing_low_water and self._app_writing_paused: 8327db96d56Sopenharmony_ci self._app_writing_paused = False 8337db96d56Sopenharmony_ci try: 8347db96d56Sopenharmony_ci self._app_protocol.resume_writing() 8357db96d56Sopenharmony_ci except (KeyboardInterrupt, SystemExit): 8367db96d56Sopenharmony_ci raise 8377db96d56Sopenharmony_ci except BaseException as exc: 8387db96d56Sopenharmony_ci self._loop.call_exception_handler({ 8397db96d56Sopenharmony_ci 'message': 'protocol.resume_writing() failed', 8407db96d56Sopenharmony_ci 'exception': exc, 8417db96d56Sopenharmony_ci 'transport': self._app_transport, 8427db96d56Sopenharmony_ci 'protocol': self, 8437db96d56Sopenharmony_ci }) 8447db96d56Sopenharmony_ci 8457db96d56Sopenharmony_ci def _get_write_buffer_size(self): 8467db96d56Sopenharmony_ci return self._outgoing.pending + self._write_buffer_size 8477db96d56Sopenharmony_ci 8487db96d56Sopenharmony_ci def _set_write_buffer_limits(self, high=None, low=None): 8497db96d56Sopenharmony_ci high, low = add_flowcontrol_defaults( 8507db96d56Sopenharmony_ci high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE) 8517db96d56Sopenharmony_ci self._outgoing_high_water = high 8527db96d56Sopenharmony_ci self._outgoing_low_water = low 8537db96d56Sopenharmony_ci 8547db96d56Sopenharmony_ci # Flow control for reads to APP socket 
8557db96d56Sopenharmony_ci 8567db96d56Sopenharmony_ci def _pause_reading(self): 8577db96d56Sopenharmony_ci self._app_reading_paused = True 8587db96d56Sopenharmony_ci 8597db96d56Sopenharmony_ci def _resume_reading(self): 8607db96d56Sopenharmony_ci if self._app_reading_paused: 8617db96d56Sopenharmony_ci self._app_reading_paused = False 8627db96d56Sopenharmony_ci 8637db96d56Sopenharmony_ci def resume(): 8647db96d56Sopenharmony_ci if self._state == SSLProtocolState.WRAPPED: 8657db96d56Sopenharmony_ci self._do_read() 8667db96d56Sopenharmony_ci elif self._state == SSLProtocolState.FLUSHING: 8677db96d56Sopenharmony_ci self._do_flush() 8687db96d56Sopenharmony_ci elif self._state == SSLProtocolState.SHUTDOWN: 8697db96d56Sopenharmony_ci self._do_shutdown() 8707db96d56Sopenharmony_ci self._loop.call_soon(resume) 8717db96d56Sopenharmony_ci 8727db96d56Sopenharmony_ci # Flow control for reads from SSL socket 8737db96d56Sopenharmony_ci 8747db96d56Sopenharmony_ci def _control_ssl_reading(self): 8757db96d56Sopenharmony_ci size = self._get_read_buffer_size() 8767db96d56Sopenharmony_ci if size >= self._incoming_high_water and not self._ssl_reading_paused: 8777db96d56Sopenharmony_ci self._ssl_reading_paused = True 8787db96d56Sopenharmony_ci self._transport.pause_reading() 8797db96d56Sopenharmony_ci elif size <= self._incoming_low_water and self._ssl_reading_paused: 8807db96d56Sopenharmony_ci self._ssl_reading_paused = False 8817db96d56Sopenharmony_ci self._transport.resume_reading() 8827db96d56Sopenharmony_ci 8837db96d56Sopenharmony_ci def _set_read_buffer_limits(self, high=None, low=None): 8847db96d56Sopenharmony_ci high, low = add_flowcontrol_defaults( 8857db96d56Sopenharmony_ci high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ) 8867db96d56Sopenharmony_ci self._incoming_high_water = high 8877db96d56Sopenharmony_ci self._incoming_low_water = low 8887db96d56Sopenharmony_ci 8897db96d56Sopenharmony_ci def _get_read_buffer_size(self): 8907db96d56Sopenharmony_ci return 
self._incoming.pending 8917db96d56Sopenharmony_ci 8927db96d56Sopenharmony_ci # Flow control for writes to SSL socket 8937db96d56Sopenharmony_ci 8947db96d56Sopenharmony_ci def pause_writing(self): 8957db96d56Sopenharmony_ci """Called when the low-level transport's buffer goes over 8967db96d56Sopenharmony_ci the high-water mark. 8977db96d56Sopenharmony_ci """ 8987db96d56Sopenharmony_ci assert not self._ssl_writing_paused 8997db96d56Sopenharmony_ci self._ssl_writing_paused = True 9007db96d56Sopenharmony_ci 9017db96d56Sopenharmony_ci def resume_writing(self): 9027db96d56Sopenharmony_ci """Called when the low-level transport's buffer drains below 9037db96d56Sopenharmony_ci the low-water mark. 9047db96d56Sopenharmony_ci """ 9057db96d56Sopenharmony_ci assert self._ssl_writing_paused 9067db96d56Sopenharmony_ci self._ssl_writing_paused = False 9077db96d56Sopenharmony_ci self._process_outgoing() 9087db96d56Sopenharmony_ci 9097db96d56Sopenharmony_ci def _fatal_error(self, exc, message='Fatal error on transport'): 9107db96d56Sopenharmony_ci if self._transport: 9117db96d56Sopenharmony_ci self._transport._force_close(exc) 9127db96d56Sopenharmony_ci 9137db96d56Sopenharmony_ci if isinstance(exc, OSError): 9147db96d56Sopenharmony_ci if self._loop.get_debug(): 9157db96d56Sopenharmony_ci logger.debug("%r: %s", self, message, exc_info=True) 9167db96d56Sopenharmony_ci elif not isinstance(exc, exceptions.CancelledError): 9177db96d56Sopenharmony_ci self._loop.call_exception_handler({ 9187db96d56Sopenharmony_ci 'message': message, 9197db96d56Sopenharmony_ci 'exception': exc, 9207db96d56Sopenharmony_ci 'transport': self._transport, 9217db96d56Sopenharmony_ci 'protocol': self, 9227db96d56Sopenharmony_ci }) 923