17db96d56Sopenharmony_ciimport base64
27db96d56Sopenharmony_ciimport email.mime.text
37db96d56Sopenharmony_cifrom email.message import EmailMessage
47db96d56Sopenharmony_cifrom email.base64mime import body_encode as encode_base64
57db96d56Sopenharmony_ciimport email.utils
67db96d56Sopenharmony_ciimport hashlib
77db96d56Sopenharmony_ciimport hmac
87db96d56Sopenharmony_ciimport socket
97db96d56Sopenharmony_ciimport smtplib
107db96d56Sopenharmony_ciimport io
117db96d56Sopenharmony_ciimport re
127db96d56Sopenharmony_ciimport sys
137db96d56Sopenharmony_ciimport time
147db96d56Sopenharmony_ciimport select
157db96d56Sopenharmony_ciimport errno
167db96d56Sopenharmony_ciimport textwrap
177db96d56Sopenharmony_ciimport threading
187db96d56Sopenharmony_ci
197db96d56Sopenharmony_ciimport unittest
207db96d56Sopenharmony_cifrom test import support, mock_socket
217db96d56Sopenharmony_cifrom test.support import hashlib_helper
227db96d56Sopenharmony_cifrom test.support import socket_helper
237db96d56Sopenharmony_cifrom test.support import threading_helper
247db96d56Sopenharmony_cifrom test.support import warnings_helper
257db96d56Sopenharmony_cifrom unittest.mock import Mock
267db96d56Sopenharmony_ci
277db96d56Sopenharmony_ci
287db96d56Sopenharmony_ciasyncore = warnings_helper.import_deprecated('asyncore')
297db96d56Sopenharmony_cismtpd = warnings_helper.import_deprecated('smtpd')
307db96d56Sopenharmony_ci
317db96d56Sopenharmony_ci
327db96d56Sopenharmony_cisupport.requires_working_socket(module=True)
337db96d56Sopenharmony_ci
347db96d56Sopenharmony_ciHOST = socket_helper.HOST
357db96d56Sopenharmony_ci
if sys.platform == 'darwin':
    # On darwin select.poll reports a select.POLLHUP at the end of the
    # tests; install a do-nothing handler on the channel class so that the
    # event is simply ignored.
    def handle_expt(self):
        return None
    setattr(smtpd.SMTPChannel, 'handle_expt', handle_expt)
427db96d56Sopenharmony_ci
437db96d56Sopenharmony_ci
def server(evt, buf, serv):
    """Accept one connection on *serv* and send it the bytes in *buf*.

    *evt* is set once *serv* is listening, and set again after *serv* has
    been closed.  A TimeoutError raised by accept() is swallowed; in that
    case nothing is sent.  At most 500 select/send rounds are attempted, so
    a peer that never becomes writable cannot keep the loop running forever.
    """
    serv.listen()
    evt.set()
    try:
        conn, _ = serv.accept()
    except TimeoutError:
        pass
    else:
        # Close the accepted connection even when select() or send() raises,
        # so that a failed transfer does not leak the socket.
        try:
            n = 500
            while buf and n > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()
657db96d56Sopenharmony_ci
class GeneralTests:
    # Checks shared by the client classes.  Concrete subclasses supply the
    # class under test through the ``client`` attribute.  smtplib's socket
    # module is replaced by mock_socket for the duration of each test.

    def setUp(self):
        self.port = 25
        smtplib.socket = mock_socket

    def tearDown(self):
        smtplib.socket = socket

    def testQuoteData(self):
        # quotedata() is no longer used but is retained for backward
        # compatibility, so make sure it still works.
        raw = "abc\n.jkl\rfoo\r\n..blue"
        quoted = smtplib.quotedata(raw)
        self.assertEqual("abc\r\n..jkl\r\nfoo\r\n...blue", quoted)

    def testBasic1(self):
        # Connect with host and port passed separately.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client(HOST, self.port)
        smtp.close()

    def testSourceAddress(self):
        # The source_address given to the constructor is kept on the client.
        mock_socket.reply_with(b"220 Hola mundo")
        source = ('127.0.0.1', 19876)
        smtp = self.client(HOST, self.port, source_address=source)
        self.assertEqual(smtp.source_address, source)
        smtp.close()

    def testBasic2(self):
        # Connect with the port embedded in the host name.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client("{}:{}".format(HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        # A supplied local_hostname must be used as given.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        # Without an explicit timeout the (mock) module default applies.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = self.client(HOST, self.port)
        finally:
            # Always put the default back, even if connecting failed.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        # timeout=None yields a socket without a timeout.  Note that this
        # test changes the default timeout of the real socket module.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = self.client(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutZero(self):
        # A zero timeout is rejected with ValueError.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertRaises(ValueError, self.client, HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        # An explicit timeout ends up on the socket.
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = self.client(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def test_debuglevel(self):
        # Debug level 1 writes "connect:" lines to stderr.
        mock_socket.reply_with(b"220 Hello world")
        smtp = self.client()
        smtp.set_debuglevel(1)
        with support.captured_stderr() as captured:
            smtp.connect(HOST, self.port)
        smtp.close()
        self.assertRegex(captured.getvalue(),
                         re.compile(r"^connect:", re.MULTILINE))

    def test_debuglevel_2(self):
        # Debug level 2 additionally prefixes each line with a timestamp.
        mock_socket.reply_with(b"220 Hello world")
        smtp = self.client()
        smtp.set_debuglevel(2)
        with support.captured_stderr() as captured:
            smtp.connect(HOST, self.port)
        smtp.close()
        stamped = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                             re.MULTILINE)
        self.assertRegex(captured.getvalue(), stamped)
1637db96d56Sopenharmony_ci
1647db96d56Sopenharmony_ci
class SMTPGeneralTests(GeneralTests, unittest.TestCase):
    # Runs the checks inherited from GeneralTests against smtplib.SMTP.

    client = smtplib.SMTP
