17db96d56Sopenharmony_ciimport base64 27db96d56Sopenharmony_ciimport email.mime.text 37db96d56Sopenharmony_cifrom email.message import EmailMessage 47db96d56Sopenharmony_cifrom email.base64mime import body_encode as encode_base64 57db96d56Sopenharmony_ciimport email.utils 67db96d56Sopenharmony_ciimport hashlib 77db96d56Sopenharmony_ciimport hmac 87db96d56Sopenharmony_ciimport socket 97db96d56Sopenharmony_ciimport smtplib 107db96d56Sopenharmony_ciimport io 117db96d56Sopenharmony_ciimport re 127db96d56Sopenharmony_ciimport sys 137db96d56Sopenharmony_ciimport time 147db96d56Sopenharmony_ciimport select 157db96d56Sopenharmony_ciimport errno 167db96d56Sopenharmony_ciimport textwrap 177db96d56Sopenharmony_ciimport threading 187db96d56Sopenharmony_ci 197db96d56Sopenharmony_ciimport unittest 207db96d56Sopenharmony_cifrom test import support, mock_socket 217db96d56Sopenharmony_cifrom test.support import hashlib_helper 227db96d56Sopenharmony_cifrom test.support import socket_helper 237db96d56Sopenharmony_cifrom test.support import threading_helper 247db96d56Sopenharmony_cifrom test.support import warnings_helper 257db96d56Sopenharmony_cifrom unittest.mock import Mock 267db96d56Sopenharmony_ci 277db96d56Sopenharmony_ci 287db96d56Sopenharmony_ciasyncore = warnings_helper.import_deprecated('asyncore') 297db96d56Sopenharmony_cismtpd = warnings_helper.import_deprecated('smtpd') 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_cisupport.requires_working_socket(module=True) 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ciHOST = socket_helper.HOST 357db96d56Sopenharmony_ci 367db96d56Sopenharmony_ciif sys.platform == 'darwin': 377db96d56Sopenharmony_ci # select.poll returns a select.POLLHUP at the end of the tests 387db96d56Sopenharmony_ci # on darwin, so just ignore it 397db96d56Sopenharmony_ci def handle_expt(self): 407db96d56Sopenharmony_ci pass 417db96d56Sopenharmony_ci smtpd.SMTPChannel.handle_expt = handle_expt 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ci 447db96d56Sopenharmony_cidef server(evt, buf, serv): 457db96d56Sopenharmony_ci serv.listen() 467db96d56Sopenharmony_ci evt.set() 477db96d56Sopenharmony_ci try: 487db96d56Sopenharmony_ci conn, addr = serv.accept() 497db96d56Sopenharmony_ci except TimeoutError: 507db96d56Sopenharmony_ci pass 517db96d56Sopenharmony_ci else: 527db96d56Sopenharmony_ci n = 500 537db96d56Sopenharmony_ci while buf and n > 0: 547db96d56Sopenharmony_ci r, w, e = select.select([], [conn], []) 557db96d56Sopenharmony_ci if w: 567db96d56Sopenharmony_ci sent = conn.send(buf) 577db96d56Sopenharmony_ci buf = buf[sent:] 587db96d56Sopenharmony_ci 597db96d56Sopenharmony_ci n -= 1 607db96d56Sopenharmony_ci 617db96d56Sopenharmony_ci conn.close() 627db96d56Sopenharmony_ci finally: 637db96d56Sopenharmony_ci serv.close() 647db96d56Sopenharmony_ci evt.set() 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ciclass GeneralTests: 677db96d56Sopenharmony_ci 687db96d56Sopenharmony_ci def setUp(self): 697db96d56Sopenharmony_ci smtplib.socket = mock_socket 707db96d56Sopenharmony_ci self.port = 25 717db96d56Sopenharmony_ci 727db96d56Sopenharmony_ci def tearDown(self): 737db96d56Sopenharmony_ci smtplib.socket = socket 747db96d56Sopenharmony_ci 757db96d56Sopenharmony_ci # This method is no longer used but is retained for backward compatibility, 767db96d56Sopenharmony_ci # so test to make sure it still works. 777db96d56Sopenharmony_ci def testQuoteData(self): 787db96d56Sopenharmony_ci teststr = "abc\n.jkl\rfoo\r\n..blue" 797db96d56Sopenharmony_ci expected = "abc\r\n..jkl\r\nfoo\r\n...blue" 807db96d56Sopenharmony_ci self.assertEqual(expected, smtplib.quotedata(teststr)) 817db96d56Sopenharmony_ci 827db96d56Sopenharmony_ci def testBasic1(self): 837db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 847db96d56Sopenharmony_ci # connects 857db96d56Sopenharmony_ci client = self.client(HOST, self.port) 867db96d56Sopenharmony_ci client.close() 877db96d56Sopenharmony_ci 887db96d56Sopenharmony_ci def testSourceAddress(self): 897db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 907db96d56Sopenharmony_ci # connects 917db96d56Sopenharmony_ci client = self.client(HOST, self.port, 927db96d56Sopenharmony_ci source_address=('127.0.0.1',19876)) 937db96d56Sopenharmony_ci self.assertEqual(client.source_address, ('127.0.0.1', 19876)) 947db96d56Sopenharmony_ci client.close() 957db96d56Sopenharmony_ci 967db96d56Sopenharmony_ci def testBasic2(self): 977db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 987db96d56Sopenharmony_ci # connects, include port in host name 997db96d56Sopenharmony_ci client = self.client("%s:%s" % (HOST, self.port)) 1007db96d56Sopenharmony_ci client.close() 1017db96d56Sopenharmony_ci 1027db96d56Sopenharmony_ci def testLocalHostName(self): 1037db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 1047db96d56Sopenharmony_ci # check that supplied local_hostname is used 1057db96d56Sopenharmony_ci client = self.client(HOST, self.port, local_hostname="testhost") 1067db96d56Sopenharmony_ci self.assertEqual(client.local_hostname, "testhost") 1077db96d56Sopenharmony_ci client.close() 1087db96d56Sopenharmony_ci 1097db96d56Sopenharmony_ci def testTimeoutDefault(self): 1107db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 1117db96d56Sopenharmony_ci self.assertIsNone(mock_socket.getdefaulttimeout()) 1127db96d56Sopenharmony_ci mock_socket.setdefaulttimeout(30) 1137db96d56Sopenharmony_ci self.assertEqual(mock_socket.getdefaulttimeout(), 30) 1147db96d56Sopenharmony_ci try: 1157db96d56Sopenharmony_ci client = self.client(HOST, self.port) 1167db96d56Sopenharmony_ci finally: 1177db96d56Sopenharmony_ci mock_socket.setdefaulttimeout(None) 1187db96d56Sopenharmony_ci self.assertEqual(client.sock.gettimeout(), 30) 1197db96d56Sopenharmony_ci client.close() 1207db96d56Sopenharmony_ci 1217db96d56Sopenharmony_ci def testTimeoutNone(self): 1227db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 1237db96d56Sopenharmony_ci self.assertIsNone(socket.getdefaulttimeout()) 1247db96d56Sopenharmony_ci socket.setdefaulttimeout(30) 1257db96d56Sopenharmony_ci try: 1267db96d56Sopenharmony_ci client = self.client(HOST, self.port, timeout=None) 1277db96d56Sopenharmony_ci finally: 1287db96d56Sopenharmony_ci socket.setdefaulttimeout(None) 1297db96d56Sopenharmony_ci self.assertIsNone(client.sock.gettimeout()) 1307db96d56Sopenharmony_ci client.close() 1317db96d56Sopenharmony_ci 1327db96d56Sopenharmony_ci def testTimeoutZero(self): 1337db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 1347db96d56Sopenharmony_ci with self.assertRaises(ValueError): 1357db96d56Sopenharmony_ci self.client(HOST, self.port, timeout=0) 1367db96d56Sopenharmony_ci 1377db96d56Sopenharmony_ci def testTimeoutValue(self): 1387db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hola mundo") 1397db96d56Sopenharmony_ci client = self.client(HOST, self.port, timeout=30) 1407db96d56Sopenharmony_ci self.assertEqual(client.sock.gettimeout(), 30) 1417db96d56Sopenharmony_ci client.close() 1427db96d56Sopenharmony_ci 1437db96d56Sopenharmony_ci def test_debuglevel(self): 1447db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hello world") 1457db96d56Sopenharmony_ci client = self.client() 1467db96d56Sopenharmony_ci client.set_debuglevel(1) 1477db96d56Sopenharmony_ci with support.captured_stderr() as stderr: 1487db96d56Sopenharmony_ci client.connect(HOST, self.port) 1497db96d56Sopenharmony_ci client.close() 1507db96d56Sopenharmony_ci expected = re.compile(r"^connect:", re.MULTILINE) 1517db96d56Sopenharmony_ci self.assertRegex(stderr.getvalue(), expected) 1527db96d56Sopenharmony_ci 1537db96d56Sopenharmony_ci def test_debuglevel_2(self): 1547db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hello world") 1557db96d56Sopenharmony_ci client = self.client() 1567db96d56Sopenharmony_ci client.set_debuglevel(2) 1577db96d56Sopenharmony_ci with support.captured_stderr() as stderr: 1587db96d56Sopenharmony_ci client.connect(HOST, self.port) 1597db96d56Sopenharmony_ci client.close() 1607db96d56Sopenharmony_ci expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ", 1617db96d56Sopenharmony_ci re.MULTILINE) 1627db96d56Sopenharmony_ci self.assertRegex(stderr.getvalue(), expected) 1637db96d56Sopenharmony_ci 1647db96d56Sopenharmony_ci 1657db96d56Sopenharmony_ciclass SMTPGeneralTests(GeneralTests, unittest.TestCase): 1667db96d56Sopenharmony_ci 1677db96d56Sopenharmony_ci client = smtplib.SMTP 1687db96d56Sopenharmony_ci 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_ciclass LMTPGeneralTests(GeneralTests, unittest.TestCase): 1717db96d56Sopenharmony_ci 1727db96d56Sopenharmony_ci client = smtplib.LMTP 1737db96d56Sopenharmony_ci 1747db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket") 1757db96d56Sopenharmony_ci def testUnixDomainSocketTimeoutDefault(self): 1767db96d56Sopenharmony_ci local_host = '/some/local/lmtp/delivery/program' 1777db96d56Sopenharmony_ci mock_socket.reply_with(b"220 Hello world") 1787db96d56Sopenharmony_ci try: 1797db96d56Sopenharmony_ci client = self.client(local_host, self.port) 1807db96d56Sopenharmony_ci finally: 1817db96d56Sopenharmony_ci mock_socket.setdefaulttimeout(None) 1827db96d56Sopenharmony_ci self.assertIsNone(client.sock.gettimeout()) 1837db96d56Sopenharmony_ci client.close() 1847db96d56Sopenharmony_ci 1857db96d56Sopenharmony_ci def testTimeoutZero(self): 1867db96d56Sopenharmony_ci super().testTimeoutZero() 1877db96d56Sopenharmony_ci local_host = '/some/local/lmtp/delivery/program' 1887db96d56Sopenharmony_ci with self.assertRaises(ValueError): 1897db96d56Sopenharmony_ci self.client(local_host, timeout=0) 1907db96d56Sopenharmony_ci 1917db96d56Sopenharmony_ci# Test server thread using the specified SMTP server class 1927db96d56Sopenharmony_cidef debugging_server(serv, serv_evt, client_evt): 1937db96d56Sopenharmony_ci serv_evt.set() 1947db96d56Sopenharmony_ci 1957db96d56Sopenharmony_ci try: 1967db96d56Sopenharmony_ci if hasattr(select, 'poll'): 1977db96d56Sopenharmony_ci poll_fun = asyncore.poll2 1987db96d56Sopenharmony_ci else: 1997db96d56Sopenharmony_ci poll_fun = asyncore.poll 2007db96d56Sopenharmony_ci 2017db96d56Sopenharmony_ci n = 1000 2027db96d56Sopenharmony_ci while asyncore.socket_map and n > 0: 2037db96d56Sopenharmony_ci poll_fun(0.01, asyncore.socket_map) 2047db96d56Sopenharmony_ci 2057db96d56Sopenharmony_ci # when the client conversation is finished, it will 2067db96d56Sopenharmony_ci # set client_evt, and it's then ok to kill the server 2077db96d56Sopenharmony_ci if client_evt.is_set(): 2087db96d56Sopenharmony_ci serv.close() 2097db96d56Sopenharmony_ci break 2107db96d56Sopenharmony_ci 2117db96d56Sopenharmony_ci n -= 1 2127db96d56Sopenharmony_ci 2137db96d56Sopenharmony_ci except TimeoutError: 2147db96d56Sopenharmony_ci pass 2157db96d56Sopenharmony_ci finally: 2167db96d56Sopenharmony_ci if not client_evt.is_set(): 2177db96d56Sopenharmony_ci # allow some time for the client to read the result 2187db96d56Sopenharmony_ci time.sleep(0.5) 2197db96d56Sopenharmony_ci serv.close() 2207db96d56Sopenharmony_ci asyncore.close_all() 2217db96d56Sopenharmony_ci serv_evt.set() 2227db96d56Sopenharmony_ci 2237db96d56Sopenharmony_ciMSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' 2247db96d56Sopenharmony_ciMSG_END = '------------ END MESSAGE ------------\n' 2257db96d56Sopenharmony_ci 2267db96d56Sopenharmony_ci# NOTE: Some SMTP objects in the tests below are created with a non-default 2277db96d56Sopenharmony_ci# local_hostname argument to the constructor, since (on some systems) the FQDN 2287db96d56Sopenharmony_ci# lookup caused by the default local_hostname sometimes takes so long that the 2297db96d56Sopenharmony_ci# test server times out, causing the test to fail. 2307db96d56Sopenharmony_ci 2317db96d56Sopenharmony_ci# Test behavior of smtpd.DebuggingServer 2327db96d56Sopenharmony_ciclass DebuggingServerTests(unittest.TestCase): 2337db96d56Sopenharmony_ci 2347db96d56Sopenharmony_ci maxDiff = None 2357db96d56Sopenharmony_ci 2367db96d56Sopenharmony_ci def setUp(self): 2377db96d56Sopenharmony_ci self.thread_key = threading_helper.threading_setup() 2387db96d56Sopenharmony_ci self.real_getfqdn = socket.getfqdn 2397db96d56Sopenharmony_ci socket.getfqdn = mock_socket.getfqdn 2407db96d56Sopenharmony_ci # temporarily replace sys.stdout to capture DebuggingServer output 2417db96d56Sopenharmony_ci self.old_stdout = sys.stdout 2427db96d56Sopenharmony_ci self.output = io.StringIO() 2437db96d56Sopenharmony_ci sys.stdout = self.output 2447db96d56Sopenharmony_ci 2457db96d56Sopenharmony_ci self.serv_evt = threading.Event() 2467db96d56Sopenharmony_ci self.client_evt = threading.Event() 2477db96d56Sopenharmony_ci # Capture SMTPChannel debug output 2487db96d56Sopenharmony_ci self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM 2497db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = io.StringIO() 2507db96d56Sopenharmony_ci # Pick a random unused port by passing 0 for the port number 2517db96d56Sopenharmony_ci self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1), 2527db96d56Sopenharmony_ci decode_data=True) 2537db96d56Sopenharmony_ci # Keep a note of what server host and port were assigned 2547db96d56Sopenharmony_ci self.host, self.port = self.serv.socket.getsockname()[:2] 2557db96d56Sopenharmony_ci serv_args = (self.serv, self.serv_evt, self.client_evt) 2567db96d56Sopenharmony_ci self.thread = threading.Thread(target=debugging_server, args=serv_args) 2577db96d56Sopenharmony_ci self.thread.start() 2587db96d56Sopenharmony_ci 2597db96d56Sopenharmony_ci # wait until server thread has assigned a port number 2607db96d56Sopenharmony_ci self.serv_evt.wait() 2617db96d56Sopenharmony_ci self.serv_evt.clear() 2627db96d56Sopenharmony_ci 2637db96d56Sopenharmony_ci def tearDown(self): 2647db96d56Sopenharmony_ci socket.getfqdn = self.real_getfqdn 2657db96d56Sopenharmony_ci # indicate that the client is finished 2667db96d56Sopenharmony_ci self.client_evt.set() 2677db96d56Sopenharmony_ci # wait for the server thread to terminate 2687db96d56Sopenharmony_ci self.serv_evt.wait() 2697db96d56Sopenharmony_ci threading_helper.join_thread(self.thread) 2707db96d56Sopenharmony_ci # restore sys.stdout 2717db96d56Sopenharmony_ci sys.stdout = self.old_stdout 2727db96d56Sopenharmony_ci # restore DEBUGSTREAM 2737db96d56Sopenharmony_ci smtpd.DEBUGSTREAM.close() 2747db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM 2757db96d56Sopenharmony_ci del self.thread 2767db96d56Sopenharmony_ci self.doCleanups() 2777db96d56Sopenharmony_ci threading_helper.threading_cleanup(*self.thread_key) 2787db96d56Sopenharmony_ci 2797db96d56Sopenharmony_ci def get_output_without_xpeer(self): 2807db96d56Sopenharmony_ci test_output = self.output.getvalue() 2817db96d56Sopenharmony_ci return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2', 2827db96d56Sopenharmony_ci test_output, flags=re.MULTILINE|re.DOTALL) 2837db96d56Sopenharmony_ci 2847db96d56Sopenharmony_ci def testBasic(self): 2857db96d56Sopenharmony_ci # connect 2867db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 2877db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 2887db96d56Sopenharmony_ci smtp.quit() 2897db96d56Sopenharmony_ci 2907db96d56Sopenharmony_ci def testSourceAddress(self): 2917db96d56Sopenharmony_ci # connect 2927db96d56Sopenharmony_ci src_port = socket_helper.find_unused_port() 2937db96d56Sopenharmony_ci try: 2947db96d56Sopenharmony_ci smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost', 2957db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT, 2967db96d56Sopenharmony_ci source_address=(self.host, src_port)) 2977db96d56Sopenharmony_ci self.addCleanup(smtp.close) 2987db96d56Sopenharmony_ci self.assertEqual(smtp.source_address, (self.host, src_port)) 2997db96d56Sopenharmony_ci self.assertEqual(smtp.local_hostname, 'localhost') 3007db96d56Sopenharmony_ci smtp.quit() 3017db96d56Sopenharmony_ci except OSError as e: 3027db96d56Sopenharmony_ci if e.errno == errno.EADDRINUSE: 3037db96d56Sopenharmony_ci self.skipTest("couldn't bind to source port %d" % src_port) 3047db96d56Sopenharmony_ci raise 3057db96d56Sopenharmony_ci 3067db96d56Sopenharmony_ci def testNOOP(self): 3077db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3087db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3097db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3107db96d56Sopenharmony_ci expected = (250, b'OK') 3117db96d56Sopenharmony_ci self.assertEqual(smtp.noop(), expected) 3127db96d56Sopenharmony_ci smtp.quit() 3137db96d56Sopenharmony_ci 3147db96d56Sopenharmony_ci def testRSET(self): 3157db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3167db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3177db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3187db96d56Sopenharmony_ci expected = (250, b'OK') 3197db96d56Sopenharmony_ci self.assertEqual(smtp.rset(), expected) 3207db96d56Sopenharmony_ci smtp.quit() 3217db96d56Sopenharmony_ci 3227db96d56Sopenharmony_ci def testELHO(self): 3237db96d56Sopenharmony_ci # EHLO isn't implemented in DebuggingServer 3247db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3257db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3267db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3277db96d56Sopenharmony_ci expected = (250, b'\nSIZE 33554432\nHELP') 3287db96d56Sopenharmony_ci self.assertEqual(smtp.ehlo(), expected) 3297db96d56Sopenharmony_ci smtp.quit() 3307db96d56Sopenharmony_ci 3317db96d56Sopenharmony_ci def testEXPNNotImplemented(self): 3327db96d56Sopenharmony_ci # EXPN isn't implemented in DebuggingServer 3337db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3347db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3357db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3367db96d56Sopenharmony_ci expected = (502, b'EXPN not implemented') 3377db96d56Sopenharmony_ci smtp.putcmd('EXPN') 3387db96d56Sopenharmony_ci self.assertEqual(smtp.getreply(), expected) 3397db96d56Sopenharmony_ci smtp.quit() 3407db96d56Sopenharmony_ci 3417db96d56Sopenharmony_ci def test_issue43124_putcmd_escapes_newline(self): 3427db96d56Sopenharmony_ci # see: https://bugs.python.org/issue43124 3437db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3447db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3457db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3467db96d56Sopenharmony_ci with self.assertRaises(ValueError) as exc: 3477db96d56Sopenharmony_ci smtp.putcmd('helo\nX-INJECTED') 3487db96d56Sopenharmony_ci self.assertIn("prohibited newline characters", str(exc.exception)) 3497db96d56Sopenharmony_ci smtp.quit() 3507db96d56Sopenharmony_ci 3517db96d56Sopenharmony_ci def testVRFY(self): 3527db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3537db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3547db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3557db96d56Sopenharmony_ci expected = (252, b'Cannot VRFY user, but will accept message ' + \ 3567db96d56Sopenharmony_ci b'and attempt delivery') 3577db96d56Sopenharmony_ci self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) 3587db96d56Sopenharmony_ci self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) 3597db96d56Sopenharmony_ci smtp.quit() 3607db96d56Sopenharmony_ci 3617db96d56Sopenharmony_ci def testSecondHELO(self): 3627db96d56Sopenharmony_ci # check that a second HELO returns a message that it's a duplicate 3637db96d56Sopenharmony_ci # (this behavior is specific to smtpd.SMTPChannel) 3647db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3657db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3667db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3677db96d56Sopenharmony_ci smtp.helo() 3687db96d56Sopenharmony_ci expected = (503, b'Duplicate HELO/EHLO') 3697db96d56Sopenharmony_ci self.assertEqual(smtp.helo(), expected) 3707db96d56Sopenharmony_ci smtp.quit() 3717db96d56Sopenharmony_ci 3727db96d56Sopenharmony_ci def testHELP(self): 3737db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3747db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3757db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3767db96d56Sopenharmony_ci self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \ 3777db96d56Sopenharmony_ci b'RCPT DATA RSET NOOP QUIT VRFY') 3787db96d56Sopenharmony_ci smtp.quit() 3797db96d56Sopenharmony_ci 3807db96d56Sopenharmony_ci def testSend(self): 3817db96d56Sopenharmony_ci # connect and send mail 3827db96d56Sopenharmony_ci m = 'A test message' 3837db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 3847db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 3857db96d56Sopenharmony_ci self.addCleanup(smtp.close) 3867db96d56Sopenharmony_ci smtp.sendmail('John', 'Sally', m) 3877db96d56Sopenharmony_ci # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor 3887db96d56Sopenharmony_ci # in asyncore. This sleep might help, but should really be fixed 3897db96d56Sopenharmony_ci # properly by using an Event variable. 3907db96d56Sopenharmony_ci time.sleep(0.01) 3917db96d56Sopenharmony_ci smtp.quit() 3927db96d56Sopenharmony_ci 3937db96d56Sopenharmony_ci self.client_evt.set() 3947db96d56Sopenharmony_ci self.serv_evt.wait() 3957db96d56Sopenharmony_ci self.output.flush() 3967db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 3977db96d56Sopenharmony_ci self.assertEqual(self.output.getvalue(), mexpect) 3987db96d56Sopenharmony_ci 3997db96d56Sopenharmony_ci def testSendBinary(self): 4007db96d56Sopenharmony_ci m = b'A test message' 4017db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 4027db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 4037db96d56Sopenharmony_ci self.addCleanup(smtp.close) 4047db96d56Sopenharmony_ci smtp.sendmail('John', 'Sally', m) 4057db96d56Sopenharmony_ci # XXX (see comment in testSend) 4067db96d56Sopenharmony_ci time.sleep(0.01) 4077db96d56Sopenharmony_ci smtp.quit() 4087db96d56Sopenharmony_ci 4097db96d56Sopenharmony_ci self.client_evt.set() 4107db96d56Sopenharmony_ci self.serv_evt.wait() 4117db96d56Sopenharmony_ci self.output.flush() 4127db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END) 4137db96d56Sopenharmony_ci self.assertEqual(self.output.getvalue(), mexpect) 4147db96d56Sopenharmony_ci 4157db96d56Sopenharmony_ci def testSendNeedingDotQuote(self): 4167db96d56Sopenharmony_ci # Issue 12283 4177db96d56Sopenharmony_ci m = '.A test\n.mes.sage.' 4187db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 4197db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 4207db96d56Sopenharmony_ci self.addCleanup(smtp.close) 4217db96d56Sopenharmony_ci smtp.sendmail('John', 'Sally', m) 4227db96d56Sopenharmony_ci # XXX (see comment in testSend) 4237db96d56Sopenharmony_ci time.sleep(0.01) 4247db96d56Sopenharmony_ci smtp.quit() 4257db96d56Sopenharmony_ci 4267db96d56Sopenharmony_ci self.client_evt.set() 4277db96d56Sopenharmony_ci self.serv_evt.wait() 4287db96d56Sopenharmony_ci self.output.flush() 4297db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 4307db96d56Sopenharmony_ci self.assertEqual(self.output.getvalue(), mexpect) 4317db96d56Sopenharmony_ci 4327db96d56Sopenharmony_ci def test_issue43124_escape_localhostname(self): 4337db96d56Sopenharmony_ci # see: https://bugs.python.org/issue43124 4347db96d56Sopenharmony_ci # connect and send mail 4357db96d56Sopenharmony_ci m = 'wazzuuup\nlinetwo' 4367db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED', 4377db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 4387db96d56Sopenharmony_ci self.addCleanup(smtp.close) 4397db96d56Sopenharmony_ci with self.assertRaises(ValueError) as exc: 4407db96d56Sopenharmony_ci smtp.sendmail("hi@me.com", "you@me.com", m) 4417db96d56Sopenharmony_ci self.assertIn( 4427db96d56Sopenharmony_ci "prohibited newline characters: ehlo hi\\nX-INJECTED", 4437db96d56Sopenharmony_ci str(exc.exception), 4447db96d56Sopenharmony_ci ) 4457db96d56Sopenharmony_ci # XXX (see comment in testSend) 4467db96d56Sopenharmony_ci time.sleep(0.01) 4477db96d56Sopenharmony_ci smtp.quit() 4487db96d56Sopenharmony_ci 4497db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 4507db96d56Sopenharmony_ci self.assertNotIn("X-INJECTED", debugout) 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_ci def test_issue43124_escape_options(self): 4537db96d56Sopenharmony_ci # see: https://bugs.python.org/issue43124 4547db96d56Sopenharmony_ci # connect and send mail 4557db96d56Sopenharmony_ci m = 'wazzuuup\nlinetwo' 4567db96d56Sopenharmony_ci smtp = smtplib.SMTP( 4577db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 4587db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 4597db96d56Sopenharmony_ci 4607db96d56Sopenharmony_ci self.addCleanup(smtp.close) 4617db96d56Sopenharmony_ci smtp.sendmail("hi@me.com", "you@me.com", m) 4627db96d56Sopenharmony_ci with self.assertRaises(ValueError) as exc: 4637db96d56Sopenharmony_ci smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"]) 4647db96d56Sopenharmony_ci msg = str(exc.exception) 4657db96d56Sopenharmony_ci self.assertIn("prohibited newline characters", msg) 4667db96d56Sopenharmony_ci self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg) 4677db96d56Sopenharmony_ci # XXX (see comment in testSend) 4687db96d56Sopenharmony_ci time.sleep(0.01) 4697db96d56Sopenharmony_ci smtp.quit() 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 4727db96d56Sopenharmony_ci self.assertNotIn("X-OPTION", debugout) 4737db96d56Sopenharmony_ci self.assertNotIn("X-OPTION2", debugout) 4747db96d56Sopenharmony_ci self.assertNotIn("X-INJECTED-1", debugout) 4757db96d56Sopenharmony_ci self.assertNotIn("X-INJECTED-2", debugout) 4767db96d56Sopenharmony_ci 4777db96d56Sopenharmony_ci def testSendNullSender(self): 4787db96d56Sopenharmony_ci m = 'A test message' 4797db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 4807db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 4817db96d56Sopenharmony_ci self.addCleanup(smtp.close) 4827db96d56Sopenharmony_ci smtp.sendmail('<>', 'Sally', m) 4837db96d56Sopenharmony_ci # XXX (see comment in testSend) 4847db96d56Sopenharmony_ci time.sleep(0.01) 4857db96d56Sopenharmony_ci smtp.quit() 4867db96d56Sopenharmony_ci 4877db96d56Sopenharmony_ci self.client_evt.set() 4887db96d56Sopenharmony_ci self.serv_evt.wait() 4897db96d56Sopenharmony_ci self.output.flush() 4907db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 4917db96d56Sopenharmony_ci self.assertEqual(self.output.getvalue(), mexpect) 4927db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 4937db96d56Sopenharmony_ci sender = re.compile("^sender: <>$", re.MULTILINE) 4947db96d56Sopenharmony_ci self.assertRegex(debugout, sender) 4957db96d56Sopenharmony_ci 4967db96d56Sopenharmony_ci def testSendMessage(self): 4977db96d56Sopenharmony_ci m = email.mime.text.MIMEText('A test message') 4987db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 4997db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 5007db96d56Sopenharmony_ci self.addCleanup(smtp.close) 5017db96d56Sopenharmony_ci smtp.send_message(m, from_addr='John', to_addrs='Sally') 5027db96d56Sopenharmony_ci # XXX (see comment in testSend) 5037db96d56Sopenharmony_ci time.sleep(0.01) 5047db96d56Sopenharmony_ci smtp.quit() 5057db96d56Sopenharmony_ci 5067db96d56Sopenharmony_ci self.client_evt.set() 5077db96d56Sopenharmony_ci self.serv_evt.wait() 5087db96d56Sopenharmony_ci self.output.flush() 5097db96d56Sopenharmony_ci # Remove the X-Peer header that DebuggingServer adds as figuring out 5107db96d56Sopenharmony_ci # exactly what IP address format is put there is not easy (and 5117db96d56Sopenharmony_ci # irrelevant to our test). Typically 127.0.0.1 or ::1, but it is 5127db96d56Sopenharmony_ci # not always the same as socket.gethostbyname(HOST). :( 5137db96d56Sopenharmony_ci test_output = self.get_output_without_xpeer() 5147db96d56Sopenharmony_ci del m['X-Peer'] 5157db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 5167db96d56Sopenharmony_ci self.assertEqual(test_output, mexpect) 5177db96d56Sopenharmony_ci 5187db96d56Sopenharmony_ci def testSendMessageWithAddresses(self): 5197db96d56Sopenharmony_ci m = email.mime.text.MIMEText('A test message') 5207db96d56Sopenharmony_ci m['From'] = 'foo@bar.com' 5217db96d56Sopenharmony_ci m['To'] = 'John' 5227db96d56Sopenharmony_ci m['CC'] = 'Sally, Fred' 5237db96d56Sopenharmony_ci m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 5247db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 5257db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 5267db96d56Sopenharmony_ci self.addCleanup(smtp.close) 5277db96d56Sopenharmony_ci smtp.send_message(m) 5287db96d56Sopenharmony_ci # XXX (see comment in testSend) 5297db96d56Sopenharmony_ci time.sleep(0.01) 5307db96d56Sopenharmony_ci smtp.quit() 5317db96d56Sopenharmony_ci # make sure the Bcc header is still in the message. 5327db96d56Sopenharmony_ci self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" ' 5337db96d56Sopenharmony_ci '<warped@silly.walks.com>') 5347db96d56Sopenharmony_ci 5357db96d56Sopenharmony_ci self.client_evt.set() 5367db96d56Sopenharmony_ci self.serv_evt.wait() 5377db96d56Sopenharmony_ci self.output.flush() 5387db96d56Sopenharmony_ci # Remove the X-Peer header that DebuggingServer adds. 5397db96d56Sopenharmony_ci test_output = self.get_output_without_xpeer() 5407db96d56Sopenharmony_ci del m['X-Peer'] 5417db96d56Sopenharmony_ci # The Bcc header should not be transmitted. 5427db96d56Sopenharmony_ci del m['Bcc'] 5437db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 5447db96d56Sopenharmony_ci self.assertEqual(test_output, mexpect) 5457db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 5467db96d56Sopenharmony_ci sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 5477db96d56Sopenharmony_ci self.assertRegex(debugout, sender) 5487db96d56Sopenharmony_ci for addr in ('John', 'Sally', 'Fred', 'root@localhost', 5497db96d56Sopenharmony_ci 'warped@silly.walks.com'): 5507db96d56Sopenharmony_ci to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 5517db96d56Sopenharmony_ci re.MULTILINE) 5527db96d56Sopenharmony_ci self.assertRegex(debugout, to_addr) 5537db96d56Sopenharmony_ci 5547db96d56Sopenharmony_ci def testSendMessageWithSomeAddresses(self): 5557db96d56Sopenharmony_ci # Make sure nothing breaks if not all of the three 'to' headers exist 5567db96d56Sopenharmony_ci m = email.mime.text.MIMEText('A test message') 5577db96d56Sopenharmony_ci m['From'] = 'foo@bar.com' 5587db96d56Sopenharmony_ci m['To'] = 'John, Dinsdale' 5597db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 5607db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 5617db96d56Sopenharmony_ci self.addCleanup(smtp.close) 5627db96d56Sopenharmony_ci smtp.send_message(m) 5637db96d56Sopenharmony_ci # XXX (see comment in testSend) 5647db96d56Sopenharmony_ci time.sleep(0.01) 5657db96d56Sopenharmony_ci smtp.quit() 5667db96d56Sopenharmony_ci 5677db96d56Sopenharmony_ci self.client_evt.set() 5687db96d56Sopenharmony_ci self.serv_evt.wait() 5697db96d56Sopenharmony_ci self.output.flush() 5707db96d56Sopenharmony_ci # Remove the X-Peer header that DebuggingServer adds. 5717db96d56Sopenharmony_ci test_output = self.get_output_without_xpeer() 5727db96d56Sopenharmony_ci del m['X-Peer'] 5737db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 5747db96d56Sopenharmony_ci self.assertEqual(test_output, mexpect) 5757db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 5767db96d56Sopenharmony_ci sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 5777db96d56Sopenharmony_ci self.assertRegex(debugout, sender) 5787db96d56Sopenharmony_ci for addr in ('John', 'Dinsdale'): 5797db96d56Sopenharmony_ci to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 5807db96d56Sopenharmony_ci re.MULTILINE) 5817db96d56Sopenharmony_ci self.assertRegex(debugout, to_addr) 5827db96d56Sopenharmony_ci 5837db96d56Sopenharmony_ci def testSendMessageWithSpecifiedAddresses(self): 5847db96d56Sopenharmony_ci # Make sure addresses specified in call override those in message. 5857db96d56Sopenharmony_ci m = email.mime.text.MIMEText('A test message') 5867db96d56Sopenharmony_ci m['From'] = 'foo@bar.com' 5877db96d56Sopenharmony_ci m['To'] = 'John, Dinsdale' 5887db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 5897db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 5907db96d56Sopenharmony_ci self.addCleanup(smtp.close) 5917db96d56Sopenharmony_ci smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net') 5927db96d56Sopenharmony_ci # XXX (see comment in testSend) 5937db96d56Sopenharmony_ci time.sleep(0.01) 5947db96d56Sopenharmony_ci smtp.quit() 5957db96d56Sopenharmony_ci 5967db96d56Sopenharmony_ci self.client_evt.set() 5977db96d56Sopenharmony_ci self.serv_evt.wait() 5987db96d56Sopenharmony_ci self.output.flush() 5997db96d56Sopenharmony_ci # Remove the X-Peer header that DebuggingServer adds. 6007db96d56Sopenharmony_ci test_output = self.get_output_without_xpeer() 6017db96d56Sopenharmony_ci del m['X-Peer'] 6027db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 6037db96d56Sopenharmony_ci self.assertEqual(test_output, mexpect) 6047db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 6057db96d56Sopenharmony_ci sender = re.compile("^sender: joe@example.com$", re.MULTILINE) 6067db96d56Sopenharmony_ci self.assertRegex(debugout, sender) 6077db96d56Sopenharmony_ci for addr in ('John', 'Dinsdale'): 6087db96d56Sopenharmony_ci to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 6097db96d56Sopenharmony_ci re.MULTILINE) 6107db96d56Sopenharmony_ci self.assertNotRegex(debugout, to_addr) 6117db96d56Sopenharmony_ci recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE) 6127db96d56Sopenharmony_ci self.assertRegex(debugout, recip) 6137db96d56Sopenharmony_ci 6147db96d56Sopenharmony_ci def testSendMessageWithMultipleFrom(self): 6157db96d56Sopenharmony_ci # Sender overrides To 6167db96d56Sopenharmony_ci m = email.mime.text.MIMEText('A test message') 6177db96d56Sopenharmony_ci m['From'] = 'Bernard, Bianca' 6187db96d56Sopenharmony_ci m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com' 6197db96d56Sopenharmony_ci m['To'] = 'John, Dinsdale' 6207db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 6217db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 6227db96d56Sopenharmony_ci self.addCleanup(smtp.close) 6237db96d56Sopenharmony_ci smtp.send_message(m) 6247db96d56Sopenharmony_ci # XXX (see comment in testSend) 6257db96d56Sopenharmony_ci time.sleep(0.01) 6267db96d56Sopenharmony_ci smtp.quit() 6277db96d56Sopenharmony_ci 6287db96d56Sopenharmony_ci self.client_evt.set() 6297db96d56Sopenharmony_ci self.serv_evt.wait() 6307db96d56Sopenharmony_ci self.output.flush() 6317db96d56Sopenharmony_ci # Remove the X-Peer header that DebuggingServer adds. 6327db96d56Sopenharmony_ci test_output = self.get_output_without_xpeer() 6337db96d56Sopenharmony_ci del m['X-Peer'] 6347db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 6357db96d56Sopenharmony_ci self.assertEqual(test_output, mexpect) 6367db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 6377db96d56Sopenharmony_ci sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE) 6387db96d56Sopenharmony_ci self.assertRegex(debugout, sender) 6397db96d56Sopenharmony_ci for addr in ('John', 'Dinsdale'): 6407db96d56Sopenharmony_ci to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 6417db96d56Sopenharmony_ci re.MULTILINE) 6427db96d56Sopenharmony_ci self.assertRegex(debugout, to_addr) 6437db96d56Sopenharmony_ci 6447db96d56Sopenharmony_ci def testSendMessageResent(self): 6457db96d56Sopenharmony_ci m = email.mime.text.MIMEText('A test message') 6467db96d56Sopenharmony_ci m['From'] = 'foo@bar.com' 6477db96d56Sopenharmony_ci m['To'] = 'John' 6487db96d56Sopenharmony_ci m['CC'] = 'Sally, Fred' 6497db96d56Sopenharmony_ci m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 6507db96d56Sopenharmony_ci m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 6517db96d56Sopenharmony_ci m['Resent-From'] = 'holy@grail.net' 6527db96d56Sopenharmony_ci m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 6537db96d56Sopenharmony_ci m['Resent-Bcc'] = 'doe@losthope.net' 6547db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 6557db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 6567db96d56Sopenharmony_ci self.addCleanup(smtp.close) 6577db96d56Sopenharmony_ci smtp.send_message(m) 6587db96d56Sopenharmony_ci # XXX (see comment in testSend) 6597db96d56Sopenharmony_ci time.sleep(0.01) 6607db96d56Sopenharmony_ci smtp.quit() 6617db96d56Sopenharmony_ci 6627db96d56Sopenharmony_ci self.client_evt.set() 6637db96d56Sopenharmony_ci self.serv_evt.wait() 6647db96d56Sopenharmony_ci self.output.flush() 6657db96d56Sopenharmony_ci # The Resent-Bcc headers are deleted before serialization. 6667db96d56Sopenharmony_ci del m['Bcc'] 6677db96d56Sopenharmony_ci del m['Resent-Bcc'] 6687db96d56Sopenharmony_ci # Remove the X-Peer header that DebuggingServer adds. 6697db96d56Sopenharmony_ci test_output = self.get_output_without_xpeer() 6707db96d56Sopenharmony_ci del m['X-Peer'] 6717db96d56Sopenharmony_ci mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 6727db96d56Sopenharmony_ci self.assertEqual(test_output, mexpect) 6737db96d56Sopenharmony_ci debugout = smtpd.DEBUGSTREAM.getvalue() 6747db96d56Sopenharmony_ci sender = re.compile("^sender: holy@grail.net$", re.MULTILINE) 6757db96d56Sopenharmony_ci self.assertRegex(debugout, sender) 6767db96d56Sopenharmony_ci for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'): 6777db96d56Sopenharmony_ci to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 6787db96d56Sopenharmony_ci re.MULTILINE) 6797db96d56Sopenharmony_ci self.assertRegex(debugout, to_addr) 6807db96d56Sopenharmony_ci 6817db96d56Sopenharmony_ci def testSendMessageMultipleResentRaises(self): 6827db96d56Sopenharmony_ci m = email.mime.text.MIMEText('A test message') 6837db96d56Sopenharmony_ci m['From'] = 'foo@bar.com' 6847db96d56Sopenharmony_ci m['To'] = 'John' 6857db96d56Sopenharmony_ci m['CC'] = 'Sally, Fred' 6867db96d56Sopenharmony_ci m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>' 6877db96d56Sopenharmony_ci m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000' 6887db96d56Sopenharmony_ci m['Resent-From'] = 'holy@grail.net' 6897db96d56Sopenharmony_ci m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff' 6907db96d56Sopenharmony_ci m['Resent-Bcc'] = 'doe@losthope.net' 6917db96d56Sopenharmony_ci m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000' 6927db96d56Sopenharmony_ci m['Resent-To'] = 'holy@grail.net' 6937db96d56Sopenharmony_ci m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff' 6947db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 6957db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 6967db96d56Sopenharmony_ci self.addCleanup(smtp.close) 6977db96d56Sopenharmony_ci with self.assertRaises(ValueError): 6987db96d56Sopenharmony_ci smtp.send_message(m) 6997db96d56Sopenharmony_ci smtp.close() 7007db96d56Sopenharmony_ci 7017db96d56Sopenharmony_ciclass NonConnectingTests(unittest.TestCase): 7027db96d56Sopenharmony_ci 7037db96d56Sopenharmony_ci def testNotConnected(self): 7047db96d56Sopenharmony_ci # Test various operations on an unconnected SMTP object that 7057db96d56Sopenharmony_ci # should raise exceptions (at present the attempt in SMTP.send 7067db96d56Sopenharmony_ci # to reference the nonexistent 'sock' attribute of the SMTP object 7077db96d56Sopenharmony_ci # causes an AttributeError) 7087db96d56Sopenharmony_ci smtp = smtplib.SMTP() 7097db96d56Sopenharmony_ci self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo) 7107db96d56Sopenharmony_ci self.assertRaises(smtplib.SMTPServerDisconnected, 7117db96d56Sopenharmony_ci smtp.send, 'test msg') 7127db96d56Sopenharmony_ci 7137db96d56Sopenharmony_ci def testNonnumericPort(self): 7147db96d56Sopenharmony_ci # check that non-numeric port raises OSError 7157db96d56Sopenharmony_ci self.assertRaises(OSError, smtplib.SMTP, 7167db96d56Sopenharmony_ci "localhost", "bogus") 7177db96d56Sopenharmony_ci self.assertRaises(OSError, smtplib.SMTP, 7187db96d56Sopenharmony_ci "localhost:bogus") 7197db96d56Sopenharmony_ci 7207db96d56Sopenharmony_ci def testSockAttributeExists(self): 7217db96d56Sopenharmony_ci # check that sock attribute is present outside of a connect() call 7227db96d56Sopenharmony_ci # (regression test, the previous behavior raised an 7237db96d56Sopenharmony_ci # AttributeError: 'SMTP' object has no attribute 'sock') 7247db96d56Sopenharmony_ci with smtplib.SMTP() as smtp: 7257db96d56Sopenharmony_ci self.assertIsNone(smtp.sock) 7267db96d56Sopenharmony_ci 7277db96d56Sopenharmony_ci 7287db96d56Sopenharmony_ciclass DefaultArgumentsTests(unittest.TestCase): 7297db96d56Sopenharmony_ci 7307db96d56Sopenharmony_ci def setUp(self): 7317db96d56Sopenharmony_ci self.msg = EmailMessage() 7327db96d56Sopenharmony_ci self.msg['From'] = 'Páolo <főo@bar.com>' 7337db96d56Sopenharmony_ci self.smtp = smtplib.SMTP() 7347db96d56Sopenharmony_ci self.smtp.ehlo = Mock(return_value=(200, 'OK')) 7357db96d56Sopenharmony_ci self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock() 7367db96d56Sopenharmony_ci 7377db96d56Sopenharmony_ci def testSendMessage(self): 7387db96d56Sopenharmony_ci expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME') 7397db96d56Sopenharmony_ci self.smtp.send_message(self.msg) 7407db96d56Sopenharmony_ci self.smtp.send_message(self.msg) 7417db96d56Sopenharmony_ci self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 7427db96d56Sopenharmony_ci expected_mail_options) 7437db96d56Sopenharmony_ci self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3], 7447db96d56Sopenharmony_ci expected_mail_options) 7457db96d56Sopenharmony_ci 7467db96d56Sopenharmony_ci def testSendMessageWithMailOptions(self): 7477db96d56Sopenharmony_ci mail_options = ['STARTTLS'] 7487db96d56Sopenharmony_ci expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME') 7497db96d56Sopenharmony_ci self.smtp.send_message(self.msg, None, None, mail_options) 7507db96d56Sopenharmony_ci self.assertEqual(mail_options, ['STARTTLS']) 7517db96d56Sopenharmony_ci self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3], 7527db96d56Sopenharmony_ci expected_mail_options) 7537db96d56Sopenharmony_ci 7547db96d56Sopenharmony_ci 7557db96d56Sopenharmony_ci# test response of client to a non-successful HELO message 7567db96d56Sopenharmony_ciclass BadHELOServerTests(unittest.TestCase): 7577db96d56Sopenharmony_ci 7587db96d56Sopenharmony_ci def setUp(self): 7597db96d56Sopenharmony_ci smtplib.socket = mock_socket 7607db96d56Sopenharmony_ci mock_socket.reply_with(b"199 no hello for you!") 7617db96d56Sopenharmony_ci self.old_stdout = sys.stdout 7627db96d56Sopenharmony_ci self.output = io.StringIO() 7637db96d56Sopenharmony_ci sys.stdout = self.output 7647db96d56Sopenharmony_ci self.port = 25 7657db96d56Sopenharmony_ci 7667db96d56Sopenharmony_ci def tearDown(self): 7677db96d56Sopenharmony_ci smtplib.socket = socket 7687db96d56Sopenharmony_ci sys.stdout = self.old_stdout 7697db96d56Sopenharmony_ci 7707db96d56Sopenharmony_ci def testFailingHELO(self): 7717db96d56Sopenharmony_ci self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 7727db96d56Sopenharmony_ci HOST, self.port, 'localhost', 3) 7737db96d56Sopenharmony_ci 7747db96d56Sopenharmony_ci 7757db96d56Sopenharmony_ciclass TooLongLineTests(unittest.TestCase): 7767db96d56Sopenharmony_ci respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n' 7777db96d56Sopenharmony_ci 7787db96d56Sopenharmony_ci def setUp(self): 7797db96d56Sopenharmony_ci self.thread_key = threading_helper.threading_setup() 7807db96d56Sopenharmony_ci self.old_stdout = sys.stdout 7817db96d56Sopenharmony_ci self.output = io.StringIO() 7827db96d56Sopenharmony_ci sys.stdout = self.output 7837db96d56Sopenharmony_ci 7847db96d56Sopenharmony_ci self.evt = threading.Event() 7857db96d56Sopenharmony_ci self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 7867db96d56Sopenharmony_ci self.sock.settimeout(15) 7877db96d56Sopenharmony_ci self.port = socket_helper.bind_port(self.sock) 7887db96d56Sopenharmony_ci servargs = (self.evt, self.respdata, self.sock) 7897db96d56Sopenharmony_ci self.thread = threading.Thread(target=server, args=servargs) 7907db96d56Sopenharmony_ci self.thread.start() 7917db96d56Sopenharmony_ci self.evt.wait() 7927db96d56Sopenharmony_ci self.evt.clear() 7937db96d56Sopenharmony_ci 7947db96d56Sopenharmony_ci def tearDown(self): 7957db96d56Sopenharmony_ci self.evt.wait() 7967db96d56Sopenharmony_ci sys.stdout = self.old_stdout 7977db96d56Sopenharmony_ci threading_helper.join_thread(self.thread) 7987db96d56Sopenharmony_ci del self.thread 7997db96d56Sopenharmony_ci self.doCleanups() 8007db96d56Sopenharmony_ci threading_helper.threading_cleanup(*self.thread_key) 8017db96d56Sopenharmony_ci 8027db96d56Sopenharmony_ci def testLineTooLong(self): 8037db96d56Sopenharmony_ci self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP, 8047db96d56Sopenharmony_ci HOST, self.port, 'localhost', 3) 8057db96d56Sopenharmony_ci 8067db96d56Sopenharmony_ci 8077db96d56Sopenharmony_cisim_users = {'Mr.A@somewhere.com':'John A', 8087db96d56Sopenharmony_ci 'Ms.B@xn--fo-fka.com':'Sally B', 8097db96d56Sopenharmony_ci 'Mrs.C@somewhereesle.com':'Ruth C', 8107db96d56Sopenharmony_ci } 8117db96d56Sopenharmony_ci 8127db96d56Sopenharmony_cisim_auth = ('Mr.A@somewhere.com', 'somepassword') 8137db96d56Sopenharmony_cisim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 8147db96d56Sopenharmony_ci 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 8157db96d56Sopenharmony_cisim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 8167db96d56Sopenharmony_ci 'list-2':['Ms.B@xn--fo-fka.com',], 8177db96d56Sopenharmony_ci } 8187db96d56Sopenharmony_ci 8197db96d56Sopenharmony_ci# Simulated SMTP channel & server 8207db96d56Sopenharmony_ciclass ResponseException(Exception): pass 8217db96d56Sopenharmony_ciclass SimSMTPChannel(smtpd.SMTPChannel): 8227db96d56Sopenharmony_ci 8237db96d56Sopenharmony_ci quit_response = None 8247db96d56Sopenharmony_ci mail_response = None 8257db96d56Sopenharmony_ci rcpt_response = None 8267db96d56Sopenharmony_ci data_response = None 8277db96d56Sopenharmony_ci rcpt_count = 0 8287db96d56Sopenharmony_ci rset_count = 0 8297db96d56Sopenharmony_ci disconnect = 0 8307db96d56Sopenharmony_ci AUTH = 99 # Add protocol state to enable auth testing. 8317db96d56Sopenharmony_ci authenticated_user = None 8327db96d56Sopenharmony_ci 8337db96d56Sopenharmony_ci def __init__(self, extra_features, *args, **kw): 8347db96d56Sopenharmony_ci self._extrafeatures = ''.join( 8357db96d56Sopenharmony_ci [ "250-{0}\r\n".format(x) for x in extra_features ]) 8367db96d56Sopenharmony_ci super(SimSMTPChannel, self).__init__(*args, **kw) 8377db96d56Sopenharmony_ci 8387db96d56Sopenharmony_ci # AUTH related stuff. It would be nice if support for this were in smtpd. 8397db96d56Sopenharmony_ci def found_terminator(self): 8407db96d56Sopenharmony_ci if self.smtp_state == self.AUTH: 8417db96d56Sopenharmony_ci line = self._emptystring.join(self.received_lines) 8427db96d56Sopenharmony_ci print('Data:', repr(line), file=smtpd.DEBUGSTREAM) 8437db96d56Sopenharmony_ci self.received_lines = [] 8447db96d56Sopenharmony_ci try: 8457db96d56Sopenharmony_ci self.auth_object(line) 8467db96d56Sopenharmony_ci except ResponseException as e: 8477db96d56Sopenharmony_ci self.smtp_state = self.COMMAND 8487db96d56Sopenharmony_ci self.push('%s %s' % (e.smtp_code, e.smtp_error)) 8497db96d56Sopenharmony_ci return 8507db96d56Sopenharmony_ci super().found_terminator() 8517db96d56Sopenharmony_ci 8527db96d56Sopenharmony_ci 8537db96d56Sopenharmony_ci def smtp_AUTH(self, arg): 8547db96d56Sopenharmony_ci if not self.seen_greeting: 8557db96d56Sopenharmony_ci self.push('503 Error: send EHLO first') 8567db96d56Sopenharmony_ci return 8577db96d56Sopenharmony_ci if not self.extended_smtp or 'AUTH' not in self._extrafeatures: 8587db96d56Sopenharmony_ci self.push('500 Error: command "AUTH" not recognized') 8597db96d56Sopenharmony_ci return 8607db96d56Sopenharmony_ci if self.authenticated_user is not None: 8617db96d56Sopenharmony_ci self.push( 8627db96d56Sopenharmony_ci '503 Bad sequence of commands: already authenticated') 8637db96d56Sopenharmony_ci return 8647db96d56Sopenharmony_ci args = arg.split() 8657db96d56Sopenharmony_ci if len(args) not in [1, 2]: 8667db96d56Sopenharmony_ci self.push('501 Syntax: AUTH <mechanism> [initial-response]') 8677db96d56Sopenharmony_ci return 8687db96d56Sopenharmony_ci auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') 8697db96d56Sopenharmony_ci try: 8707db96d56Sopenharmony_ci self.auth_object = getattr(self, auth_object_name) 8717db96d56Sopenharmony_ci except AttributeError: 8727db96d56Sopenharmony_ci self.push('504 Command parameter not implemented: unsupported ' 8737db96d56Sopenharmony_ci ' authentication mechanism {!r}'.format(auth_object_name)) 8747db96d56Sopenharmony_ci return 8757db96d56Sopenharmony_ci self.smtp_state = self.AUTH 8767db96d56Sopenharmony_ci self.auth_object(args[1] if len(args) == 2 else None) 8777db96d56Sopenharmony_ci 8787db96d56Sopenharmony_ci def _authenticated(self, user, valid): 8797db96d56Sopenharmony_ci if valid: 8807db96d56Sopenharmony_ci self.authenticated_user = user 8817db96d56Sopenharmony_ci self.push('235 Authentication Succeeded') 8827db96d56Sopenharmony_ci else: 8837db96d56Sopenharmony_ci self.push('535 Authentication credentials invalid') 8847db96d56Sopenharmony_ci self.smtp_state = self.COMMAND 8857db96d56Sopenharmony_ci 8867db96d56Sopenharmony_ci def _decode_base64(self, string): 8877db96d56Sopenharmony_ci return base64.decodebytes(string.encode('ascii')).decode('utf-8') 8887db96d56Sopenharmony_ci 8897db96d56Sopenharmony_ci def _auth_plain(self, arg=None): 8907db96d56Sopenharmony_ci if arg is None: 8917db96d56Sopenharmony_ci self.push('334 ') 8927db96d56Sopenharmony_ci else: 8937db96d56Sopenharmony_ci logpass = self._decode_base64(arg) 8947db96d56Sopenharmony_ci try: 8957db96d56Sopenharmony_ci *_, user, password = logpass.split('\0') 8967db96d56Sopenharmony_ci except ValueError as e: 8977db96d56Sopenharmony_ci self.push('535 Splitting response {!r} into user and password' 8987db96d56Sopenharmony_ci ' failed: {}'.format(logpass, e)) 8997db96d56Sopenharmony_ci return 9007db96d56Sopenharmony_ci self._authenticated(user, password == sim_auth[1]) 9017db96d56Sopenharmony_ci 9027db96d56Sopenharmony_ci def _auth_login(self, arg=None): 9037db96d56Sopenharmony_ci if arg is None: 9047db96d56Sopenharmony_ci # base64 encoded 'Username:' 9057db96d56Sopenharmony_ci self.push('334 VXNlcm5hbWU6') 9067db96d56Sopenharmony_ci elif not hasattr(self, '_auth_login_user'): 9077db96d56Sopenharmony_ci self._auth_login_user = self._decode_base64(arg) 9087db96d56Sopenharmony_ci # base64 encoded 'Password:' 9097db96d56Sopenharmony_ci self.push('334 UGFzc3dvcmQ6') 9107db96d56Sopenharmony_ci else: 9117db96d56Sopenharmony_ci password = self._decode_base64(arg) 9127db96d56Sopenharmony_ci self._authenticated(self._auth_login_user, password == sim_auth[1]) 9137db96d56Sopenharmony_ci del self._auth_login_user 9147db96d56Sopenharmony_ci 9157db96d56Sopenharmony_ci def _auth_buggy(self, arg=None): 9167db96d56Sopenharmony_ci # This AUTH mechanism will 'trap' client in a neverending 334 9177db96d56Sopenharmony_ci # base64 encoded 'BuGgYbUgGy' 9187db96d56Sopenharmony_ci self.push('334 QnVHZ1liVWdHeQ==') 9197db96d56Sopenharmony_ci 9207db96d56Sopenharmony_ci def _auth_cram_md5(self, arg=None): 9217db96d56Sopenharmony_ci if arg is None: 9227db96d56Sopenharmony_ci self.push('334 {}'.format(sim_cram_md5_challenge)) 9237db96d56Sopenharmony_ci else: 9247db96d56Sopenharmony_ci logpass = self._decode_base64(arg) 9257db96d56Sopenharmony_ci try: 9267db96d56Sopenharmony_ci user, hashed_pass = logpass.split() 9277db96d56Sopenharmony_ci except ValueError as e: 9287db96d56Sopenharmony_ci self.push('535 Splitting response {!r} into user and password ' 9297db96d56Sopenharmony_ci 'failed: {}'.format(logpass, e)) 9307db96d56Sopenharmony_ci return False 9317db96d56Sopenharmony_ci valid_hashed_pass = hmac.HMAC( 9327db96d56Sopenharmony_ci sim_auth[1].encode('ascii'), 9337db96d56Sopenharmony_ci self._decode_base64(sim_cram_md5_challenge).encode('ascii'), 9347db96d56Sopenharmony_ci 'md5').hexdigest() 9357db96d56Sopenharmony_ci self._authenticated(user, hashed_pass == valid_hashed_pass) 9367db96d56Sopenharmony_ci # end AUTH related stuff. 9377db96d56Sopenharmony_ci 9387db96d56Sopenharmony_ci def smtp_EHLO(self, arg): 9397db96d56Sopenharmony_ci resp = ('250-testhost\r\n' 9407db96d56Sopenharmony_ci '250-EXPN\r\n' 9417db96d56Sopenharmony_ci '250-SIZE 20000000\r\n' 9427db96d56Sopenharmony_ci '250-STARTTLS\r\n' 9437db96d56Sopenharmony_ci '250-DELIVERBY\r\n') 9447db96d56Sopenharmony_ci resp = resp + self._extrafeatures + '250 HELP' 9457db96d56Sopenharmony_ci self.push(resp) 9467db96d56Sopenharmony_ci self.seen_greeting = arg 9477db96d56Sopenharmony_ci self.extended_smtp = True 9487db96d56Sopenharmony_ci 9497db96d56Sopenharmony_ci def smtp_VRFY(self, arg): 9507db96d56Sopenharmony_ci # For max compatibility smtplib should be sending the raw address. 9517db96d56Sopenharmony_ci if arg in sim_users: 9527db96d56Sopenharmony_ci self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 9537db96d56Sopenharmony_ci else: 9547db96d56Sopenharmony_ci self.push('550 No such user: %s' % arg) 9557db96d56Sopenharmony_ci 9567db96d56Sopenharmony_ci def smtp_EXPN(self, arg): 9577db96d56Sopenharmony_ci list_name = arg.lower() 9587db96d56Sopenharmony_ci if list_name in sim_lists: 9597db96d56Sopenharmony_ci user_list = sim_lists[list_name] 9607db96d56Sopenharmony_ci for n, user_email in enumerate(user_list): 9617db96d56Sopenharmony_ci quoted_addr = smtplib.quoteaddr(user_email) 9627db96d56Sopenharmony_ci if n < len(user_list) - 1: 9637db96d56Sopenharmony_ci self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) 9647db96d56Sopenharmony_ci else: 9657db96d56Sopenharmony_ci self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) 9667db96d56Sopenharmony_ci else: 9677db96d56Sopenharmony_ci self.push('550 No access for you!') 9687db96d56Sopenharmony_ci 9697db96d56Sopenharmony_ci def smtp_QUIT(self, arg): 9707db96d56Sopenharmony_ci if self.quit_response is None: 9717db96d56Sopenharmony_ci super(SimSMTPChannel, self).smtp_QUIT(arg) 9727db96d56Sopenharmony_ci else: 9737db96d56Sopenharmony_ci self.push(self.quit_response) 9747db96d56Sopenharmony_ci self.close_when_done() 9757db96d56Sopenharmony_ci 9767db96d56Sopenharmony_ci def smtp_MAIL(self, arg): 9777db96d56Sopenharmony_ci if self.mail_response is None: 9787db96d56Sopenharmony_ci super().smtp_MAIL(arg) 9797db96d56Sopenharmony_ci else: 9807db96d56Sopenharmony_ci self.push(self.mail_response) 9817db96d56Sopenharmony_ci if self.disconnect: 9827db96d56Sopenharmony_ci self.close_when_done() 9837db96d56Sopenharmony_ci 9847db96d56Sopenharmony_ci def smtp_RCPT(self, arg): 9857db96d56Sopenharmony_ci if self.rcpt_response is None: 9867db96d56Sopenharmony_ci super().smtp_RCPT(arg) 9877db96d56Sopenharmony_ci return 9887db96d56Sopenharmony_ci self.rcpt_count += 1 9897db96d56Sopenharmony_ci self.push(self.rcpt_response[self.rcpt_count-1]) 9907db96d56Sopenharmony_ci 9917db96d56Sopenharmony_ci def smtp_RSET(self, arg): 9927db96d56Sopenharmony_ci self.rset_count += 1 9937db96d56Sopenharmony_ci super().smtp_RSET(arg) 9947db96d56Sopenharmony_ci 9957db96d56Sopenharmony_ci def smtp_DATA(self, arg): 9967db96d56Sopenharmony_ci if self.data_response is None: 9977db96d56Sopenharmony_ci super().smtp_DATA(arg) 9987db96d56Sopenharmony_ci else: 9997db96d56Sopenharmony_ci self.push(self.data_response) 10007db96d56Sopenharmony_ci 10017db96d56Sopenharmony_ci def handle_error(self): 10027db96d56Sopenharmony_ci raise 10037db96d56Sopenharmony_ci 10047db96d56Sopenharmony_ci 10057db96d56Sopenharmony_ciclass SimSMTPServer(smtpd.SMTPServer): 10067db96d56Sopenharmony_ci 10077db96d56Sopenharmony_ci channel_class = SimSMTPChannel 10087db96d56Sopenharmony_ci 10097db96d56Sopenharmony_ci def __init__(self, *args, **kw): 10107db96d56Sopenharmony_ci self._extra_features = [] 10117db96d56Sopenharmony_ci self._addresses = {} 10127db96d56Sopenharmony_ci smtpd.SMTPServer.__init__(self, *args, **kw) 10137db96d56Sopenharmony_ci 10147db96d56Sopenharmony_ci def handle_accepted(self, conn, addr): 10157db96d56Sopenharmony_ci self._SMTPchannel = self.channel_class( 10167db96d56Sopenharmony_ci self._extra_features, self, conn, addr, 10177db96d56Sopenharmony_ci decode_data=self._decode_data) 10187db96d56Sopenharmony_ci 10197db96d56Sopenharmony_ci def process_message(self, peer, mailfrom, rcpttos, data): 10207db96d56Sopenharmony_ci self._addresses['from'] = mailfrom 10217db96d56Sopenharmony_ci self._addresses['tos'] = rcpttos 10227db96d56Sopenharmony_ci 10237db96d56Sopenharmony_ci def add_feature(self, feature): 10247db96d56Sopenharmony_ci self._extra_features.append(feature) 10257db96d56Sopenharmony_ci 10267db96d56Sopenharmony_ci def handle_error(self): 10277db96d56Sopenharmony_ci raise 10287db96d56Sopenharmony_ci 10297db96d56Sopenharmony_ci 10307db96d56Sopenharmony_ci# Test various SMTP & ESMTP commands/behaviors that require a simulated server 10317db96d56Sopenharmony_ci# (i.e., something with more features than DebuggingServer) 10327db96d56Sopenharmony_ciclass SMTPSimTests(unittest.TestCase): 10337db96d56Sopenharmony_ci 10347db96d56Sopenharmony_ci def setUp(self): 10357db96d56Sopenharmony_ci self.thread_key = threading_helper.threading_setup() 10367db96d56Sopenharmony_ci self.real_getfqdn = socket.getfqdn 10377db96d56Sopenharmony_ci socket.getfqdn = mock_socket.getfqdn 10387db96d56Sopenharmony_ci self.serv_evt = threading.Event() 10397db96d56Sopenharmony_ci self.client_evt = threading.Event() 10407db96d56Sopenharmony_ci # Pick a random unused port by passing 0 for the port number 10417db96d56Sopenharmony_ci self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True) 10427db96d56Sopenharmony_ci # Keep a note of what port was assigned 10437db96d56Sopenharmony_ci self.port = self.serv.socket.getsockname()[1] 10447db96d56Sopenharmony_ci serv_args = (self.serv, self.serv_evt, self.client_evt) 10457db96d56Sopenharmony_ci self.thread = threading.Thread(target=debugging_server, args=serv_args) 10467db96d56Sopenharmony_ci self.thread.start() 10477db96d56Sopenharmony_ci 10487db96d56Sopenharmony_ci # wait until server thread has assigned a port number 10497db96d56Sopenharmony_ci self.serv_evt.wait() 10507db96d56Sopenharmony_ci self.serv_evt.clear() 10517db96d56Sopenharmony_ci 10527db96d56Sopenharmony_ci def tearDown(self): 10537db96d56Sopenharmony_ci socket.getfqdn = self.real_getfqdn 10547db96d56Sopenharmony_ci # indicate that the client is finished 10557db96d56Sopenharmony_ci self.client_evt.set() 10567db96d56Sopenharmony_ci # wait for the server thread to terminate 10577db96d56Sopenharmony_ci self.serv_evt.wait() 10587db96d56Sopenharmony_ci threading_helper.join_thread(self.thread) 10597db96d56Sopenharmony_ci del self.thread 10607db96d56Sopenharmony_ci self.doCleanups() 10617db96d56Sopenharmony_ci threading_helper.threading_cleanup(*self.thread_key) 10627db96d56Sopenharmony_ci 10637db96d56Sopenharmony_ci def testBasic(self): 10647db96d56Sopenharmony_ci # smoke test 10657db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 10667db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 10677db96d56Sopenharmony_ci smtp.quit() 10687db96d56Sopenharmony_ci 10697db96d56Sopenharmony_ci def testEHLO(self): 10707db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 10717db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 10727db96d56Sopenharmony_ci 10737db96d56Sopenharmony_ci # no features should be present before the EHLO 10747db96d56Sopenharmony_ci self.assertEqual(smtp.esmtp_features, {}) 10757db96d56Sopenharmony_ci 10767db96d56Sopenharmony_ci # features expected from the test server 10777db96d56Sopenharmony_ci expected_features = {'expn':'', 10787db96d56Sopenharmony_ci 'size': '20000000', 10797db96d56Sopenharmony_ci 'starttls': '', 10807db96d56Sopenharmony_ci 'deliverby': '', 10817db96d56Sopenharmony_ci 'help': '', 10827db96d56Sopenharmony_ci } 10837db96d56Sopenharmony_ci 10847db96d56Sopenharmony_ci smtp.ehlo() 10857db96d56Sopenharmony_ci self.assertEqual(smtp.esmtp_features, expected_features) 10867db96d56Sopenharmony_ci for k in expected_features: 10877db96d56Sopenharmony_ci self.assertTrue(smtp.has_extn(k)) 10887db96d56Sopenharmony_ci self.assertFalse(smtp.has_extn('unsupported-feature')) 10897db96d56Sopenharmony_ci smtp.quit() 10907db96d56Sopenharmony_ci 10917db96d56Sopenharmony_ci def testVRFY(self): 10927db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 10937db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 10947db96d56Sopenharmony_ci 10957db96d56Sopenharmony_ci for addr_spec, name in sim_users.items(): 10967db96d56Sopenharmony_ci expected_known = (250, bytes('%s %s' % 10977db96d56Sopenharmony_ci (name, smtplib.quoteaddr(addr_spec)), 10987db96d56Sopenharmony_ci "ascii")) 10997db96d56Sopenharmony_ci self.assertEqual(smtp.vrfy(addr_spec), expected_known) 11007db96d56Sopenharmony_ci 11017db96d56Sopenharmony_ci u = 'nobody@nowhere.com' 11027db96d56Sopenharmony_ci expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 11037db96d56Sopenharmony_ci self.assertEqual(smtp.vrfy(u), expected_unknown) 11047db96d56Sopenharmony_ci smtp.quit() 11057db96d56Sopenharmony_ci 11067db96d56Sopenharmony_ci def testEXPN(self): 11077db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 11087db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 11097db96d56Sopenharmony_ci 11107db96d56Sopenharmony_ci for listname, members in sim_lists.items(): 11117db96d56Sopenharmony_ci users = [] 11127db96d56Sopenharmony_ci for m in members: 11137db96d56Sopenharmony_ci users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 11147db96d56Sopenharmony_ci expected_known = (250, bytes('\n'.join(users), "ascii")) 11157db96d56Sopenharmony_ci self.assertEqual(smtp.expn(listname), expected_known) 11167db96d56Sopenharmony_ci 11177db96d56Sopenharmony_ci u = 'PSU-Members-List' 11187db96d56Sopenharmony_ci expected_unknown = (550, b'No access for you!') 11197db96d56Sopenharmony_ci self.assertEqual(smtp.expn(u), expected_unknown) 11207db96d56Sopenharmony_ci smtp.quit() 11217db96d56Sopenharmony_ci 11227db96d56Sopenharmony_ci def testAUTH_PLAIN(self): 11237db96d56Sopenharmony_ci self.serv.add_feature("AUTH PLAIN") 11247db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 11257db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 11267db96d56Sopenharmony_ci resp = smtp.login(sim_auth[0], sim_auth[1]) 11277db96d56Sopenharmony_ci self.assertEqual(resp, (235, b'Authentication Succeeded')) 11287db96d56Sopenharmony_ci smtp.close() 11297db96d56Sopenharmony_ci 11307db96d56Sopenharmony_ci def testAUTH_LOGIN(self): 11317db96d56Sopenharmony_ci self.serv.add_feature("AUTH LOGIN") 11327db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 11337db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 11347db96d56Sopenharmony_ci resp = smtp.login(sim_auth[0], sim_auth[1]) 11357db96d56Sopenharmony_ci self.assertEqual(resp, (235, b'Authentication Succeeded')) 11367db96d56Sopenharmony_ci smtp.close() 11377db96d56Sopenharmony_ci 11387db96d56Sopenharmony_ci def testAUTH_LOGIN_initial_response_ok(self): 11397db96d56Sopenharmony_ci self.serv.add_feature("AUTH LOGIN") 11407db96d56Sopenharmony_ci with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 11417db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) as smtp: 11427db96d56Sopenharmony_ci smtp.user, smtp.password = sim_auth 11437db96d56Sopenharmony_ci smtp.ehlo("test_auth_login") 11447db96d56Sopenharmony_ci resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True) 11457db96d56Sopenharmony_ci self.assertEqual(resp, (235, b'Authentication Succeeded')) 11467db96d56Sopenharmony_ci 11477db96d56Sopenharmony_ci def testAUTH_LOGIN_initial_response_notok(self): 11487db96d56Sopenharmony_ci self.serv.add_feature("AUTH LOGIN") 11497db96d56Sopenharmony_ci with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 11507db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) as smtp: 11517db96d56Sopenharmony_ci smtp.user, smtp.password = sim_auth 11527db96d56Sopenharmony_ci smtp.ehlo("test_auth_login") 11537db96d56Sopenharmony_ci resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False) 11547db96d56Sopenharmony_ci self.assertEqual(resp, (235, b'Authentication Succeeded')) 11557db96d56Sopenharmony_ci 11567db96d56Sopenharmony_ci def testAUTH_BUGGY(self): 11577db96d56Sopenharmony_ci self.serv.add_feature("AUTH BUGGY") 11587db96d56Sopenharmony_ci 11597db96d56Sopenharmony_ci def auth_buggy(challenge=None): 11607db96d56Sopenharmony_ci self.assertEqual(b"BuGgYbUgGy", challenge) 11617db96d56Sopenharmony_ci return "\0" 11627db96d56Sopenharmony_ci 11637db96d56Sopenharmony_ci smtp = smtplib.SMTP( 11647db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 11657db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT 11667db96d56Sopenharmony_ci ) 11677db96d56Sopenharmony_ci try: 11687db96d56Sopenharmony_ci smtp.user, smtp.password = sim_auth 11697db96d56Sopenharmony_ci smtp.ehlo("test_auth_buggy") 11707db96d56Sopenharmony_ci expect = r"^Server AUTH mechanism infinite loop.*" 11717db96d56Sopenharmony_ci with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm: 11727db96d56Sopenharmony_ci smtp.auth("BUGGY", auth_buggy, initial_response_ok=False) 11737db96d56Sopenharmony_ci finally: 11747db96d56Sopenharmony_ci smtp.close() 11757db96d56Sopenharmony_ci 11767db96d56Sopenharmony_ci @hashlib_helper.requires_hashdigest('md5', openssl=True) 11777db96d56Sopenharmony_ci def testAUTH_CRAM_MD5(self): 11787db96d56Sopenharmony_ci self.serv.add_feature("AUTH CRAM-MD5") 11797db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 11807db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 11817db96d56Sopenharmony_ci resp = smtp.login(sim_auth[0], sim_auth[1]) 11827db96d56Sopenharmony_ci self.assertEqual(resp, (235, b'Authentication Succeeded')) 11837db96d56Sopenharmony_ci smtp.close() 11847db96d56Sopenharmony_ci 11857db96d56Sopenharmony_ci @hashlib_helper.requires_hashdigest('md5', openssl=True) 11867db96d56Sopenharmony_ci def testAUTH_multiple(self): 11877db96d56Sopenharmony_ci # Test that multiple authentication methods are tried. 11887db96d56Sopenharmony_ci self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 11897db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 11907db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 11917db96d56Sopenharmony_ci resp = smtp.login(sim_auth[0], sim_auth[1]) 11927db96d56Sopenharmony_ci self.assertEqual(resp, (235, b'Authentication Succeeded')) 11937db96d56Sopenharmony_ci smtp.close() 11947db96d56Sopenharmony_ci 11957db96d56Sopenharmony_ci def test_auth_function(self): 11967db96d56Sopenharmony_ci supported = {'PLAIN', 'LOGIN'} 11977db96d56Sopenharmony_ci try: 11987db96d56Sopenharmony_ci hashlib.md5() 11997db96d56Sopenharmony_ci except ValueError: 12007db96d56Sopenharmony_ci pass 12017db96d56Sopenharmony_ci else: 12027db96d56Sopenharmony_ci supported.add('CRAM-MD5') 12037db96d56Sopenharmony_ci for mechanism in supported: 12047db96d56Sopenharmony_ci self.serv.add_feature("AUTH {}".format(mechanism)) 12057db96d56Sopenharmony_ci for mechanism in supported: 12067db96d56Sopenharmony_ci with self.subTest(mechanism=mechanism): 12077db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, 12087db96d56Sopenharmony_ci local_hostname='localhost', 12097db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 12107db96d56Sopenharmony_ci smtp.ehlo('foo') 12117db96d56Sopenharmony_ci smtp.user, smtp.password = sim_auth[0], sim_auth[1] 12127db96d56Sopenharmony_ci method = 'auth_' + mechanism.lower().replace('-', '_') 12137db96d56Sopenharmony_ci resp = smtp.auth(mechanism, getattr(smtp, method)) 12147db96d56Sopenharmony_ci self.assertEqual(resp, (235, b'Authentication Succeeded')) 12157db96d56Sopenharmony_ci smtp.close() 12167db96d56Sopenharmony_ci 12177db96d56Sopenharmony_ci def test_quit_resets_greeting(self): 12187db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, 12197db96d56Sopenharmony_ci local_hostname='localhost', 12207db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 12217db96d56Sopenharmony_ci code, message = smtp.ehlo() 12227db96d56Sopenharmony_ci self.assertEqual(code, 250) 12237db96d56Sopenharmony_ci self.assertIn('size', smtp.esmtp_features) 12247db96d56Sopenharmony_ci smtp.quit() 12257db96d56Sopenharmony_ci self.assertNotIn('size', smtp.esmtp_features) 12267db96d56Sopenharmony_ci smtp.connect(HOST, self.port) 12277db96d56Sopenharmony_ci self.assertNotIn('size', smtp.esmtp_features) 12287db96d56Sopenharmony_ci smtp.ehlo_or_helo_if_needed() 12297db96d56Sopenharmony_ci self.assertIn('size', smtp.esmtp_features) 12307db96d56Sopenharmony_ci smtp.quit() 12317db96d56Sopenharmony_ci 12327db96d56Sopenharmony_ci def test_with_statement(self): 12337db96d56Sopenharmony_ci with smtplib.SMTP(HOST, self.port) as smtp: 12347db96d56Sopenharmony_ci code, message = smtp.noop() 12357db96d56Sopenharmony_ci self.assertEqual(code, 250) 12367db96d56Sopenharmony_ci self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 12377db96d56Sopenharmony_ci with smtplib.SMTP(HOST, self.port) as smtp: 12387db96d56Sopenharmony_ci smtp.close() 12397db96d56Sopenharmony_ci self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 12407db96d56Sopenharmony_ci 12417db96d56Sopenharmony_ci def test_with_statement_QUIT_failure(self): 12427db96d56Sopenharmony_ci with self.assertRaises(smtplib.SMTPResponseException) as error: 12437db96d56Sopenharmony_ci with smtplib.SMTP(HOST, self.port) as smtp: 12447db96d56Sopenharmony_ci smtp.noop() 12457db96d56Sopenharmony_ci self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 12467db96d56Sopenharmony_ci self.assertEqual(error.exception.smtp_code, 421) 12477db96d56Sopenharmony_ci self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 12487db96d56Sopenharmony_ci 12497db96d56Sopenharmony_ci #TODO: add tests for correct AUTH method fallback now that the 12507db96d56Sopenharmony_ci #test infrastructure can support it. 12517db96d56Sopenharmony_ci 12527db96d56Sopenharmony_ci # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 12537db96d56Sopenharmony_ci def test__rest_from_mail_cmd(self): 12547db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 12557db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 12567db96d56Sopenharmony_ci smtp.noop() 12577db96d56Sopenharmony_ci self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 12587db96d56Sopenharmony_ci self.serv._SMTPchannel.disconnect = True 12597db96d56Sopenharmony_ci with self.assertRaises(smtplib.SMTPSenderRefused): 12607db96d56Sopenharmony_ci smtp.sendmail('John', 'Sally', 'test message') 12617db96d56Sopenharmony_ci self.assertIsNone(smtp.sock) 12627db96d56Sopenharmony_ci 12637db96d56Sopenharmony_ci # Issue 5713: make sure close, not rset, is called if we get a 421 error 12647db96d56Sopenharmony_ci def test_421_from_mail_cmd(self): 12657db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 12667db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 12677db96d56Sopenharmony_ci smtp.noop() 12687db96d56Sopenharmony_ci self.serv._SMTPchannel.mail_response = '421 closing connection' 12697db96d56Sopenharmony_ci with self.assertRaises(smtplib.SMTPSenderRefused): 12707db96d56Sopenharmony_ci smtp.sendmail('John', 'Sally', 'test message') 12717db96d56Sopenharmony_ci self.assertIsNone(smtp.sock) 12727db96d56Sopenharmony_ci self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 12737db96d56Sopenharmony_ci 12747db96d56Sopenharmony_ci def test_421_from_rcpt_cmd(self): 12757db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 12767db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 12777db96d56Sopenharmony_ci smtp.noop() 12787db96d56Sopenharmony_ci self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 12797db96d56Sopenharmony_ci with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 12807db96d56Sopenharmony_ci smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 12817db96d56Sopenharmony_ci self.assertIsNone(smtp.sock) 12827db96d56Sopenharmony_ci self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 12837db96d56Sopenharmony_ci self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 12847db96d56Sopenharmony_ci 12857db96d56Sopenharmony_ci def test_421_from_data_cmd(self): 12867db96d56Sopenharmony_ci class MySimSMTPChannel(SimSMTPChannel): 12877db96d56Sopenharmony_ci def found_terminator(self): 12887db96d56Sopenharmony_ci if self.smtp_state == self.DATA: 12897db96d56Sopenharmony_ci self.push('421 closing') 12907db96d56Sopenharmony_ci else: 12917db96d56Sopenharmony_ci super().found_terminator() 12927db96d56Sopenharmony_ci self.serv.channel_class = MySimSMTPChannel 12937db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 12947db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 12957db96d56Sopenharmony_ci smtp.noop() 12967db96d56Sopenharmony_ci with self.assertRaises(smtplib.SMTPDataError): 12977db96d56Sopenharmony_ci smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message') 12987db96d56Sopenharmony_ci self.assertIsNone(smtp.sock) 12997db96d56Sopenharmony_ci self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 13007db96d56Sopenharmony_ci 13017db96d56Sopenharmony_ci def test_smtputf8_NotSupportedError_if_no_server_support(self): 13027db96d56Sopenharmony_ci smtp = smtplib.SMTP( 13037db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 13047db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 13057db96d56Sopenharmony_ci self.addCleanup(smtp.close) 13067db96d56Sopenharmony_ci smtp.ehlo() 13077db96d56Sopenharmony_ci self.assertTrue(smtp.does_esmtp) 13087db96d56Sopenharmony_ci self.assertFalse(smtp.has_extn('smtputf8')) 13097db96d56Sopenharmony_ci self.assertRaises( 13107db96d56Sopenharmony_ci smtplib.SMTPNotSupportedError, 13117db96d56Sopenharmony_ci smtp.sendmail, 13127db96d56Sopenharmony_ci 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 13137db96d56Sopenharmony_ci self.assertRaises( 13147db96d56Sopenharmony_ci smtplib.SMTPNotSupportedError, 13157db96d56Sopenharmony_ci smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 13167db96d56Sopenharmony_ci 13177db96d56Sopenharmony_ci def test_send_unicode_without_SMTPUTF8(self): 13187db96d56Sopenharmony_ci smtp = smtplib.SMTP( 13197db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 13207db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 13217db96d56Sopenharmony_ci self.addCleanup(smtp.close) 13227db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '') 13237db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 13247db96d56Sopenharmony_ci 13257db96d56Sopenharmony_ci def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 13267db96d56Sopenharmony_ci # This test is located here and not in the SMTPUTF8SimTests 13277db96d56Sopenharmony_ci # class because it needs a "regular" SMTP server to work 13287db96d56Sopenharmony_ci msg = EmailMessage() 13297db96d56Sopenharmony_ci msg['From'] = "Páolo <főo@bar.com>" 13307db96d56Sopenharmony_ci msg['To'] = 'Dinsdale' 13317db96d56Sopenharmony_ci msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 13327db96d56Sopenharmony_ci smtp = smtplib.SMTP( 13337db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 13347db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 13357db96d56Sopenharmony_ci self.addCleanup(smtp.close) 13367db96d56Sopenharmony_ci with self.assertRaises(smtplib.SMTPNotSupportedError): 13377db96d56Sopenharmony_ci smtp.send_message(msg) 13387db96d56Sopenharmony_ci 13397db96d56Sopenharmony_ci def test_name_field_not_included_in_envelop_addresses(self): 13407db96d56Sopenharmony_ci smtp = smtplib.SMTP( 13417db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 13427db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 13437db96d56Sopenharmony_ci self.addCleanup(smtp.close) 13447db96d56Sopenharmony_ci 13457db96d56Sopenharmony_ci message = EmailMessage() 13467db96d56Sopenharmony_ci message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com')) 13477db96d56Sopenharmony_ci message['To'] = email.utils.formataddr(('René', 'rene@example.com')) 13487db96d56Sopenharmony_ci 13497db96d56Sopenharmony_ci self.assertDictEqual(smtp.send_message(message), {}) 13507db96d56Sopenharmony_ci 13517db96d56Sopenharmony_ci self.assertEqual(self.serv._addresses['from'], 'michael@example.com') 13527db96d56Sopenharmony_ci self.assertEqual(self.serv._addresses['tos'], ['rene@example.com']) 13537db96d56Sopenharmony_ci 13547db96d56Sopenharmony_ci 13557db96d56Sopenharmony_ciclass SimSMTPUTF8Server(SimSMTPServer): 13567db96d56Sopenharmony_ci 13577db96d56Sopenharmony_ci def __init__(self, *args, **kw): 13587db96d56Sopenharmony_ci # The base SMTP server turns these on automatically, but our test 13597db96d56Sopenharmony_ci # server is set up to munge the EHLO response, so we need to provide 13607db96d56Sopenharmony_ci # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 13617db96d56Sopenharmony_ci self._extra_features = ['SMTPUTF8', '8BITMIME'] 13627db96d56Sopenharmony_ci smtpd.SMTPServer.__init__(self, *args, **kw) 13637db96d56Sopenharmony_ci 13647db96d56Sopenharmony_ci def handle_accepted(self, conn, addr): 13657db96d56Sopenharmony_ci self._SMTPchannel = self.channel_class( 13667db96d56Sopenharmony_ci self._extra_features, self, conn, addr, 13677db96d56Sopenharmony_ci decode_data=self._decode_data, 13687db96d56Sopenharmony_ci enable_SMTPUTF8=self.enable_SMTPUTF8, 13697db96d56Sopenharmony_ci ) 13707db96d56Sopenharmony_ci 13717db96d56Sopenharmony_ci def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 13727db96d56Sopenharmony_ci rcpt_options=None): 13737db96d56Sopenharmony_ci self.last_peer = peer 13747db96d56Sopenharmony_ci self.last_mailfrom = mailfrom 13757db96d56Sopenharmony_ci self.last_rcpttos = rcpttos 13767db96d56Sopenharmony_ci self.last_message = data 13777db96d56Sopenharmony_ci self.last_mail_options = mail_options 13787db96d56Sopenharmony_ci self.last_rcpt_options = rcpt_options 13797db96d56Sopenharmony_ci 13807db96d56Sopenharmony_ci 13817db96d56Sopenharmony_ciclass SMTPUTF8SimTests(unittest.TestCase): 13827db96d56Sopenharmony_ci 13837db96d56Sopenharmony_ci maxDiff = None 13847db96d56Sopenharmony_ci 13857db96d56Sopenharmony_ci def setUp(self): 13867db96d56Sopenharmony_ci self.thread_key = threading_helper.threading_setup() 13877db96d56Sopenharmony_ci self.real_getfqdn = socket.getfqdn 13887db96d56Sopenharmony_ci socket.getfqdn = mock_socket.getfqdn 13897db96d56Sopenharmony_ci self.serv_evt = threading.Event() 13907db96d56Sopenharmony_ci self.client_evt = threading.Event() 13917db96d56Sopenharmony_ci # Pick a random unused port by passing 0 for the port number 13927db96d56Sopenharmony_ci self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 13937db96d56Sopenharmony_ci decode_data=False, 13947db96d56Sopenharmony_ci enable_SMTPUTF8=True) 13957db96d56Sopenharmony_ci # Keep a note of what port was assigned 13967db96d56Sopenharmony_ci self.port = self.serv.socket.getsockname()[1] 13977db96d56Sopenharmony_ci serv_args = (self.serv, self.serv_evt, self.client_evt) 13987db96d56Sopenharmony_ci self.thread = threading.Thread(target=debugging_server, args=serv_args) 13997db96d56Sopenharmony_ci self.thread.start() 14007db96d56Sopenharmony_ci 14017db96d56Sopenharmony_ci # wait until server thread has assigned a port number 14027db96d56Sopenharmony_ci self.serv_evt.wait() 14037db96d56Sopenharmony_ci self.serv_evt.clear() 14047db96d56Sopenharmony_ci 14057db96d56Sopenharmony_ci def tearDown(self): 14067db96d56Sopenharmony_ci socket.getfqdn = self.real_getfqdn 14077db96d56Sopenharmony_ci # indicate that the client is finished 14087db96d56Sopenharmony_ci self.client_evt.set() 14097db96d56Sopenharmony_ci # wait for the server thread to terminate 14107db96d56Sopenharmony_ci self.serv_evt.wait() 14117db96d56Sopenharmony_ci threading_helper.join_thread(self.thread) 14127db96d56Sopenharmony_ci del self.thread 14137db96d56Sopenharmony_ci self.doCleanups() 14147db96d56Sopenharmony_ci threading_helper.threading_cleanup(*self.thread_key) 14157db96d56Sopenharmony_ci 14167db96d56Sopenharmony_ci def test_test_server_supports_extensions(self): 14177db96d56Sopenharmony_ci smtp = smtplib.SMTP( 14187db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 14197db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 14207db96d56Sopenharmony_ci self.addCleanup(smtp.close) 14217db96d56Sopenharmony_ci smtp.ehlo() 14227db96d56Sopenharmony_ci self.assertTrue(smtp.does_esmtp) 14237db96d56Sopenharmony_ci self.assertTrue(smtp.has_extn('smtputf8')) 14247db96d56Sopenharmony_ci 14257db96d56Sopenharmony_ci def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 14267db96d56Sopenharmony_ci m = '¡a test message containing unicode!'.encode('utf-8') 14277db96d56Sopenharmony_ci smtp = smtplib.SMTP( 14287db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 14297db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 14307db96d56Sopenharmony_ci self.addCleanup(smtp.close) 14317db96d56Sopenharmony_ci smtp.sendmail('Jőhn', 'Sálly', m, 14327db96d56Sopenharmony_ci mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 14337db96d56Sopenharmony_ci self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 14347db96d56Sopenharmony_ci self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 14357db96d56Sopenharmony_ci self.assertEqual(self.serv.last_message, m) 14367db96d56Sopenharmony_ci self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 14377db96d56Sopenharmony_ci self.assertIn('SMTPUTF8', self.serv.last_mail_options) 14387db96d56Sopenharmony_ci self.assertEqual(self.serv.last_rcpt_options, []) 14397db96d56Sopenharmony_ci 14407db96d56Sopenharmony_ci def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 14417db96d56Sopenharmony_ci m = '¡a test message containing unicode!'.encode('utf-8') 14427db96d56Sopenharmony_ci smtp = smtplib.SMTP( 14437db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 14447db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 14457db96d56Sopenharmony_ci self.addCleanup(smtp.close) 14467db96d56Sopenharmony_ci smtp.ehlo() 14477db96d56Sopenharmony_ci self.assertEqual( 14487db96d56Sopenharmony_ci smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 14497db96d56Sopenharmony_ci (250, b'OK')) 14507db96d56Sopenharmony_ci self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 14517db96d56Sopenharmony_ci self.assertEqual(smtp.data(m), (250, b'OK')) 14527db96d56Sopenharmony_ci self.assertEqual(self.serv.last_mailfrom, 'Jő') 14537db96d56Sopenharmony_ci self.assertEqual(self.serv.last_rcpttos, ['János']) 14547db96d56Sopenharmony_ci self.assertEqual(self.serv.last_message, m) 14557db96d56Sopenharmony_ci self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 14567db96d56Sopenharmony_ci self.assertIn('SMTPUTF8', self.serv.last_mail_options) 14577db96d56Sopenharmony_ci self.assertEqual(self.serv.last_rcpt_options, []) 14587db96d56Sopenharmony_ci 14597db96d56Sopenharmony_ci def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 14607db96d56Sopenharmony_ci msg = EmailMessage() 14617db96d56Sopenharmony_ci msg['From'] = "Páolo <főo@bar.com>" 14627db96d56Sopenharmony_ci msg['To'] = 'Dinsdale' 14637db96d56Sopenharmony_ci msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 14647db96d56Sopenharmony_ci # XXX I don't know why I need two \n's here, but this is an existing 14657db96d56Sopenharmony_ci # bug (if it is one) and not a problem with the new functionality. 14667db96d56Sopenharmony_ci msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 14677db96d56Sopenharmony_ci # XXX smtpd converts received /r/n to /n, so we can't easily test that 14687db96d56Sopenharmony_ci # we are successfully sending /r/n :(. 14697db96d56Sopenharmony_ci expected = textwrap.dedent("""\ 14707db96d56Sopenharmony_ci From: Páolo <főo@bar.com> 14717db96d56Sopenharmony_ci To: Dinsdale 14727db96d56Sopenharmony_ci Subject: Nudge nudge, wink, wink \u1F609 14737db96d56Sopenharmony_ci Content-Type: text/plain; charset="utf-8" 14747db96d56Sopenharmony_ci Content-Transfer-Encoding: 8bit 14757db96d56Sopenharmony_ci MIME-Version: 1.0 14767db96d56Sopenharmony_ci 14777db96d56Sopenharmony_ci oh là là, know what I mean, know what I mean? 14787db96d56Sopenharmony_ci """) 14797db96d56Sopenharmony_ci smtp = smtplib.SMTP( 14807db96d56Sopenharmony_ci HOST, self.port, local_hostname='localhost', 14817db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 14827db96d56Sopenharmony_ci self.addCleanup(smtp.close) 14837db96d56Sopenharmony_ci self.assertEqual(smtp.send_message(msg), {}) 14847db96d56Sopenharmony_ci self.assertEqual(self.serv.last_mailfrom, 'főo@bar.com') 14857db96d56Sopenharmony_ci self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 14867db96d56Sopenharmony_ci self.assertEqual(self.serv.last_message.decode(), expected) 14877db96d56Sopenharmony_ci self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 14887db96d56Sopenharmony_ci self.assertIn('SMTPUTF8', self.serv.last_mail_options) 14897db96d56Sopenharmony_ci self.assertEqual(self.serv.last_rcpt_options, []) 14907db96d56Sopenharmony_ci 14917db96d56Sopenharmony_ci 14927db96d56Sopenharmony_ciEXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 14937db96d56Sopenharmony_ci 14947db96d56Sopenharmony_ciclass SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 14957db96d56Sopenharmony_ci def smtp_AUTH(self, arg): 14967db96d56Sopenharmony_ci # RFC 4954's AUTH command allows for an optional initial-response. 14977db96d56Sopenharmony_ci # Not all AUTH methods support this; some require a challenge. AUTH 14987db96d56Sopenharmony_ci # PLAIN does those, so test that here. See issue #15014. 14997db96d56Sopenharmony_ci args = arg.split() 15007db96d56Sopenharmony_ci if args[0].lower() == 'plain': 15017db96d56Sopenharmony_ci if len(args) == 2: 15027db96d56Sopenharmony_ci # AUTH PLAIN <initial-response> with the response base 64 15037db96d56Sopenharmony_ci # encoded. Hard code the expected response for the test. 15047db96d56Sopenharmony_ci if args[1] == EXPECTED_RESPONSE: 15057db96d56Sopenharmony_ci self.push('235 Ok') 15067db96d56Sopenharmony_ci return 15077db96d56Sopenharmony_ci self.push('571 Bad authentication') 15087db96d56Sopenharmony_ci 15097db96d56Sopenharmony_ciclass SimSMTPAUTHInitialResponseServer(SimSMTPServer): 15107db96d56Sopenharmony_ci channel_class = SimSMTPAUTHInitialResponseChannel 15117db96d56Sopenharmony_ci 15127db96d56Sopenharmony_ci 15137db96d56Sopenharmony_ciclass SMTPAUTHInitialResponseSimTests(unittest.TestCase): 15147db96d56Sopenharmony_ci def setUp(self): 15157db96d56Sopenharmony_ci self.thread_key = threading_helper.threading_setup() 15167db96d56Sopenharmony_ci self.real_getfqdn = socket.getfqdn 15177db96d56Sopenharmony_ci socket.getfqdn = mock_socket.getfqdn 15187db96d56Sopenharmony_ci self.serv_evt = threading.Event() 15197db96d56Sopenharmony_ci self.client_evt = threading.Event() 15207db96d56Sopenharmony_ci # Pick a random unused port by passing 0 for the port number 15217db96d56Sopenharmony_ci self.serv = SimSMTPAUTHInitialResponseServer( 15227db96d56Sopenharmony_ci (HOST, 0), ('nowhere', -1), decode_data=True) 15237db96d56Sopenharmony_ci # Keep a note of what port was assigned 15247db96d56Sopenharmony_ci self.port = self.serv.socket.getsockname()[1] 15257db96d56Sopenharmony_ci serv_args = (self.serv, self.serv_evt, self.client_evt) 15267db96d56Sopenharmony_ci self.thread = threading.Thread(target=debugging_server, args=serv_args) 15277db96d56Sopenharmony_ci self.thread.start() 15287db96d56Sopenharmony_ci 15297db96d56Sopenharmony_ci # wait until server thread has assigned a port number 15307db96d56Sopenharmony_ci self.serv_evt.wait() 15317db96d56Sopenharmony_ci self.serv_evt.clear() 15327db96d56Sopenharmony_ci 15337db96d56Sopenharmony_ci def tearDown(self): 15347db96d56Sopenharmony_ci socket.getfqdn = self.real_getfqdn 15357db96d56Sopenharmony_ci # indicate that the client is finished 15367db96d56Sopenharmony_ci self.client_evt.set() 15377db96d56Sopenharmony_ci # wait for the server thread to terminate 15387db96d56Sopenharmony_ci self.serv_evt.wait() 15397db96d56Sopenharmony_ci threading_helper.join_thread(self.thread) 15407db96d56Sopenharmony_ci del self.thread 15417db96d56Sopenharmony_ci self.doCleanups() 15427db96d56Sopenharmony_ci threading_helper.threading_cleanup(*self.thread_key) 15437db96d56Sopenharmony_ci 15447db96d56Sopenharmony_ci def testAUTH_PLAIN_initial_response_login(self): 15457db96d56Sopenharmony_ci self.serv.add_feature('AUTH PLAIN') 15467db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 15477db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 15487db96d56Sopenharmony_ci smtp.login('psu', 'doesnotexist') 15497db96d56Sopenharmony_ci smtp.close() 15507db96d56Sopenharmony_ci 15517db96d56Sopenharmony_ci def testAUTH_PLAIN_initial_response_auth(self): 15527db96d56Sopenharmony_ci self.serv.add_feature('AUTH PLAIN') 15537db96d56Sopenharmony_ci smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 15547db96d56Sopenharmony_ci timeout=support.LOOPBACK_TIMEOUT) 15557db96d56Sopenharmony_ci smtp.user = 'psu' 15567db96d56Sopenharmony_ci smtp.password = 'doesnotexist' 15577db96d56Sopenharmony_ci code, response = smtp.auth('plain', smtp.auth_plain) 15587db96d56Sopenharmony_ci smtp.close() 15597db96d56Sopenharmony_ci self.assertEqual(code, 235) 15607db96d56Sopenharmony_ci 15617db96d56Sopenharmony_ci 15627db96d56Sopenharmony_ciif __name__ == '__main__': 15637db96d56Sopenharmony_ci unittest.main() 1564