17db96d56Sopenharmony_ci"""Test script for poplib module.""" 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ci# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL 47db96d56Sopenharmony_ci# a real test suite 57db96d56Sopenharmony_ci 67db96d56Sopenharmony_ciimport poplib 77db96d56Sopenharmony_ciimport socket 87db96d56Sopenharmony_ciimport os 97db96d56Sopenharmony_ciimport errno 107db96d56Sopenharmony_ciimport threading 117db96d56Sopenharmony_ci 127db96d56Sopenharmony_ciimport unittest 137db96d56Sopenharmony_cifrom unittest import TestCase, skipUnless 147db96d56Sopenharmony_cifrom test import support as test_support 157db96d56Sopenharmony_cifrom test.support import hashlib_helper 167db96d56Sopenharmony_cifrom test.support import socket_helper 177db96d56Sopenharmony_cifrom test.support import threading_helper 187db96d56Sopenharmony_cifrom test.support import warnings_helper 197db96d56Sopenharmony_ci 207db96d56Sopenharmony_ci 217db96d56Sopenharmony_ciasynchat = warnings_helper.import_deprecated('asynchat') 227db96d56Sopenharmony_ciasyncore = warnings_helper.import_deprecated('asyncore') 237db96d56Sopenharmony_ci 247db96d56Sopenharmony_ci 257db96d56Sopenharmony_citest_support.requires_working_socket(module=True) 267db96d56Sopenharmony_ci 277db96d56Sopenharmony_ciHOST = socket_helper.HOST 287db96d56Sopenharmony_ciPORT = 0 297db96d56Sopenharmony_ci 307db96d56Sopenharmony_ciSUPPORTS_SSL = False 317db96d56Sopenharmony_ciif hasattr(poplib, 'POP3_SSL'): 327db96d56Sopenharmony_ci import ssl 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ci SUPPORTS_SSL = True 357db96d56Sopenharmony_ci CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 367db96d56Sopenharmony_ci CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_cirequires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') 397db96d56Sopenharmony_ci 407db96d56Sopenharmony_ci# the dummy data returned by server when LIST and RETR commands are issued 417db96d56Sopenharmony_ciLIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' 427db96d56Sopenharmony_ciRETR_RESP = b"""From: postmaster@python.org\ 437db96d56Sopenharmony_ci\r\nContent-Type: text/plain\r\n\ 447db96d56Sopenharmony_ciMIME-Version: 1.0\r\n\ 457db96d56Sopenharmony_ciSubject: Dummy\r\n\ 467db96d56Sopenharmony_ci\r\n\ 477db96d56Sopenharmony_ciline1\r\n\ 487db96d56Sopenharmony_ciline2\r\n\ 497db96d56Sopenharmony_ciline3\r\n\ 507db96d56Sopenharmony_ci.\r\n""" 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_ci 537db96d56Sopenharmony_ciclass DummyPOP3Handler(asynchat.async_chat): 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ci CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']} 567db96d56Sopenharmony_ci enable_UTF8 = False 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_ci def __init__(self, conn): 597db96d56Sopenharmony_ci asynchat.async_chat.__init__(self, conn) 607db96d56Sopenharmony_ci self.set_terminator(b"\r\n") 617db96d56Sopenharmony_ci self.in_buffer = [] 627db96d56Sopenharmony_ci self.push('+OK dummy pop3 server ready. <timestamp>') 637db96d56Sopenharmony_ci self.tls_active = False 647db96d56Sopenharmony_ci self.tls_starting = False 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ci def collect_incoming_data(self, data): 677db96d56Sopenharmony_ci self.in_buffer.append(data) 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_ci def found_terminator(self): 707db96d56Sopenharmony_ci line = b''.join(self.in_buffer) 717db96d56Sopenharmony_ci line = str(line, 'ISO-8859-1') 727db96d56Sopenharmony_ci self.in_buffer = [] 737db96d56Sopenharmony_ci cmd = line.split(' ')[0].lower() 747db96d56Sopenharmony_ci space = line.find(' ') 757db96d56Sopenharmony_ci if space != -1: 767db96d56Sopenharmony_ci arg = line[space + 1:] 777db96d56Sopenharmony_ci else: 787db96d56Sopenharmony_ci arg = "" 797db96d56Sopenharmony_ci if hasattr(self, 'cmd_' + cmd): 807db96d56Sopenharmony_ci method = getattr(self, 'cmd_' + cmd) 817db96d56Sopenharmony_ci method(arg) 827db96d56Sopenharmony_ci else: 837db96d56Sopenharmony_ci self.push('-ERR unrecognized POP3 command "%s".' %cmd) 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci def handle_error(self): 867db96d56Sopenharmony_ci raise 877db96d56Sopenharmony_ci 887db96d56Sopenharmony_ci def push(self, data): 897db96d56Sopenharmony_ci asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') 907db96d56Sopenharmony_ci 917db96d56Sopenharmony_ci def cmd_echo(self, arg): 927db96d56Sopenharmony_ci # sends back the received string (used by the test suite) 937db96d56Sopenharmony_ci self.push(arg) 947db96d56Sopenharmony_ci 957db96d56Sopenharmony_ci def cmd_user(self, arg): 967db96d56Sopenharmony_ci if arg != "guido": 977db96d56Sopenharmony_ci self.push("-ERR no such user") 987db96d56Sopenharmony_ci self.push('+OK password required') 997db96d56Sopenharmony_ci 1007db96d56Sopenharmony_ci def cmd_pass(self, arg): 1017db96d56Sopenharmony_ci if arg != "python": 1027db96d56Sopenharmony_ci self.push("-ERR wrong password") 1037db96d56Sopenharmony_ci self.push('+OK 10 messages') 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_ci def cmd_stat(self, arg): 1067db96d56Sopenharmony_ci self.push('+OK 10 100') 1077db96d56Sopenharmony_ci 1087db96d56Sopenharmony_ci def cmd_list(self, arg): 1097db96d56Sopenharmony_ci if arg: 1107db96d56Sopenharmony_ci self.push('+OK %s %s' % (arg, arg)) 1117db96d56Sopenharmony_ci else: 1127db96d56Sopenharmony_ci self.push('+OK') 1137db96d56Sopenharmony_ci asynchat.async_chat.push(self, LIST_RESP) 1147db96d56Sopenharmony_ci 1157db96d56Sopenharmony_ci cmd_uidl = cmd_list 1167db96d56Sopenharmony_ci 1177db96d56Sopenharmony_ci def cmd_retr(self, arg): 1187db96d56Sopenharmony_ci self.push('+OK %s bytes' %len(RETR_RESP)) 1197db96d56Sopenharmony_ci asynchat.async_chat.push(self, RETR_RESP) 1207db96d56Sopenharmony_ci 1217db96d56Sopenharmony_ci cmd_top = cmd_retr 1227db96d56Sopenharmony_ci 1237db96d56Sopenharmony_ci def cmd_dele(self, arg): 1247db96d56Sopenharmony_ci self.push('+OK message marked for deletion.') 1257db96d56Sopenharmony_ci 1267db96d56Sopenharmony_ci def cmd_noop(self, arg): 1277db96d56Sopenharmony_ci self.push('+OK done nothing.') 1287db96d56Sopenharmony_ci 1297db96d56Sopenharmony_ci def cmd_rpop(self, arg): 1307db96d56Sopenharmony_ci self.push('+OK done nothing.') 1317db96d56Sopenharmony_ci 1327db96d56Sopenharmony_ci def cmd_apop(self, arg): 1337db96d56Sopenharmony_ci self.push('+OK done nothing.') 1347db96d56Sopenharmony_ci 1357db96d56Sopenharmony_ci def cmd_quit(self, arg): 1367db96d56Sopenharmony_ci self.push('+OK closing.') 1377db96d56Sopenharmony_ci self.close_when_done() 1387db96d56Sopenharmony_ci 1397db96d56Sopenharmony_ci def _get_capas(self): 1407db96d56Sopenharmony_ci _capas = dict(self.CAPAS) 1417db96d56Sopenharmony_ci if not self.tls_active and SUPPORTS_SSL: 1427db96d56Sopenharmony_ci _capas['STLS'] = [] 1437db96d56Sopenharmony_ci return _capas 1447db96d56Sopenharmony_ci 1457db96d56Sopenharmony_ci def cmd_capa(self, arg): 1467db96d56Sopenharmony_ci self.push('+OK Capability list follows') 1477db96d56Sopenharmony_ci if self._get_capas(): 1487db96d56Sopenharmony_ci for cap, params in self._get_capas().items(): 1497db96d56Sopenharmony_ci _ln = [cap] 1507db96d56Sopenharmony_ci if params: 1517db96d56Sopenharmony_ci _ln.extend(params) 1527db96d56Sopenharmony_ci self.push(' '.join(_ln)) 1537db96d56Sopenharmony_ci self.push('.') 1547db96d56Sopenharmony_ci 1557db96d56Sopenharmony_ci def cmd_utf8(self, arg): 1567db96d56Sopenharmony_ci self.push('+OK I know RFC6856' 1577db96d56Sopenharmony_ci if self.enable_UTF8 1587db96d56Sopenharmony_ci else '-ERR What is UTF8?!') 1597db96d56Sopenharmony_ci 1607db96d56Sopenharmony_ci if SUPPORTS_SSL: 1617db96d56Sopenharmony_ci 1627db96d56Sopenharmony_ci def cmd_stls(self, arg): 1637db96d56Sopenharmony_ci if self.tls_active is False: 1647db96d56Sopenharmony_ci self.push('+OK Begin TLS negotiation') 1657db96d56Sopenharmony_ci context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 1667db96d56Sopenharmony_ci context.load_cert_chain(CERTFILE) 1677db96d56Sopenharmony_ci tls_sock = context.wrap_socket(self.socket, 1687db96d56Sopenharmony_ci server_side=True, 1697db96d56Sopenharmony_ci do_handshake_on_connect=False, 1707db96d56Sopenharmony_ci suppress_ragged_eofs=False) 1717db96d56Sopenharmony_ci self.del_channel() 1727db96d56Sopenharmony_ci self.set_socket(tls_sock) 1737db96d56Sopenharmony_ci self.tls_active = True 1747db96d56Sopenharmony_ci self.tls_starting = True 1757db96d56Sopenharmony_ci self.in_buffer = [] 1767db96d56Sopenharmony_ci self._do_tls_handshake() 1777db96d56Sopenharmony_ci else: 1787db96d56Sopenharmony_ci self.push('-ERR Command not permitted when TLS active') 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci def _do_tls_handshake(self): 1817db96d56Sopenharmony_ci try: 1827db96d56Sopenharmony_ci self.socket.do_handshake() 1837db96d56Sopenharmony_ci except ssl.SSLError as err: 1847db96d56Sopenharmony_ci if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 1857db96d56Sopenharmony_ci ssl.SSL_ERROR_WANT_WRITE): 1867db96d56Sopenharmony_ci return 1877db96d56Sopenharmony_ci elif err.args[0] == ssl.SSL_ERROR_EOF: 1887db96d56Sopenharmony_ci return self.handle_close() 1897db96d56Sopenharmony_ci # TODO: SSLError does not expose alert information 1907db96d56Sopenharmony_ci elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or 1917db96d56Sopenharmony_ci "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]): 1927db96d56Sopenharmony_ci return self.handle_close() 1937db96d56Sopenharmony_ci raise 1947db96d56Sopenharmony_ci except OSError as err: 1957db96d56Sopenharmony_ci if err.args[0] == errno.ECONNABORTED: 1967db96d56Sopenharmony_ci return self.handle_close() 1977db96d56Sopenharmony_ci else: 1987db96d56Sopenharmony_ci self.tls_active = True 1997db96d56Sopenharmony_ci self.tls_starting = False 2007db96d56Sopenharmony_ci 2017db96d56Sopenharmony_ci def handle_read(self): 2027db96d56Sopenharmony_ci if self.tls_starting: 2037db96d56Sopenharmony_ci self._do_tls_handshake() 2047db96d56Sopenharmony_ci else: 2057db96d56Sopenharmony_ci try: 2067db96d56Sopenharmony_ci asynchat.async_chat.handle_read(self) 2077db96d56Sopenharmony_ci except ssl.SSLEOFError: 2087db96d56Sopenharmony_ci self.handle_close() 2097db96d56Sopenharmony_ci 2107db96d56Sopenharmony_ciclass DummyPOP3Server(asyncore.dispatcher, threading.Thread): 2117db96d56Sopenharmony_ci 2127db96d56Sopenharmony_ci handler = DummyPOP3Handler 2137db96d56Sopenharmony_ci 2147db96d56Sopenharmony_ci def __init__(self, address, af=socket.AF_INET): 2157db96d56Sopenharmony_ci threading.Thread.__init__(self) 2167db96d56Sopenharmony_ci asyncore.dispatcher.__init__(self) 2177db96d56Sopenharmony_ci self.daemon = True 2187db96d56Sopenharmony_ci self.create_socket(af, socket.SOCK_STREAM) 2197db96d56Sopenharmony_ci self.bind(address) 2207db96d56Sopenharmony_ci self.listen(5) 2217db96d56Sopenharmony_ci self.active = False 2227db96d56Sopenharmony_ci self.active_lock = threading.Lock() 2237db96d56Sopenharmony_ci self.host, self.port = self.socket.getsockname()[:2] 2247db96d56Sopenharmony_ci self.handler_instance = None 2257db96d56Sopenharmony_ci 2267db96d56Sopenharmony_ci def start(self): 2277db96d56Sopenharmony_ci assert not self.active 2287db96d56Sopenharmony_ci self.__flag = threading.Event() 2297db96d56Sopenharmony_ci threading.Thread.start(self) 2307db96d56Sopenharmony_ci self.__flag.wait() 2317db96d56Sopenharmony_ci 2327db96d56Sopenharmony_ci def run(self): 2337db96d56Sopenharmony_ci self.active = True 2347db96d56Sopenharmony_ci self.__flag.set() 2357db96d56Sopenharmony_ci try: 2367db96d56Sopenharmony_ci while self.active and asyncore.socket_map: 2377db96d56Sopenharmony_ci with self.active_lock: 2387db96d56Sopenharmony_ci asyncore.loop(timeout=0.1, count=1) 2397db96d56Sopenharmony_ci finally: 2407db96d56Sopenharmony_ci asyncore.close_all(ignore_all=True) 2417db96d56Sopenharmony_ci 2427db96d56Sopenharmony_ci def stop(self): 2437db96d56Sopenharmony_ci assert self.active 2447db96d56Sopenharmony_ci self.active = False 2457db96d56Sopenharmony_ci self.join() 2467db96d56Sopenharmony_ci 2477db96d56Sopenharmony_ci def handle_accepted(self, conn, addr): 2487db96d56Sopenharmony_ci self.handler_instance = self.handler(conn) 2497db96d56Sopenharmony_ci 2507db96d56Sopenharmony_ci def handle_connect(self): 2517db96d56Sopenharmony_ci self.close() 2527db96d56Sopenharmony_ci handle_read = handle_connect 2537db96d56Sopenharmony_ci 2547db96d56Sopenharmony_ci def writable(self): 2557db96d56Sopenharmony_ci return 0 2567db96d56Sopenharmony_ci 2577db96d56Sopenharmony_ci def handle_error(self): 2587db96d56Sopenharmony_ci raise 2597db96d56Sopenharmony_ci 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ciclass TestPOP3Class(TestCase): 2627db96d56Sopenharmony_ci def assertOK(self, resp): 2637db96d56Sopenharmony_ci self.assertTrue(resp.startswith(b"+OK")) 2647db96d56Sopenharmony_ci 2657db96d56Sopenharmony_ci def setUp(self): 2667db96d56Sopenharmony_ci self.server = DummyPOP3Server((HOST, PORT)) 2677db96d56Sopenharmony_ci self.server.start() 2687db96d56Sopenharmony_ci self.client = poplib.POP3(self.server.host, self.server.port, 2697db96d56Sopenharmony_ci timeout=test_support.LOOPBACK_TIMEOUT) 2707db96d56Sopenharmony_ci 2717db96d56Sopenharmony_ci def tearDown(self): 2727db96d56Sopenharmony_ci self.client.close() 2737db96d56Sopenharmony_ci self.server.stop() 2747db96d56Sopenharmony_ci # Explicitly clear the attribute to prevent dangling thread 2757db96d56Sopenharmony_ci self.server = None 2767db96d56Sopenharmony_ci 2777db96d56Sopenharmony_ci def test_getwelcome(self): 2787db96d56Sopenharmony_ci self.assertEqual(self.client.getwelcome(), 2797db96d56Sopenharmony_ci b'+OK dummy pop3 server ready. <timestamp>') 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci def test_exceptions(self): 2827db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') 2837db96d56Sopenharmony_ci 2847db96d56Sopenharmony_ci def test_user(self): 2857db96d56Sopenharmony_ci self.assertOK(self.client.user('guido')) 2867db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 2877db96d56Sopenharmony_ci 2887db96d56Sopenharmony_ci def test_pass_(self): 2897db96d56Sopenharmony_ci self.assertOK(self.client.pass_('python')) 2907db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 2917db96d56Sopenharmony_ci 2927db96d56Sopenharmony_ci def test_stat(self): 2937db96d56Sopenharmony_ci self.assertEqual(self.client.stat(), (10, 100)) 2947db96d56Sopenharmony_ci 2957db96d56Sopenharmony_ci def test_list(self): 2967db96d56Sopenharmony_ci self.assertEqual(self.client.list()[1:], 2977db96d56Sopenharmony_ci ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'], 2987db96d56Sopenharmony_ci 25)) 2997db96d56Sopenharmony_ci self.assertTrue(self.client.list('1').endswith(b"OK 1 1")) 3007db96d56Sopenharmony_ci 3017db96d56Sopenharmony_ci def test_retr(self): 3027db96d56Sopenharmony_ci expected = (b'+OK 116 bytes', 3037db96d56Sopenharmony_ci [b'From: postmaster@python.org', b'Content-Type: text/plain', 3047db96d56Sopenharmony_ci b'MIME-Version: 1.0', b'Subject: Dummy', 3057db96d56Sopenharmony_ci b'', b'line1', b'line2', b'line3'], 3067db96d56Sopenharmony_ci 113) 3077db96d56Sopenharmony_ci foo = self.client.retr('foo') 3087db96d56Sopenharmony_ci self.assertEqual(foo, expected) 3097db96d56Sopenharmony_ci 3107db96d56Sopenharmony_ci def test_too_long_lines(self): 3117db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client._shortcmd, 3127db96d56Sopenharmony_ci 'echo +%s' % ((poplib._MAXLINE + 10) * 'a')) 3137db96d56Sopenharmony_ci 3147db96d56Sopenharmony_ci def test_dele(self): 3157db96d56Sopenharmony_ci self.assertOK(self.client.dele('foo')) 3167db96d56Sopenharmony_ci 3177db96d56Sopenharmony_ci def test_noop(self): 3187db96d56Sopenharmony_ci self.assertOK(self.client.noop()) 3197db96d56Sopenharmony_ci 3207db96d56Sopenharmony_ci def test_rpop(self): 3217db96d56Sopenharmony_ci self.assertOK(self.client.rpop('foo')) 3227db96d56Sopenharmony_ci 3237db96d56Sopenharmony_ci @hashlib_helper.requires_hashdigest('md5', openssl=True) 3247db96d56Sopenharmony_ci def test_apop_normal(self): 3257db96d56Sopenharmony_ci self.assertOK(self.client.apop('foo', 'dummypassword')) 3267db96d56Sopenharmony_ci 3277db96d56Sopenharmony_ci @hashlib_helper.requires_hashdigest('md5', openssl=True) 3287db96d56Sopenharmony_ci def test_apop_REDOS(self): 3297db96d56Sopenharmony_ci # Replace welcome with very long evil welcome. 3307db96d56Sopenharmony_ci # NB The upper bound on welcome length is currently 2048. 3317db96d56Sopenharmony_ci # At this length, evil input makes each apop call take 3327db96d56Sopenharmony_ci # on the order of milliseconds instead of microseconds. 3337db96d56Sopenharmony_ci evil_welcome = b'+OK' + (b'<' * 1000000) 3347db96d56Sopenharmony_ci with test_support.swap_attr(self.client, 'welcome', evil_welcome): 3357db96d56Sopenharmony_ci # The evil welcome is invalid, so apop should throw. 3367db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb') 3377db96d56Sopenharmony_ci 3387db96d56Sopenharmony_ci def test_top(self): 3397db96d56Sopenharmony_ci expected = (b'+OK 116 bytes', 3407db96d56Sopenharmony_ci [b'From: postmaster@python.org', b'Content-Type: text/plain', 3417db96d56Sopenharmony_ci b'MIME-Version: 1.0', b'Subject: Dummy', b'', 3427db96d56Sopenharmony_ci b'line1', b'line2', b'line3'], 3437db96d56Sopenharmony_ci 113) 3447db96d56Sopenharmony_ci self.assertEqual(self.client.top(1, 1), expected) 3457db96d56Sopenharmony_ci 3467db96d56Sopenharmony_ci def test_uidl(self): 3477db96d56Sopenharmony_ci self.client.uidl() 3487db96d56Sopenharmony_ci self.client.uidl('foo') 3497db96d56Sopenharmony_ci 3507db96d56Sopenharmony_ci def test_utf8_raises_if_unsupported(self): 3517db96d56Sopenharmony_ci self.server.handler.enable_UTF8 = False 3527db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client.utf8) 3537db96d56Sopenharmony_ci 3547db96d56Sopenharmony_ci def test_utf8(self): 3557db96d56Sopenharmony_ci self.server.handler.enable_UTF8 = True 3567db96d56Sopenharmony_ci expected = b'+OK I know RFC6856' 3577db96d56Sopenharmony_ci result = self.client.utf8() 3587db96d56Sopenharmony_ci self.assertEqual(result, expected) 3597db96d56Sopenharmony_ci 3607db96d56Sopenharmony_ci def test_capa(self): 3617db96d56Sopenharmony_ci capa = self.client.capa() 3627db96d56Sopenharmony_ci self.assertTrue('IMPLEMENTATION' in capa.keys()) 3637db96d56Sopenharmony_ci 3647db96d56Sopenharmony_ci def test_quit(self): 3657db96d56Sopenharmony_ci resp = self.client.quit() 3667db96d56Sopenharmony_ci self.assertTrue(resp) 3677db96d56Sopenharmony_ci self.assertIsNone(self.client.sock) 3687db96d56Sopenharmony_ci self.assertIsNone(self.client.file) 3697db96d56Sopenharmony_ci 3707db96d56Sopenharmony_ci @requires_ssl 3717db96d56Sopenharmony_ci def test_stls_capa(self): 3727db96d56Sopenharmony_ci capa = self.client.capa() 3737db96d56Sopenharmony_ci self.assertTrue('STLS' in capa.keys()) 3747db96d56Sopenharmony_ci 3757db96d56Sopenharmony_ci @requires_ssl 3767db96d56Sopenharmony_ci def test_stls(self): 3777db96d56Sopenharmony_ci expected = b'+OK Begin TLS negotiation' 3787db96d56Sopenharmony_ci resp = self.client.stls() 3797db96d56Sopenharmony_ci self.assertEqual(resp, expected) 3807db96d56Sopenharmony_ci 3817db96d56Sopenharmony_ci @requires_ssl 3827db96d56Sopenharmony_ci def test_stls_context(self): 3837db96d56Sopenharmony_ci expected = b'+OK Begin TLS negotiation' 3847db96d56Sopenharmony_ci ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 3857db96d56Sopenharmony_ci ctx.load_verify_locations(CAFILE) 3867db96d56Sopenharmony_ci self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 3877db96d56Sopenharmony_ci self.assertEqual(ctx.check_hostname, True) 3887db96d56Sopenharmony_ci with self.assertRaises(ssl.CertificateError): 3897db96d56Sopenharmony_ci resp = self.client.stls(context=ctx) 3907db96d56Sopenharmony_ci self.client = poplib.POP3("localhost", self.server.port, 3917db96d56Sopenharmony_ci timeout=test_support.LOOPBACK_TIMEOUT) 3927db96d56Sopenharmony_ci resp = self.client.stls(context=ctx) 3937db96d56Sopenharmony_ci self.assertEqual(resp, expected) 3947db96d56Sopenharmony_ci 3957db96d56Sopenharmony_ci 3967db96d56Sopenharmony_ciif SUPPORTS_SSL: 3977db96d56Sopenharmony_ci from test.test_ftplib import SSLConnection 3987db96d56Sopenharmony_ci 3997db96d56Sopenharmony_ci class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler): 4007db96d56Sopenharmony_ci 4017db96d56Sopenharmony_ci def __init__(self, conn): 4027db96d56Sopenharmony_ci asynchat.async_chat.__init__(self, conn) 4037db96d56Sopenharmony_ci self.secure_connection() 4047db96d56Sopenharmony_ci self.set_terminator(b"\r\n") 4057db96d56Sopenharmony_ci self.in_buffer = [] 4067db96d56Sopenharmony_ci self.push('+OK dummy pop3 server ready. <timestamp>') 4077db96d56Sopenharmony_ci self.tls_active = True 4087db96d56Sopenharmony_ci self.tls_starting = False 4097db96d56Sopenharmony_ci 4107db96d56Sopenharmony_ci 4117db96d56Sopenharmony_ci@requires_ssl 4127db96d56Sopenharmony_ciclass TestPOP3_SSLClass(TestPOP3Class): 4137db96d56Sopenharmony_ci # repeat previous tests by using poplib.POP3_SSL 4147db96d56Sopenharmony_ci 4157db96d56Sopenharmony_ci def setUp(self): 4167db96d56Sopenharmony_ci self.server = DummyPOP3Server((HOST, PORT)) 4177db96d56Sopenharmony_ci self.server.handler = DummyPOP3_SSLHandler 4187db96d56Sopenharmony_ci self.server.start() 4197db96d56Sopenharmony_ci self.client = poplib.POP3_SSL(self.server.host, self.server.port) 4207db96d56Sopenharmony_ci 4217db96d56Sopenharmony_ci def test__all__(self): 4227db96d56Sopenharmony_ci self.assertIn('POP3_SSL', poplib.__all__) 4237db96d56Sopenharmony_ci 4247db96d56Sopenharmony_ci def test_context(self): 4257db96d56Sopenharmony_ci ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 4267db96d56Sopenharmony_ci ctx.check_hostname = False 4277db96d56Sopenharmony_ci ctx.verify_mode = ssl.CERT_NONE 4287db96d56Sopenharmony_ci self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 4297db96d56Sopenharmony_ci self.server.port, keyfile=CERTFILE, context=ctx) 4307db96d56Sopenharmony_ci self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 4317db96d56Sopenharmony_ci self.server.port, certfile=CERTFILE, context=ctx) 4327db96d56Sopenharmony_ci self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 4337db96d56Sopenharmony_ci self.server.port, keyfile=CERTFILE, 4347db96d56Sopenharmony_ci certfile=CERTFILE, context=ctx) 4357db96d56Sopenharmony_ci 4367db96d56Sopenharmony_ci self.client.quit() 4377db96d56Sopenharmony_ci self.client = poplib.POP3_SSL(self.server.host, self.server.port, 4387db96d56Sopenharmony_ci context=ctx) 4397db96d56Sopenharmony_ci self.assertIsInstance(self.client.sock, ssl.SSLSocket) 4407db96d56Sopenharmony_ci self.assertIs(self.client.sock.context, ctx) 4417db96d56Sopenharmony_ci self.assertTrue(self.client.noop().startswith(b'+OK')) 4427db96d56Sopenharmony_ci 4437db96d56Sopenharmony_ci def test_stls(self): 4447db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client.stls) 4457db96d56Sopenharmony_ci 4467db96d56Sopenharmony_ci test_stls_context = test_stls 4477db96d56Sopenharmony_ci 4487db96d56Sopenharmony_ci def test_stls_capa(self): 4497db96d56Sopenharmony_ci capa = self.client.capa() 4507db96d56Sopenharmony_ci self.assertFalse('STLS' in capa.keys()) 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_ci 4537db96d56Sopenharmony_ci@requires_ssl 4547db96d56Sopenharmony_ciclass TestPOP3_TLSClass(TestPOP3Class): 4557db96d56Sopenharmony_ci # repeat previous tests by using poplib.POP3.stls() 4567db96d56Sopenharmony_ci 4577db96d56Sopenharmony_ci def setUp(self): 4587db96d56Sopenharmony_ci self.server = DummyPOP3Server((HOST, PORT)) 4597db96d56Sopenharmony_ci self.server.start() 4607db96d56Sopenharmony_ci self.client = poplib.POP3(self.server.host, self.server.port, 4617db96d56Sopenharmony_ci timeout=test_support.LOOPBACK_TIMEOUT) 4627db96d56Sopenharmony_ci self.client.stls() 4637db96d56Sopenharmony_ci 4647db96d56Sopenharmony_ci def tearDown(self): 4657db96d56Sopenharmony_ci if self.client.file is not None and self.client.sock is not None: 4667db96d56Sopenharmony_ci try: 4677db96d56Sopenharmony_ci self.client.quit() 4687db96d56Sopenharmony_ci except poplib.error_proto: 4697db96d56Sopenharmony_ci # happens in the test_too_long_lines case; the overlong 4707db96d56Sopenharmony_ci # response will be treated as response to QUIT and raise 4717db96d56Sopenharmony_ci # this exception 4727db96d56Sopenharmony_ci self.client.close() 4737db96d56Sopenharmony_ci self.server.stop() 4747db96d56Sopenharmony_ci # Explicitly clear the attribute to prevent dangling thread 4757db96d56Sopenharmony_ci self.server = None 4767db96d56Sopenharmony_ci 4777db96d56Sopenharmony_ci def test_stls(self): 4787db96d56Sopenharmony_ci self.assertRaises(poplib.error_proto, self.client.stls) 4797db96d56Sopenharmony_ci 4807db96d56Sopenharmony_ci test_stls_context = test_stls 4817db96d56Sopenharmony_ci 4827db96d56Sopenharmony_ci def test_stls_capa(self): 4837db96d56Sopenharmony_ci capa = self.client.capa() 4847db96d56Sopenharmony_ci self.assertFalse(b'STLS' in capa.keys()) 4857db96d56Sopenharmony_ci 4867db96d56Sopenharmony_ci 4877db96d56Sopenharmony_ciclass TestTimeouts(TestCase): 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci def setUp(self): 4907db96d56Sopenharmony_ci self.evt = threading.Event() 4917db96d56Sopenharmony_ci self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4927db96d56Sopenharmony_ci self.sock.settimeout(60) # Safety net. Look issue 11812 4937db96d56Sopenharmony_ci self.port = socket_helper.bind_port(self.sock) 4947db96d56Sopenharmony_ci self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock)) 4957db96d56Sopenharmony_ci self.thread.daemon = True 4967db96d56Sopenharmony_ci self.thread.start() 4977db96d56Sopenharmony_ci self.evt.wait() 4987db96d56Sopenharmony_ci 4997db96d56Sopenharmony_ci def tearDown(self): 5007db96d56Sopenharmony_ci self.thread.join() 5017db96d56Sopenharmony_ci # Explicitly clear the attribute to prevent dangling thread 5027db96d56Sopenharmony_ci self.thread = None 5037db96d56Sopenharmony_ci 5047db96d56Sopenharmony_ci def server(self, evt, serv): 5057db96d56Sopenharmony_ci serv.listen() 5067db96d56Sopenharmony_ci evt.set() 5077db96d56Sopenharmony_ci try: 5087db96d56Sopenharmony_ci conn, addr = serv.accept() 5097db96d56Sopenharmony_ci conn.send(b"+ Hola mundo\n") 5107db96d56Sopenharmony_ci conn.close() 5117db96d56Sopenharmony_ci except TimeoutError: 5127db96d56Sopenharmony_ci pass 5137db96d56Sopenharmony_ci finally: 5147db96d56Sopenharmony_ci serv.close() 5157db96d56Sopenharmony_ci 5167db96d56Sopenharmony_ci def testTimeoutDefault(self): 5177db96d56Sopenharmony_ci self.assertIsNone(socket.getdefaulttimeout()) 5187db96d56Sopenharmony_ci socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT) 5197db96d56Sopenharmony_ci try: 5207db96d56Sopenharmony_ci pop = poplib.POP3(HOST, self.port) 5217db96d56Sopenharmony_ci finally: 5227db96d56Sopenharmony_ci socket.setdefaulttimeout(None) 5237db96d56Sopenharmony_ci self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT) 5247db96d56Sopenharmony_ci pop.close() 5257db96d56Sopenharmony_ci 5267db96d56Sopenharmony_ci def testTimeoutNone(self): 5277db96d56Sopenharmony_ci self.assertIsNone(socket.getdefaulttimeout()) 5287db96d56Sopenharmony_ci socket.setdefaulttimeout(30) 5297db96d56Sopenharmony_ci try: 5307db96d56Sopenharmony_ci pop = poplib.POP3(HOST, self.port, timeout=None) 5317db96d56Sopenharmony_ci finally: 5327db96d56Sopenharmony_ci socket.setdefaulttimeout(None) 5337db96d56Sopenharmony_ci self.assertIsNone(pop.sock.gettimeout()) 5347db96d56Sopenharmony_ci pop.close() 5357db96d56Sopenharmony_ci 5367db96d56Sopenharmony_ci def testTimeoutValue(self): 5377db96d56Sopenharmony_ci pop = poplib.POP3(HOST, self.port, timeout=test_support.LOOPBACK_TIMEOUT) 5387db96d56Sopenharmony_ci self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT) 5397db96d56Sopenharmony_ci pop.close() 5407db96d56Sopenharmony_ci with self.assertRaises(ValueError): 5417db96d56Sopenharmony_ci poplib.POP3(HOST, self.port, timeout=0) 5427db96d56Sopenharmony_ci 5437db96d56Sopenharmony_ci 5447db96d56Sopenharmony_cidef setUpModule(): 5457db96d56Sopenharmony_ci thread_info = threading_helper.threading_setup() 5467db96d56Sopenharmony_ci unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 5477db96d56Sopenharmony_ci 5487db96d56Sopenharmony_ci 5497db96d56Sopenharmony_ciif __name__ == '__main__': 5507db96d56Sopenharmony_ci unittest.main() 551