1687db96d56Sopenharmony_ci
1697db96d56Sopenharmony_ci
class LMTPGeneralTests(GeneralTests, unittest.TestCase):
    # Runs the checks inherited from GeneralTests against smtplib.LMTP and
    # adds cases in which the server is addressed by a filesystem path.

    client = smtplib.LMTP

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
    def testUnixDomainSocketTimeoutDefault(self):
        # Connecting through a socket path leaves the socket without timeout.
        mock_socket.reply_with(b"220 Hello world")
        socket_path = '/some/local/lmtp/delivery/program'
        try:
            lmtp = self.client(socket_path, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(lmtp.sock.gettimeout())
        lmtp.close()

    def testTimeoutZero(self):
        # In addition to the inherited host/port case, a zero timeout must
        # also be rejected when the server is given as a path.
        super().testTimeoutZero()
        self.assertRaises(ValueError, self.client,
                          '/some/local/lmtp/delivery/program', timeout=0)
1907db96d56Sopenharmony_ci
# Body of the test server thread: drives the asyncore loop for the given SMTP
# server instance.  serv_evt is set on entry and again once everything has
# been shut down; client_evt is set by the test when the client is finished.
def debugging_server(serv, serv_evt, client_evt):
    serv_evt.set()

    try:
        # Use the poll()-based loop wherever select.poll is available.
        poll_fun = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        # Give up after at most 1000 polling rounds.
        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

    except TimeoutError:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
2227db96d56Sopenharmony_ci
# Banner lines expected around the message body in the output captured from
# the debugging server.
MSG_BEGIN, MSG_END = (
    '---------- MESSAGE FOLLOWS ----------\n',
    '------------ END MESSAGE ------------\n',
)
2257db96d56Sopenharmony_ci
2267db96d56Sopenharmony_ci# NOTE: Some SMTP objects in the tests below are created with a non-default
2277db96d56Sopenharmony_ci# local_hostname argument to the constructor, since (on some systems) the FQDN
2287db96d56Sopenharmony_ci# lookup caused by the default local_hostname sometimes takes so long that the
2297db96d56Sopenharmony_ci# test server times out, causing the test to fail.
2307db96d56Sopenharmony_ci
2317db96d56Sopenharmony_ci# Test behavior of smtpd.DebuggingServer
2327db96d56Sopenharmony_ciclass DebuggingServerTests(unittest.TestCase):
2337db96d56Sopenharmony_ci
2347db96d56Sopenharmony_ci    maxDiff = None
2357db96d56Sopenharmony_ci
2367db96d56Sopenharmony_ci    def setUp(self):
2377db96d56Sopenharmony_ci        self.thread_key = threading_helper.threading_setup()
2387db96d56Sopenharmony_ci        self.real_getfqdn = socket.getfqdn
2397db96d56Sopenharmony_ci        socket.getfqdn = mock_socket.getfqdn
2407db96d56Sopenharmony_ci        # temporarily replace sys.stdout to capture DebuggingServer output
2417db96d56Sopenharmony_ci        self.old_stdout = sys.stdout
2427db96d56Sopenharmony_ci        self.output = io.StringIO()
2437db96d56Sopenharmony_ci        sys.stdout = self.output
2447db96d56Sopenharmony_ci
2457db96d56Sopenharmony_ci        self.serv_evt = threading.Event()
2467db96d56Sopenharmony_ci        self.client_evt = threading.Event()
2477db96d56Sopenharmony_ci        # Capture SMTPChannel debug output
2487db96d56Sopenharmony_ci        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
2497db96d56Sopenharmony_ci        smtpd.DEBUGSTREAM = io.StringIO()
2507db96d56Sopenharmony_ci        # Pick a random unused port by passing 0 for the port number
2517db96d56Sopenharmony_ci        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
2527db96d56Sopenharmony_ci                                          decode_data=True)
2537db96d56Sopenharmony_ci        # Keep a note of what server host and port were assigned
2547db96d56Sopenharmony_ci        self.host, self.port = self.serv.socket.getsockname()[:2]
2557db96d56Sopenharmony_ci        serv_args = (self.serv, self.serv_evt, self.client_evt)
2567db96d56Sopenharmony_ci        self.thread = threading.Thread(target=debugging_server, args=serv_args)
2577db96d56Sopenharmony_ci        self.thread.start()
2587db96d56Sopenharmony_ci
2597db96d56Sopenharmony_ci        # wait until server thread has assigned a port number
2607db96d56Sopenharmony_ci        self.serv_evt.wait()
2617db96d56Sopenharmony_ci        self.serv_evt.clear()
2627db96d56Sopenharmony_ci
2637db96d56Sopenharmony_ci    def tearDown(self):
2647db96d56Sopenharmony_ci        socket.getfqdn = self.real_getfqdn
2657db96d56Sopenharmony_ci        # indicate that the client is finished
2667db96d56Sopenharmony_ci        self.client_evt.set()
2677db96d56Sopenharmony_ci        # wait for the server thread to terminate
2687db96d56Sopenharmony_ci        self.serv_evt.wait()
2697db96d56Sopenharmony_ci        threading_helper.join_thread(self.thread)
2707db96d56Sopenharmony_ci        # restore sys.stdout
2717db96d56Sopenharmony_ci        sys.stdout = self.old_stdout
2727db96d56Sopenharmony_ci        # restore DEBUGSTREAM
2737db96d56Sopenharmony_ci        smtpd.DEBUGSTREAM.close()
2747db96d56Sopenharmony_ci        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
2757db96d56Sopenharmony_ci        del self.thread
2767db96d56Sopenharmony_ci        self.doCleanups()
2777db96d56Sopenharmony_ci        threading_helper.threading_cleanup(*self.thread_key)
2787db96d56Sopenharmony_ci
2797db96d56Sopenharmony_ci    def get_output_without_xpeer(self):
2807db96d56Sopenharmony_ci        test_output = self.output.getvalue()
2817db96d56Sopenharmony_ci        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
2827db96d56Sopenharmony_ci                      test_output, flags=re.MULTILINE|re.DOTALL)
2837db96d56Sopenharmony_ci
2847db96d56Sopenharmony_ci    def testBasic(self):
2857db96d56Sopenharmony_ci        # connect
2867db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
2877db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
2887db96d56Sopenharmony_ci        smtp.quit()
2897db96d56Sopenharmony_ci
2907db96d56Sopenharmony_ci    def testSourceAddress(self):
2917db96d56Sopenharmony_ci        # connect
2927db96d56Sopenharmony_ci        src_port = socket_helper.find_unused_port()
2937db96d56Sopenharmony_ci        try:
2947db96d56Sopenharmony_ci            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
2957db96d56Sopenharmony_ci                                timeout=support.LOOPBACK_TIMEOUT,
2967db96d56Sopenharmony_ci                                source_address=(self.host, src_port))
2977db96d56Sopenharmony_ci            self.addCleanup(smtp.close)
2987db96d56Sopenharmony_ci            self.assertEqual(smtp.source_address, (self.host, src_port))
2997db96d56Sopenharmony_ci            self.assertEqual(smtp.local_hostname, 'localhost')
3007db96d56Sopenharmony_ci            smtp.quit()
3017db96d56Sopenharmony_ci        except OSError as e:
3027db96d56Sopenharmony_ci            if e.errno == errno.EADDRINUSE:
3037db96d56Sopenharmony_ci                self.skipTest("couldn't bind to source port %d" % src_port)
3047db96d56Sopenharmony_ci            raise
3057db96d56Sopenharmony_ci
3067db96d56Sopenharmony_ci    def testNOOP(self):
3077db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3087db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3097db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3107db96d56Sopenharmony_ci        expected = (250, b'OK')
3117db96d56Sopenharmony_ci        self.assertEqual(smtp.noop(), expected)
3127db96d56Sopenharmony_ci        smtp.quit()
3137db96d56Sopenharmony_ci
3147db96d56Sopenharmony_ci    def testRSET(self):
3157db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3167db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3177db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3187db96d56Sopenharmony_ci        expected = (250, b'OK')
3197db96d56Sopenharmony_ci        self.assertEqual(smtp.rset(), expected)
3207db96d56Sopenharmony_ci        smtp.quit()
3217db96d56Sopenharmony_ci
3227db96d56Sopenharmony_ci    def testELHO(self):
3237db96d56Sopenharmony_ci        # EHLO isn't implemented in DebuggingServer
3247db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3257db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3267db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3277db96d56Sopenharmony_ci        expected = (250, b'\nSIZE 33554432\nHELP')
3287db96d56Sopenharmony_ci        self.assertEqual(smtp.ehlo(), expected)
3297db96d56Sopenharmony_ci        smtp.quit()
3307db96d56Sopenharmony_ci
3317db96d56Sopenharmony_ci    def testEXPNNotImplemented(self):
3327db96d56Sopenharmony_ci        # EXPN isn't implemented in DebuggingServer
3337db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3347db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3357db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3367db96d56Sopenharmony_ci        expected = (502, b'EXPN not implemented')
3377db96d56Sopenharmony_ci        smtp.putcmd('EXPN')
3387db96d56Sopenharmony_ci        self.assertEqual(smtp.getreply(), expected)
3397db96d56Sopenharmony_ci        smtp.quit()
3407db96d56Sopenharmony_ci
3417db96d56Sopenharmony_ci    def test_issue43124_putcmd_escapes_newline(self):
3427db96d56Sopenharmony_ci        # see: https://bugs.python.org/issue43124
3437db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3447db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3457db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3467db96d56Sopenharmony_ci        with self.assertRaises(ValueError) as exc:
3477db96d56Sopenharmony_ci            smtp.putcmd('helo\nX-INJECTED')
3487db96d56Sopenharmony_ci        self.assertIn("prohibited newline characters", str(exc.exception))
3497db96d56Sopenharmony_ci        smtp.quit()
3507db96d56Sopenharmony_ci
3517db96d56Sopenharmony_ci    def testVRFY(self):
3527db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3537db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3547db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3557db96d56Sopenharmony_ci        expected = (252, b'Cannot VRFY user, but will accept message ' + \
3567db96d56Sopenharmony_ci                         b'and attempt delivery')
3577db96d56Sopenharmony_ci        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
3587db96d56Sopenharmony_ci        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
3597db96d56Sopenharmony_ci        smtp.quit()
3607db96d56Sopenharmony_ci
3617db96d56Sopenharmony_ci    def testSecondHELO(self):
3627db96d56Sopenharmony_ci        # check that a second HELO returns a message that it's a duplicate
3637db96d56Sopenharmony_ci        # (this behavior is specific to smtpd.SMTPChannel)
3647db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3657db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3667db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3677db96d56Sopenharmony_ci        smtp.helo()
3687db96d56Sopenharmony_ci        expected = (503, b'Duplicate HELO/EHLO')
3697db96d56Sopenharmony_ci        self.assertEqual(smtp.helo(), expected)
3707db96d56Sopenharmony_ci        smtp.quit()
3717db96d56Sopenharmony_ci
3727db96d56Sopenharmony_ci    def testHELP(self):
3737db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3747db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3757db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3767db96d56Sopenharmony_ci        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
3777db96d56Sopenharmony_ci                                      b'RCPT DATA RSET NOOP QUIT VRFY')
3787db96d56Sopenharmony_ci        smtp.quit()
3797db96d56Sopenharmony_ci
3807db96d56Sopenharmony_ci    def testSend(self):
3817db96d56Sopenharmony_ci        # connect and send mail
3827db96d56Sopenharmony_ci        m = 'A test message'
3837db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
3847db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
3857db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
3867db96d56Sopenharmony_ci        smtp.sendmail('John', 'Sally', m)
3877db96d56Sopenharmony_ci        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
3887db96d56Sopenharmony_ci        # in asyncore.  This sleep might help, but should really be fixed
3897db96d56Sopenharmony_ci        # properly by using an Event variable.
3907db96d56Sopenharmony_ci        time.sleep(0.01)
3917db96d56Sopenharmony_ci        smtp.quit()
3927db96d56Sopenharmony_ci
3937db96d56Sopenharmony_ci        self.client_evt.set()
3947db96d56Sopenharmony_ci        self.serv_evt.wait()
3957db96d56Sopenharmony_ci        self.output.flush()
3967db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
3977db96d56Sopenharmony_ci        self.assertEqual(self.output.getvalue(), mexpect)
3987db96d56Sopenharmony_ci
3997db96d56Sopenharmony_ci    def testSendBinary(self):
4007db96d56Sopenharmony_ci        m = b'A test message'
4017db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
4027db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
4037db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
4047db96d56Sopenharmony_ci        smtp.sendmail('John', 'Sally', m)
4057db96d56Sopenharmony_ci        # XXX (see comment in testSend)
4067db96d56Sopenharmony_ci        time.sleep(0.01)
4077db96d56Sopenharmony_ci        smtp.quit()
4087db96d56Sopenharmony_ci
4097db96d56Sopenharmony_ci        self.client_evt.set()
4107db96d56Sopenharmony_ci        self.serv_evt.wait()
4117db96d56Sopenharmony_ci        self.output.flush()
4127db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
4137db96d56Sopenharmony_ci        self.assertEqual(self.output.getvalue(), mexpect)
4147db96d56Sopenharmony_ci
4157db96d56Sopenharmony_ci    def testSendNeedingDotQuote(self):
4167db96d56Sopenharmony_ci        # Issue 12283
4177db96d56Sopenharmony_ci        m = '.A test\n.mes.sage.'
4187db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
4197db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
4207db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
4217db96d56Sopenharmony_ci        smtp.sendmail('John', 'Sally', m)
4227db96d56Sopenharmony_ci        # XXX (see comment in testSend)
4237db96d56Sopenharmony_ci        time.sleep(0.01)
4247db96d56Sopenharmony_ci        smtp.quit()
4257db96d56Sopenharmony_ci
4267db96d56Sopenharmony_ci        self.client_evt.set()
4277db96d56Sopenharmony_ci        self.serv_evt.wait()
4287db96d56Sopenharmony_ci        self.output.flush()
4297db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
4307db96d56Sopenharmony_ci        self.assertEqual(self.output.getvalue(), mexpect)
4317db96d56Sopenharmony_ci
4327db96d56Sopenharmony_ci    def test_issue43124_escape_localhostname(self):
4337db96d56Sopenharmony_ci        # see: https://bugs.python.org/issue43124
4347db96d56Sopenharmony_ci        # connect and send mail
4357db96d56Sopenharmony_ci        m = 'wazzuuup\nlinetwo'
4367db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
4377db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
4387db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
4397db96d56Sopenharmony_ci        with self.assertRaises(ValueError) as exc:
4407db96d56Sopenharmony_ci            smtp.sendmail("hi@me.com", "you@me.com", m)
4417db96d56Sopenharmony_ci        self.assertIn(
4427db96d56Sopenharmony_ci            "prohibited newline characters: ehlo hi\\nX-INJECTED",
4437db96d56Sopenharmony_ci            str(exc.exception),
4447db96d56Sopenharmony_ci        )
4457db96d56Sopenharmony_ci        # XXX (see comment in testSend)
4467db96d56Sopenharmony_ci        time.sleep(0.01)
4477db96d56Sopenharmony_ci        smtp.quit()
4487db96d56Sopenharmony_ci
4497db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
4507db96d56Sopenharmony_ci        self.assertNotIn("X-INJECTED", debugout)
4517db96d56Sopenharmony_ci
4527db96d56Sopenharmony_ci    def test_issue43124_escape_options(self):
4537db96d56Sopenharmony_ci        # see: https://bugs.python.org/issue43124
4547db96d56Sopenharmony_ci        # connect and send mail
4557db96d56Sopenharmony_ci        m = 'wazzuuup\nlinetwo'
4567db96d56Sopenharmony_ci        smtp = smtplib.SMTP(
4577db96d56Sopenharmony_ci            HOST, self.port, local_hostname='localhost',
4587db96d56Sopenharmony_ci            timeout=support.LOOPBACK_TIMEOUT)
4597db96d56Sopenharmony_ci
4607db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
4617db96d56Sopenharmony_ci        smtp.sendmail("hi@me.com", "you@me.com", m)
4627db96d56Sopenharmony_ci        with self.assertRaises(ValueError) as exc:
4637db96d56Sopenharmony_ci            smtp.mail("hi@me.com", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
4647db96d56Sopenharmony_ci        msg = str(exc.exception)
4657db96d56Sopenharmony_ci        self.assertIn("prohibited newline characters", msg)
4667db96d56Sopenharmony_ci        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
4677db96d56Sopenharmony_ci        # XXX (see comment in testSend)
4687db96d56Sopenharmony_ci        time.sleep(0.01)
4697db96d56Sopenharmony_ci        smtp.quit()
4707db96d56Sopenharmony_ci
4717db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
4727db96d56Sopenharmony_ci        self.assertNotIn("X-OPTION", debugout)
4737db96d56Sopenharmony_ci        self.assertNotIn("X-OPTION2", debugout)
4747db96d56Sopenharmony_ci        self.assertNotIn("X-INJECTED-1", debugout)
4757db96d56Sopenharmony_ci        self.assertNotIn("X-INJECTED-2", debugout)
4767db96d56Sopenharmony_ci
4777db96d56Sopenharmony_ci    def testSendNullSender(self):
4787db96d56Sopenharmony_ci        m = 'A test message'
4797db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
4807db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
4817db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
4827db96d56Sopenharmony_ci        smtp.sendmail('<>', 'Sally', m)
4837db96d56Sopenharmony_ci        # XXX (see comment in testSend)
4847db96d56Sopenharmony_ci        time.sleep(0.01)
4857db96d56Sopenharmony_ci        smtp.quit()
4867db96d56Sopenharmony_ci
4877db96d56Sopenharmony_ci        self.client_evt.set()
4887db96d56Sopenharmony_ci        self.serv_evt.wait()
4897db96d56Sopenharmony_ci        self.output.flush()
4907db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
4917db96d56Sopenharmony_ci        self.assertEqual(self.output.getvalue(), mexpect)
4927db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
4937db96d56Sopenharmony_ci        sender = re.compile("^sender: <>$", re.MULTILINE)
4947db96d56Sopenharmony_ci        self.assertRegex(debugout, sender)
4957db96d56Sopenharmony_ci
4967db96d56Sopenharmony_ci    def testSendMessage(self):
4977db96d56Sopenharmony_ci        m = email.mime.text.MIMEText('A test message')
4987db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
4997db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
5007db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
5017db96d56Sopenharmony_ci        smtp.send_message(m, from_addr='John', to_addrs='Sally')
5027db96d56Sopenharmony_ci        # XXX (see comment in testSend)
5037db96d56Sopenharmony_ci        time.sleep(0.01)
5047db96d56Sopenharmony_ci        smtp.quit()
5057db96d56Sopenharmony_ci
5067db96d56Sopenharmony_ci        self.client_evt.set()
5077db96d56Sopenharmony_ci        self.serv_evt.wait()
5087db96d56Sopenharmony_ci        self.output.flush()
5097db96d56Sopenharmony_ci        # Remove the X-Peer header that DebuggingServer adds as figuring out
5107db96d56Sopenharmony_ci        # exactly what IP address format is put there is not easy (and
5117db96d56Sopenharmony_ci        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
5127db96d56Sopenharmony_ci        # not always the same as socket.gethostbyname(HOST). :(
5137db96d56Sopenharmony_ci        test_output = self.get_output_without_xpeer()
5147db96d56Sopenharmony_ci        del m['X-Peer']
5157db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
5167db96d56Sopenharmony_ci        self.assertEqual(test_output, mexpect)
5177db96d56Sopenharmony_ci
5187db96d56Sopenharmony_ci    def testSendMessageWithAddresses(self):
5197db96d56Sopenharmony_ci        m = email.mime.text.MIMEText('A test message')
5207db96d56Sopenharmony_ci        m['From'] = 'foo@bar.com'
5217db96d56Sopenharmony_ci        m['To'] = 'John'
5227db96d56Sopenharmony_ci        m['CC'] = 'Sally, Fred'
5237db96d56Sopenharmony_ci        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
5247db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
5257db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
5267db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
5277db96d56Sopenharmony_ci        smtp.send_message(m)
5287db96d56Sopenharmony_ci        # XXX (see comment in testSend)
5297db96d56Sopenharmony_ci        time.sleep(0.01)
5307db96d56Sopenharmony_ci        smtp.quit()
5317db96d56Sopenharmony_ci        # make sure the Bcc header is still in the message.
5327db96d56Sopenharmony_ci        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
5337db96d56Sopenharmony_ci                                    '<warped@silly.walks.com>')
5347db96d56Sopenharmony_ci
5357db96d56Sopenharmony_ci        self.client_evt.set()
5367db96d56Sopenharmony_ci        self.serv_evt.wait()
5377db96d56Sopenharmony_ci        self.output.flush()
5387db96d56Sopenharmony_ci        # Remove the X-Peer header that DebuggingServer adds.
5397db96d56Sopenharmony_ci        test_output = self.get_output_without_xpeer()
5407db96d56Sopenharmony_ci        del m['X-Peer']
5417db96d56Sopenharmony_ci        # The Bcc header should not be transmitted.
5427db96d56Sopenharmony_ci        del m['Bcc']
5437db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
5447db96d56Sopenharmony_ci        self.assertEqual(test_output, mexpect)
5457db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
5467db96d56Sopenharmony_ci        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
5477db96d56Sopenharmony_ci        self.assertRegex(debugout, sender)
5487db96d56Sopenharmony_ci        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
5497db96d56Sopenharmony_ci                     'warped@silly.walks.com'):
5507db96d56Sopenharmony_ci            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
5517db96d56Sopenharmony_ci                                 re.MULTILINE)
5527db96d56Sopenharmony_ci            self.assertRegex(debugout, to_addr)
5537db96d56Sopenharmony_ci
5547db96d56Sopenharmony_ci    def testSendMessageWithSomeAddresses(self):
5557db96d56Sopenharmony_ci        # Make sure nothing breaks if not all of the three 'to' headers exist
5567db96d56Sopenharmony_ci        m = email.mime.text.MIMEText('A test message')
5577db96d56Sopenharmony_ci        m['From'] = 'foo@bar.com'
5587db96d56Sopenharmony_ci        m['To'] = 'John, Dinsdale'
5597db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
5607db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
5617db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
5627db96d56Sopenharmony_ci        smtp.send_message(m)
5637db96d56Sopenharmony_ci        # XXX (see comment in testSend)
5647db96d56Sopenharmony_ci        time.sleep(0.01)
5657db96d56Sopenharmony_ci        smtp.quit()
5667db96d56Sopenharmony_ci
5677db96d56Sopenharmony_ci        self.client_evt.set()
5687db96d56Sopenharmony_ci        self.serv_evt.wait()
5697db96d56Sopenharmony_ci        self.output.flush()
5707db96d56Sopenharmony_ci        # Remove the X-Peer header that DebuggingServer adds.
5717db96d56Sopenharmony_ci        test_output = self.get_output_without_xpeer()
5727db96d56Sopenharmony_ci        del m['X-Peer']
5737db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
5747db96d56Sopenharmony_ci        self.assertEqual(test_output, mexpect)
5757db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
5767db96d56Sopenharmony_ci        sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
5777db96d56Sopenharmony_ci        self.assertRegex(debugout, sender)
5787db96d56Sopenharmony_ci        for addr in ('John', 'Dinsdale'):
5797db96d56Sopenharmony_ci            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
5807db96d56Sopenharmony_ci                                 re.MULTILINE)
5817db96d56Sopenharmony_ci            self.assertRegex(debugout, to_addr)
5827db96d56Sopenharmony_ci
5837db96d56Sopenharmony_ci    def testSendMessageWithSpecifiedAddresses(self):
5847db96d56Sopenharmony_ci        # Make sure addresses specified in call override those in message.
5857db96d56Sopenharmony_ci        m = email.mime.text.MIMEText('A test message')
5867db96d56Sopenharmony_ci        m['From'] = 'foo@bar.com'
5877db96d56Sopenharmony_ci        m['To'] = 'John, Dinsdale'
5887db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
5897db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
5907db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
5917db96d56Sopenharmony_ci        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
5927db96d56Sopenharmony_ci        # XXX (see comment in testSend)
5937db96d56Sopenharmony_ci        time.sleep(0.01)
5947db96d56Sopenharmony_ci        smtp.quit()
5957db96d56Sopenharmony_ci
5967db96d56Sopenharmony_ci        self.client_evt.set()
5977db96d56Sopenharmony_ci        self.serv_evt.wait()
5987db96d56Sopenharmony_ci        self.output.flush()
5997db96d56Sopenharmony_ci        # Remove the X-Peer header that DebuggingServer adds.
6007db96d56Sopenharmony_ci        test_output = self.get_output_without_xpeer()
6017db96d56Sopenharmony_ci        del m['X-Peer']
6027db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
6037db96d56Sopenharmony_ci        self.assertEqual(test_output, mexpect)
6047db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
6057db96d56Sopenharmony_ci        sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
6067db96d56Sopenharmony_ci        self.assertRegex(debugout, sender)
6077db96d56Sopenharmony_ci        for addr in ('John', 'Dinsdale'):
6087db96d56Sopenharmony_ci            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
6097db96d56Sopenharmony_ci                                 re.MULTILINE)
6107db96d56Sopenharmony_ci            self.assertNotRegex(debugout, to_addr)
6117db96d56Sopenharmony_ci        recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
6127db96d56Sopenharmony_ci        self.assertRegex(debugout, recip)
6137db96d56Sopenharmony_ci
6147db96d56Sopenharmony_ci    def testSendMessageWithMultipleFrom(self):
6157db96d56Sopenharmony_ci        # Sender overrides To
6167db96d56Sopenharmony_ci        m = email.mime.text.MIMEText('A test message')
6177db96d56Sopenharmony_ci        m['From'] = 'Bernard, Bianca'
6187db96d56Sopenharmony_ci        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
6197db96d56Sopenharmony_ci        m['To'] = 'John, Dinsdale'
6207db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
6217db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
6227db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
6237db96d56Sopenharmony_ci        smtp.send_message(m)
6247db96d56Sopenharmony_ci        # XXX (see comment in testSend)
6257db96d56Sopenharmony_ci        time.sleep(0.01)
6267db96d56Sopenharmony_ci        smtp.quit()
6277db96d56Sopenharmony_ci
6287db96d56Sopenharmony_ci        self.client_evt.set()
6297db96d56Sopenharmony_ci        self.serv_evt.wait()
6307db96d56Sopenharmony_ci        self.output.flush()
6317db96d56Sopenharmony_ci        # Remove the X-Peer header that DebuggingServer adds.
6327db96d56Sopenharmony_ci        test_output = self.get_output_without_xpeer()
6337db96d56Sopenharmony_ci        del m['X-Peer']
6347db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
6357db96d56Sopenharmony_ci        self.assertEqual(test_output, mexpect)
6367db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
6377db96d56Sopenharmony_ci        sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re.MULTILINE)
6387db96d56Sopenharmony_ci        self.assertRegex(debugout, sender)
6397db96d56Sopenharmony_ci        for addr in ('John', 'Dinsdale'):
6407db96d56Sopenharmony_ci            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
6417db96d56Sopenharmony_ci                                 re.MULTILINE)
6427db96d56Sopenharmony_ci            self.assertRegex(debugout, to_addr)
6437db96d56Sopenharmony_ci
6447db96d56Sopenharmony_ci    def testSendMessageResent(self):
6457db96d56Sopenharmony_ci        m = email.mime.text.MIMEText('A test message')
6467db96d56Sopenharmony_ci        m['From'] = 'foo@bar.com'
6477db96d56Sopenharmony_ci        m['To'] = 'John'
6487db96d56Sopenharmony_ci        m['CC'] = 'Sally, Fred'
6497db96d56Sopenharmony_ci        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
6507db96d56Sopenharmony_ci        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
6517db96d56Sopenharmony_ci        m['Resent-From'] = 'holy@grail.net'
6527db96d56Sopenharmony_ci        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
6537db96d56Sopenharmony_ci        m['Resent-Bcc'] = 'doe@losthope.net'
6547db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
6557db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
6567db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
6577db96d56Sopenharmony_ci        smtp.send_message(m)
6587db96d56Sopenharmony_ci        # XXX (see comment in testSend)
6597db96d56Sopenharmony_ci        time.sleep(0.01)
6607db96d56Sopenharmony_ci        smtp.quit()
6617db96d56Sopenharmony_ci
6627db96d56Sopenharmony_ci        self.client_evt.set()
6637db96d56Sopenharmony_ci        self.serv_evt.wait()
6647db96d56Sopenharmony_ci        self.output.flush()
6657db96d56Sopenharmony_ci        # The Resent-Bcc headers are deleted before serialization.
6667db96d56Sopenharmony_ci        del m['Bcc']
6677db96d56Sopenharmony_ci        del m['Resent-Bcc']
6687db96d56Sopenharmony_ci        # Remove the X-Peer header that DebuggingServer adds.
6697db96d56Sopenharmony_ci        test_output = self.get_output_without_xpeer()
6707db96d56Sopenharmony_ci        del m['X-Peer']
6717db96d56Sopenharmony_ci        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
6727db96d56Sopenharmony_ci        self.assertEqual(test_output, mexpect)
6737db96d56Sopenharmony_ci        debugout = smtpd.DEBUGSTREAM.getvalue()
6747db96d56Sopenharmony_ci        sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
6757db96d56Sopenharmony_ci        self.assertRegex(debugout, sender)
6767db96d56Sopenharmony_ci        for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
6777db96d56Sopenharmony_ci            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
6787db96d56Sopenharmony_ci                                 re.MULTILINE)
6797db96d56Sopenharmony_ci            self.assertRegex(debugout, to_addr)
6807db96d56Sopenharmony_ci
6817db96d56Sopenharmony_ci    def testSendMessageMultipleResentRaises(self):
6827db96d56Sopenharmony_ci        m = email.mime.text.MIMEText('A test message')
6837db96d56Sopenharmony_ci        m['From'] = 'foo@bar.com'
6847db96d56Sopenharmony_ci        m['To'] = 'John'
6857db96d56Sopenharmony_ci        m['CC'] = 'Sally, Fred'
6867db96d56Sopenharmony_ci        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
6877db96d56Sopenharmony_ci        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
6887db96d56Sopenharmony_ci        m['Resent-From'] = 'holy@grail.net'
6897db96d56Sopenharmony_ci        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
6907db96d56Sopenharmony_ci        m['Resent-Bcc'] = 'doe@losthope.net'
6917db96d56Sopenharmony_ci        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
6927db96d56Sopenharmony_ci        m['Resent-To'] = 'holy@grail.net'
6937db96d56Sopenharmony_ci        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
6947db96d56Sopenharmony_ci        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
6957db96d56Sopenharmony_ci                            timeout=support.LOOPBACK_TIMEOUT)
6967db96d56Sopenharmony_ci        self.addCleanup(smtp.close)
6977db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
6987db96d56Sopenharmony_ci            smtp.send_message(m)
6997db96d56Sopenharmony_ci        smtp.close()
7007db96d56Sopenharmony_ci
class NonConnectingTests(unittest.TestCase):
    """Checks on smtplib.SMTP that do not use a test server."""

    def testNotConnected(self):
        # Operations on an SMTP object that was never connected must
        # raise SMTPServerDisconnected.
        client = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.send('test msg')

    def testNonnumericPort(self):
        # A non-numeric port must raise OSError, whether it is passed as a
        # separate argument or embedded in the host string.
        for ctor_args in (("localhost", "bogus"), ("localhost:bogus",)):
            with self.assertRaises(OSError):
                smtplib.SMTP(*ctor_args)

    def testSockAttributeExists(self):
        # Regression test: the sock attribute must be present (and None)
        # outside of a connect() call; the previous behavior raised
        # AttributeError: 'SMTP' object has no attribute 'sock'.
        with smtplib.SMTP() as client:
            self.assertIsNone(client.sock)
7267db96d56Sopenharmony_ci
7277db96d56Sopenharmony_ci
class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() hands to sendmail().

    The SMTP object is never connected: ehlo, has_extn and sendmail are
    replaced with mocks, and the message has a non-ASCII From address.
    """

    def setUp(self):
        message = EmailMessage()
        message['From'] = 'Páolo <főo@bar.com>'
        self.msg = message

        client = smtplib.SMTP()
        client.ehlo = Mock(return_value=(200, 'OK'))
        client.has_extn = Mock()
        client.sendmail = Mock()
        self.smtp = client

    def testSendMessage(self):
        # Send the same message twice; both sendmail() calls must receive
        # exactly the same options as their fourth positional argument.
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        for _ in range(2):
            self.smtp.send_message(self.msg)
        recorded = self.smtp.sendmail.call_args_list
        for index in (0, 1):
            self.assertEqual(recorded[index][0][3], expected_mail_options)

    def testSendMessageWithMailOptions(self):
        # Caller-supplied options come first in what sendmail() receives,
        # and the caller's own list must be left unmodified.
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        first_call = self.smtp.sendmail.call_args_list[0]
        self.assertEqual(first_call[0][3], expected_mail_options)
7537db96d56Sopenharmony_ci
7547db96d56Sopenharmony_ci
# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """Construct an SMTP client against a mocked socket that replies 199."""

    def setUp(self):
        # Route smtplib through the mock socket module and queue the reply
        # the client will read.
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        self.port = 25
        # Capture stdout for the duration of the test.
        self.output = io.StringIO()
        self.old_stdout, sys.stdout = sys.stdout, self.output

    def tearDown(self):
        # Undo both substitutions made in setUp.
        sys.stdout = self.old_stdout
        smtplib.socket = socket

    def testFailingHELO(self):
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
7737db96d56Sopenharmony_ci
7747db96d56Sopenharmony_ci
class TooLongLineTests(unittest.TestCase):
    """Connect to a server thread whose reply line exceeds smtplib._MAXLINE."""

    # A single '250 OK' line padded with twice _MAXLINE dots.
    respdata = b''.join((b'250 OK', b'.' * smtplib._MAXLINE * 2, b'\n'))

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Capture stdout for the duration of the test.
        self.output = io.StringIO()
        self.old_stdout, sys.stdout = sys.stdout, self.output

        ready = self.evt = threading.Event()
        listener = self.sock = socket.socket(socket.AF_INET,
                                             socket.SOCK_STREAM)
        listener.settimeout(15)
        self.port = socket_helper.bind_port(listener)
        self.thread = threading.Thread(
            target=server,
            args=(ready, self.respdata, listener),
        )
        self.thread.start()
        # Block until the server thread sets the event, then re-arm it so
        # tearDown can wait on it again.
        # NOTE(review): presumably `server` sets it once when listening and
        # once when done -- confirm against its definition.
        ready.wait()
        ready.clear()

    def tearDown(self):
        thread_key = self.thread_key
        self.evt.wait()
        sys.stdout = self.old_stdout
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*thread_key)

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
8057db96d56Sopenharmony_ci
8067db96d56Sopenharmony_ci
# Address -> display name of the users known to the simulated server
# (looked up by SimSMTPChannel.smtp_VRFY and smtp_EXPN).
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@xn--fo-fka.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

# Credentials pair; the AUTH handlers compare the supplied password
# against sim_auth[1].
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Base64 text sent as the 334 challenge by the CRAM-MD5 handler.
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    'dAZWx3b29kLmlubm9zb2Z0LmNvbT4='
)
# Mailing-list name -> member addresses, expanded by smtp_EXPN.
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@xn--fo-fka.com'],
}
8187db96d56Sopenharmony_ci
8197db96d56Sopenharmony_ci# Simulated SMTP channel & server
class ResponseException(Exception):
    """Exception that SimSMTPChannel.found_terminator catches during AUTH.

    The handler reads ``smtp_code`` and ``smtp_error`` from the caught
    instance; this class does not define those attributes itself.
    """
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd.SMTPChannel with canned replies and a minimal AUTH command.

    Tests steer the channel through the class attributes below: a
    ``*_response`` attribute left at None defers to smtpd.SMTPChannel,
    anything else is pushed to the client instead.
    """

    quit_response = None
    mail_response = None
    rcpt_response = None   # indexed by the number of RCPT commands so far
    data_response = None
    rcpt_count = 0
    rset_count = 0
    disconnect = 0         # when true, close after a canned MAIL reply
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        # Pre-render the extra EHLO feature lines once per channel.
        feature_lines = ["250-{0}\r\n".format(name)
                         for name in extra_features]
        self._extrafeatures = ''.join(feature_lines)
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        # Outside the AUTH state the stock implementation handles the line.
        if self.smtp_state != self.AUTH:
            super().found_terminator()
            return
        # In the AUTH state the received line is a continuation response
        # for the mechanism selected by smtp_AUTH.
        line = self._emptystring.join(self.received_lines)
        print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
        self.received_lines = []
        try:
            self.auth_object(line)
        except ResponseException as e:
            self.smtp_state = self.COMMAND
            self.push('%s %s' % (e.smtp_code, e.smtp_error))

    def smtp_AUTH(self, arg):
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not (self.extended_smtp and 'AUTH' in self._extrafeatures):
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        args = arg.split()
        if not 1 <= len(args) <= 2:
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        # Mechanism "FOO-BAR" is served by the method _auth_foo_bar.
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            handler = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      ' authentication mechanism {!r}'.format(auth_object_name))
            return
        self.auth_object = handler
        self.smtp_state = self.AUTH
        initial_response = args[1] if len(args) == 2 else None
        handler(initial_response)

    def _authenticated(self, user, valid):
        # Report the outcome and leave the AUTH state either way.
        if valid:
            self.authenticated_user = user
            reply = '235 Authentication Succeeded'
        else:
            reply = '535 Authentication credentials invalid'
        self.push(reply)
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        raw = base64.decodebytes(string.encode('ascii'))
        return raw.decode('utf-8')

    def _auth_plain(self, arg=None):
        if arg is None:
            # No initial response: send an empty challenge.
            self.push('334 ')
            return
        logpass = self._decode_base64(arg)
        try:
            # The last two NUL-separated fields are user and password.
            *_, user, password = logpass.split('\0')
        except ValueError as e:
            self.push('535 Splitting response {!r} into user and password'
                      ' failed: {}'.format(logpass, e))
            return
        self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
            return
        if not hasattr(self, '_auth_login_user'):
            # First response is the user name; remember it and ask for
            # the password.
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
            return
        password = self._decode_base64(arg)
        self._authenticated(self._auth_login_user, password == sim_auth[1])
        del self._auth_login_user

    def _auth_buggy(self, arg=None):
        # This AUTH mechanism will 'trap' client in a neverending 334
        # base64 encoded 'BuGgYbUgGy'
        self.push('334 QnVHZ1liVWdHeQ==')

    def _auth_cram_md5(self, arg=None):
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
            return
        logpass = self._decode_base64(arg)
        try:
            user, hashed_pass = logpass.split()
        except ValueError as e:
            self.push('535 Splitting response {!r} into user and password '
                      'failed: {}'.format(logpass, e))
            return False
        # Recompute the digest over the decoded challenge with the known
        # password as key and compare it with what the client sent.
        key = sim_auth[1].encode('ascii')
        challenge = self._decode_base64(sim_cram_md5_challenge).encode('ascii')
        valid_hashed_pass = hmac.HMAC(key, challenge, 'md5').hexdigest()
        self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        # Fixed feature list, then the per-server extras, then the final
        # (non-continuation) line.
        resp = ''.join([
            '250-testhost\r\n',
            '250-EXPN\r\n',
            '250-SIZE 20000000\r\n',
            '250-STARTTLS\r\n',
            '250-DELIVERBY\r\n',
            self._extrafeatures,
            '250 HELP',
        ])
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg not in sim_users:
            self.push('550 No such user: %s' % arg)
            return
        self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))

    def smtp_EXPN(self, arg):
        members = sim_lists.get(arg.lower())
        if members is None:
            self.push('550 No access for you!')
            return
        final_index = len(members) - 1
        for position, address in enumerate(members):
            quoted_addr = smtplib.quoteaddr(address)
            # Every line but the last is a '250-' continuation line.
            if position < final_index:
                template = '250-%s %s'
            else:
                template = '250 %s %s'
            self.push(template % (sim_users[address], quoted_addr))

    def smtp_QUIT(self, arg):
        if self.quit_response is not None:
            self.push(self.quit_response)
            self.close_when_done()
            return
        super().smtp_QUIT(arg)

    def smtp_MAIL(self, arg):
        if self.mail_response is None:
            super().smtp_MAIL(arg)
            return
        self.push(self.mail_response)
        if self.disconnect:
            self.close_when_done()

    def smtp_RCPT(self, arg):
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        # Reply with the canned response for this RCPT command's position.
        index = self.rcpt_count
        self.rcpt_count = index + 1
        self.push(self.rcpt_response[index])

    def smtp_RSET(self, arg):
        # Count resets before delegating.
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        if self.data_response is not None:
            self.push(self.data_response)
            return
        super().smtp_DATA(arg)

    def handle_error(self):
        # Re-raise the exception being handled instead of swallowing it.
        raise
10037db96d56Sopenharmony_ci
10047db96d56Sopenharmony_ci
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd.SMTPServer that creates SimSMTPChannel objects.

    It keeps the most recently created channel in ``_SMTPchannel`` and the
    envelope of the most recently processed message in ``_addresses``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        self._extra_features = []
        self._addresses = {}
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # The channel takes the extra EHLO features as its first argument.
        channel = self.channel_class(
            self._extra_features,
            self,
            conn,
            addr,
            decode_data=self._decode_data,
        )
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Only the envelope is recorded; peer and data are ignored.
        self._addresses.update({'from': mailfrom, 'tos': rcpttos})

    def add_feature(self, feature):
        # Applies to channels created after this call.
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception being handled instead of swallowing it.
        raise
10287db96d56Sopenharmony_ci
10297db96d56Sopenharmony_ci
10307db96d56Sopenharmony_ci# Test various SMTP & ESMTP commands/behaviors that require a simulated server
10317db96d56Sopenharmony_ci# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):
    # Each test talks to a fresh SimSMTPServer that setUp() starts in a
    # background thread on an OS-assigned port.  Test methods are documented
    # with comments rather than docstrings so that unittest keeps reporting
    # them by name.

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Replace socket.getfqdn with the mock version for the duration of
        # the test; tearDown() puts the real one back.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run the registered cleanups (client close() calls) now, before the
        # thread bookkeeping check below.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def _connect(self):
        # Open a client connection to the test server and register its
        # close() as a cleanup, so the socket is released even when the
        # calling test fails before reaching its own quit()/close().
        # SMTP.close() is safe to call again after quit()/close().
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # smoke test
        smtp = self._connect()
        smtp.quit()

    def testEHLO(self):
        smtp = self._connect()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()

        # Every simulated user must be reported as "<name> <quoted address>".
        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        # An address that is not in sim_users is rejected with 550.
        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._connect()

        # A known list expands to one "<name> <quoted address>" line per
        # member, joined with newlines.
        for listname, members in sim_lists.items():
            users = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        # A list name that is not in sim_lists is refused with 550.
        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN_initial_response_ok(self):
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_LOGIN_initial_response_notok(self):
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_BUGGY(self):
        self.serv.add_feature("AUTH BUGGY")

        # Client-side auth callback: checks the challenge it is handed and
        # always answers with a single NUL.
        def auth_buggy(challenge=None):
            self.assertEqual(b"BuGgYbUgGy", challenge)
            return "\0"

        # The connection is closed by the cleanup registered in _connect().
        smtp = self._connect()
        smtp.user, smtp.password = sim_auth
        smtp.ehlo("test_auth_buggy")
        expect = r"^Server AUTH mechanism infinite loop.*"
        with self.assertRaisesRegex(smtplib.SMTPException, expect):
            smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = self._connect()
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        supported = {'PLAIN', 'LOGIN'}
        # CRAM-MD5 is only exercised when md5 can actually be instantiated.
        try:
            hashlib.md5()
        except ValueError:
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = self._connect()
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                # e.g. 'CRAM-MD5' -> smtp.auth_cram_md5
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        smtp = self._connect()
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        # quit() must forget the advertised features, and reconnecting must
        # not bring them back until a new EHLO/HELO has been sent.
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        # Leaving the with block must disconnect the client ...
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        # ... and must cope with a connection that was already closed inside
        # the block.
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        # The canned QUIT reply makes leaving the with block raise.
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    # NOTE: the method name spells "_rest"; it is kept unchanged so the test
    # ID stays stable.
    def test__rest_from_mail_cmd(self):
        smtp = self._connect()
        # noop() is a round trip to the server before the test reaches into
        # the server-side channel object.
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = self._connect()
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = self._connect()
        smtp.noop()
        # First RCPT is accepted, the second one gets the 421; the refusal
        # dict checked below names only the second recipient.
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        # Channel that answers 421 once the terminator arrives while the
        # channel is in the DATA state.
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = self._connect()
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('John@foo.org', ['Sally@foo.org'], 'test message')
        self.assertIsNone(smtp.sock)
        # As in the other 421 tests, the client must close without sending
        # RSET.  (This used to check rcpt_count, which only changes when
        # rcpt_response is set and so could never be non-zero here.)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        smtp = self._connect()
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        # Asking for SMTPUTF8 must fail both through sendmail() and through
        # the lower-level mail() call.
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        smtp = self._connect()
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <főo@bar.com>"
        msg['To'] = 'Dinsdale'
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        smtp = self._connect()
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        smtp = self._connect()

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
        message['To'] = email.utils.formataddr(('René', 'rene@example.com'))

        self.assertDictEqual(smtp.send_message(message), {})

        # The server must have seen bare addresses, without display names.
        self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
        self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
13537db96d56Sopenharmony_ci
13547db96d56Sopenharmony_ci
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    Channels are created with ``enable_SMTPUTF8`` passed through, and every
    argument of the last process_message() call is kept in a ``last_*``
    attribute for the tests to inspect.
    """

    def __init__(self, *args, **kw):
        # The stock SMTP server would switch these features on by itself, but
        # the test server rewrites the EHLO response, so they have to be
        # listed explicitly.  SimSMTPServer.__init__ is bypassed on purpose:
        # the initializer invoked here is smtpd.SMTPServer's.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Same as the parent class, plus the SMTPUTF8 switch for the channel.
        options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **options)

    def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
                                                             rcpt_options=None):
        # Record everything about the delivery; nothing is sent anywhere.
        (self.last_peer, self.last_mailfrom,
         self.last_rcpttos, self.last_message) = peer, mailfrom, rcpttos, data
        self.last_mail_options, self.last_rcpt_options = (
            mail_options, rcpt_options)
13797db96d56Sopenharmony_ci
13807db96d56Sopenharmony_ci
class SMTPUTF8SimTests(unittest.TestCase):
    # Same fixture layout as the other simulated-server tests, but the server
    # is a SimSMTPUTF8Server created with SMTPUTF8 enabled and data decoding
    # switched off.

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Swap in the mock getfqdn; tearDown() restores the saved original.
        self.real_getfqdn, socket.getfqdn = socket.getfqdn, mock_socket.getfqdn
        self.serv_evt, self.client_evt = threading.Event(), threading.Event()
        # Passing 0 as the port number picks a random unused port; the port
        # actually assigned is read back from the listening socket.
        self.serv = SimSMTPUTF8Server(
            (HOST, 0), ('nowhere', -1),
            decode_data=False, enable_SMTPUTF8=True)
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # Block until the server thread signals, then re-arm the event so
        # tearDown() can wait on it again.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # Tell the server thread the client side is done, then wait for it
        # to signal back and join it.
        self.client_evt.set()
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run registered cleanups before the thread bookkeeping check.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def test_test_server_supports_extensions(self):
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        client.ehlo()
        self.assertTrue(client.does_esmtp)
        self.assertTrue(client.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        client.sendmail('Jőhn', 'Sálly', payload,
                        mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        # The server must have received envelope, body and options unchanged.
        server = self.serv
        self.assertEqual(server.last_mailfrom, 'Jőhn')
        self.assertEqual(server.last_rcpttos, ['Sálly'])
        self.assertEqual(server.last_message, payload)
        for option in ('BODY=8BITMIME', 'SMTPUTF8'):
            self.assertIn(option, server.last_mail_options)
        self.assertEqual(server.last_rcpt_options, [])

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        payload = '¡a test message containing unicode!'.encode('utf-8')
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        client.ehlo()
        # Walk through MAIL / RCPT / DATA by hand; each step must be accepted.
        mail_reply = client.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(mail_reply, (250, b'OK'))
        rcpt_reply = client.rcpt('János')
        self.assertEqual(rcpt_reply, (250, b'OK'))
        data_reply = client.data(payload)
        self.assertEqual(data_reply, (250, b'OK'))
        server = self.serv
        self.assertEqual(server.last_mailfrom, 'Jő')
        self.assertEqual(server.last_rcpttos, ['János'])
        self.assertEqual(server.last_message, payload)
        for option in ('BODY=8BITMIME', 'SMTPUTF8'):
            self.assertIn(option, server.last_mail_options)
        self.assertEqual(server.last_rcpt_options, [])

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        message = EmailMessage()
        message['From'] = "Páolo <főo@bar.com>"
        message['To'] = 'Dinsdale'
        message['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX It is unclear why two \n's are needed here; this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        message.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received \r\n to \n, so it cannot easily be
        # checked that \r\n is what actually goes over the wire :(.
        wanted = textwrap.dedent("""\
            From: Páolo <főo@bar.com>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        client = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(client.close)
        refused = client.send_message(message)
        self.assertEqual(refused, {})
        server = self.serv
        self.assertEqual(server.last_mailfrom, 'főo@bar.com')
        self.assertEqual(server.last_rcpttos, ['Dinsdale'])
        self.assertEqual(server.last_message.decode(), wanted)
        for option in ('BODY=8BITMIME', 'SMTPUTF8'):
            self.assertIn(option, server.last_mail_options)
        self.assertEqual(server.last_rcpt_options, [])
14907db96d56Sopenharmony_ci
14917db96d56Sopenharmony_ci
# Base64 text of the PLAIN credentials b'\0psu\0doesnotexist', on a single
# line with no trailing newline.
EXPECTED_RESPONSE = base64.b64encode(b'\0psu\0doesnotexist').decode('ascii')
14937db96d56Sopenharmony_ci
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    def smtp_AUTH(self, arg):
        # RFC 4954 lets the AUTH command carry an optional initial-response.
        # Not every mechanism supports that (some require a challenge first),
        # but PLAIN does, so that is the form handled here.  See issue #15014.
        tokens = arg.split()
        # Accept only "AUTH PLAIN <initial-response>" where the response is
        # the hard-coded, base64-encoded value the tests are expected to send.
        accepted = (
            tokens[0].lower() == 'plain'
            and len(tokens) == 2
            and tokens[1] == EXPECTED_RESPONSE
        )
        if accepted:
            self.push('235 Ok')
        else:
            self.push('571 Bad authentication')
15087db96d56Sopenharmony_ci
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    # Simulated server variant whose channel class is the one that handles
    # AUTH PLAIN with an initial response.
    channel_class = SimSMTPAUTHInitialResponseChannel
15117db96d56Sopenharmony_ci
15127db96d56Sopenharmony_ci
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    # Client-side tests for AUTH PLAIN sent with an initial response, run
    # against a SimSMTPAUTHInitialResponseServer driven by a helper thread.

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Replace socket.getfqdn with the mock version; tearDown restores it.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # Wait for the server thread to signal readiness through serv_evt,
        # then reset the event so tearDown can wait on it again.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run the registered cleanups (client sockets) now, so they are done
        # before threading_cleanup() is called.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def _connect(self):
        # Advertise AUTH PLAIN on the server and return a connected client.
        # The client is closed via addCleanup, matching the other tests in
        # this file, so the socket is released even if the test body raises.
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        return smtp

    def testAUTH_PLAIN_initial_response_login(self):
        # login() must succeed against the initial-response-only server; it
        # raises on failure, so no explicit assertion is needed.
        smtp = self._connect()
        smtp.login('psu', 'doesnotexist')

    def testAUTH_PLAIN_initial_response_auth(self):
        # Drive auth() directly with the PLAIN mechanism and check the reply
        # code; the response text is not inspected.
        smtp = self._connect()
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, _ = smtp.auth('plain', smtp.auth_plain)
        self.assertEqual(code, 235)
15617db96d56Sopenharmony_ci
# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
1564