17db96d56Sopenharmony_ciimport unittest
27db96d56Sopenharmony_ciimport textwrap
37db96d56Sopenharmony_cifrom test import support, mock_socket
47db96d56Sopenharmony_cifrom test.support import socket_helper
57db96d56Sopenharmony_cifrom test.support import warnings_helper
67db96d56Sopenharmony_ciimport socket
77db96d56Sopenharmony_ciimport io
87db96d56Sopenharmony_ci
97db96d56Sopenharmony_ci
# Both modules are loaded through warnings_helper.import_deprecated()
# (presumably to keep their deprecation warnings out of the test run --
# confirm against test.support.warnings_helper).
smtpd, asyncore = (warnings_helper.import_deprecated(_name)
                   for _name in ('smtpd', 'asyncore'))

# The whole module is skipped on platforms lacking gethostname().
if not socket_helper.has_gethostname:
    raise unittest.SkipTest("test requires gethostname()")
157db96d56Sopenharmony_ci
167db96d56Sopenharmony_ci
class DummyServer(smtpd.SMTPServer):
    # SMTPServer subclass used by the tests: it records every message given
    # to process_message() and answers with canned status strings.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        # Sentinel payload: str when _decode_data is set, bytes otherwise, so
        # it can be compared against the data passed to process_message().
        self.return_status = (
            'return status' if self._decode_data else b'return status')

    def process_message(self, peer, mailfrom, rcpttos, data, **kw):
        # Record the delivery, then pick the reply (None when neither the
        # sentinel payload nor the SMTPUTF8 mail option is present).
        self.messages.append((peer, mailfrom, rcpttos, data))
        if data == self.return_status:
            return '250 Okish'
        if 'SMTPUTF8' in kw.get('mail_options', ()):
            return '250 SMTPUTF8 message okish'
        return None
327db96d56Sopenharmony_ci
337db96d56Sopenharmony_ci
class DummyDispatcherBroken(Exception):
    """Raised by BrokenDummyServer.listen() to simulate a failing server."""
367db96d56Sopenharmony_ci
377db96d56Sopenharmony_ci
class BrokenDummyServer(DummyServer):
    # DummyServer variant whose listen() always raises.

    def listen(self, num):
        # *num* is ignored; the call fails unconditionally.
        raise DummyDispatcherBroken
417db96d56Sopenharmony_ci
427db96d56Sopenharmony_ci
class SMTPDServerTest(unittest.TestCase):
    # Tests of the base SMTPServer class, run against the mock socket module.

    def setUp(self):
        # Point both modules at the mock socket module.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

    def tearDown(self):
        # Close all asyncore channels and put the real socket module back.
        asyncore.close_all()
        asyncore.socket = socket
        smtpd.socket = socket

    def test_process_message_unimplemented(self):
        # The base class must raise NotImplementedError once a complete
        # message has been received.
        server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)

        envelope = (
            b'HELO example',
            b'MAIL From:eggs@example',
            b'RCPT To:spam@example',
            b'DATA',
        )
        for command in envelope:
            channel.socket.queue_recv(command)
            channel.handle_read()

        with self.assertRaises(NotImplementedError):
            channel.socket.queue_recv(b'spam\r\n.\r\n')
            channel.handle_read()

    def test_decode_data_and_enable_SMTPUTF8_raises(self):
        # Passing both options to the server constructor is a ValueError.
        local_addr = (socket_helper.HOST, 0)
        remote_addr = ('b', 0)
        with self.assertRaises(ValueError):
            smtpd.SMTPServer(local_addr, remote_addr,
                             enable_SMTPUTF8=True, decode_data=True)
757db96d56Sopenharmony_ci
767db96d56Sopenharmony_ci
class DebuggingServerTest(unittest.TestCase):
    # Checks what DebuggingServer prints to stdout for a delivered message
    # under the different decode_data / enable_SMTPUTF8 settings.

    def setUp(self):
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = socket
        smtpd.socket = socket

    def send_data(self, channel, data, enable_SMTPUTF8=False):
        # Feed a full mail transaction to *channel*, with *data* as the
        # message body.  When enable_SMTPUTF8 is true the MAIL command
        # carries the BODY=8BITMIME and SMTPUTF8 parameters.
        if enable_SMTPUTF8:
            mail_command = b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8'
        else:
            mail_command = b'MAIL From:eggs@example'
        session = (
            b'EHLO example',
            mail_command,
            b'RCPT To:spam@example',
            b'DATA',
            data,
            b'.',
        )
        for line in session:
            channel.socket.queue_recv(line)
            channel.handle_read()

    def _captured_session(self, data, smtputf8_mail=False, **options):
        # Build a DebuggingServer and a channel with the same *options*, run
        # one transaction and return whatever was written to stdout.
        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                       **options)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, **options)
        with support.captured_stdout() as out:
            self.send_data(channel, data, enable_SMTPUTF8=smtputf8_mail)
        return out.getvalue()

    def test_process_message_with_decode_data_true(self):
        stdout = self._captured_session(b'From: test\n\nhello\n',
                                        decode_data=True)
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             From: test
             X-Peer: peer-address

             hello
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(stdout, expected)

    def test_process_message_with_decode_data_false(self):
        stdout = self._captured_session(b'From: test\n\nh\xc3\xa9llo\xff\n')
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(stdout, expected)

    def test_process_message_with_enable_SMTPUTF8_true(self):
        stdout = self._captured_session(b'From: test\n\nh\xc3\xa9llo\xff\n',
                                        enable_SMTPUTF8=True)
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(stdout, expected)

    def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
        stdout = self._captured_session(b'From: test\n\nh\xc3\xa9llo\xff\n',
                                        smtputf8_mail=True,
                                        enable_SMTPUTF8=True)
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(stdout, expected)
1687db96d56Sopenharmony_ci
1697db96d56Sopenharmony_ci
class TestFamilyDetection(unittest.TestCase):
    # Checks that the server socket's address family follows the first
    # address passed to SMTPServer, not the second.

    def setUp(self):
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = socket
        smtpd.socket = socket

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
    def test_socket_uses_IPv6(self):
        first_addr = (socket_helper.HOSTv6, 0)
        second_addr = (socket_helper.HOSTv4, 0)
        server = smtpd.SMTPServer(first_addr, second_addr)
        self.assertEqual(server.socket.family, socket.AF_INET6)

    def test_socket_uses_IPv4(self):
        first_addr = (socket_helper.HOSTv4, 0)
        second_addr = (socket_helper.HOSTv6, 0)
        server = smtpd.SMTPServer(first_addr, second_addr)
        self.assertEqual(server.socket.family, socket.AF_INET)
1867db96d56Sopenharmony_ci
1877db96d56Sopenharmony_ci
class TestRcptOptionParsing(unittest.TestCase):
    # Checks how RCPT parameters are answered after an EHLO greeting.

    error_response = (b'555 RCPT TO parameters not recognized or not '
                      b'implemented\r\n')

    def setUp(self):
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket
        # Swap in a private debug stream; the original is restored in
        # tearDown().
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = io.StringIO()
        smtpd.DEBUGSTREAM = self.debug

    def tearDown(self):
        asyncore.close_all()
        smtpd.DEBUGSTREAM = self.old_debugstream
        asyncore.socket = socket
        smtpd.socket = socket

    def write_line(self, channel, line):
        # Queue *line* on the channel's mock socket and let the channel
        # process it.
        channel.socket.queue_recv(line)
        channel.handle_read()

    def _last_reply_after(self, *lines):
        # Open a fresh DummyServer channel, send *lines* in order and return
        # the last reply the channel sent.
        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        for line in lines:
            self.write_line(channel, line)
        return channel.socket.last

    def test_params_rejected(self):
        reply = self._last_reply_after(
            b'EHLO example',
            b'MAIL from: <foo@example.com> size=20',
            b'RCPT to: <foo@example.com> foo=bar',
        )
        self.assertEqual(reply, self.error_response)

    def test_nothing_accepted(self):
        reply = self._last_reply_after(
            b'EHLO example',
            b'MAIL from: <foo@example.com> size=20',
            b'RCPT to: <foo@example.com>',
        )
        self.assertEqual(reply, b'250 OK\r\n')
2237db96d56Sopenharmony_ci
2247db96d56Sopenharmony_ci
class TestMailOptionParsing(unittest.TestCase):
    # Checks which MAIL parameters are accepted under the different
    # decode_data / enable_SMTPUTF8 settings.

    error_response = (b'555 MAIL FROM parameters not recognized or not '
                      b'implemented\r\n')

    def setUp(self):
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket
        # Swap in a private debug stream; the original is restored in
        # tearDown().
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = io.StringIO()
        smtpd.DEBUGSTREAM = self.debug

    def tearDown(self):
        asyncore.close_all()
        smtpd.DEBUGSTREAM = self.old_debugstream
        asyncore.socket = socket
        smtpd.socket = socket

    def write_line(self, channel, line):
        # Queue *line* on the channel's mock socket and let the channel
        # process it.
        channel.socket.queue_recv(line)
        channel.handle_read()

    def _greeted_channel(self, **options):
        # Create a DummyServer and a channel with the same *options*, send
        # EHLO and return the channel.
        server = DummyServer((socket_helper.HOST, 0), ('b', 0), **options)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, **options)
        self.write_line(channel, b'EHLO example')
        return channel

    def _expect_reply(self, channel, line, expected):
        # Send *line* and require *expected* as the channel's last reply.
        self.write_line(channel, line)
        self.assertEqual(channel.socket.last, expected)

    def test_with_decode_data_true(self):
        channel = self._greeted_channel(decode_data=True)
        rejected = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
            b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
            b'MAIL from: <foo@example.com> size=20 body=8bitmime',
        )
        for line in rejected:
            self._expect_reply(channel, line, self.error_response)
        self._expect_reply(channel,
                           b'MAIL from: <foo@example.com> size=20',
                           b'250 OK\r\n')

    def test_with_decode_data_false(self):
        channel = self._greeted_channel()
        rejected = (
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
        )
        for line in rejected:
            self._expect_reply(channel, line, self.error_response)
        self._expect_reply(
            channel,
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN',
            b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
        self._expect_reply(
            channel,
            b'MAIL from: <foo@example.com> size=20 body=8bitmime',
            b'250 OK\r\n')

    def test_with_enable_smtputf8_true(self):
        channel = self._greeted_channel(enable_SMTPUTF8=True)
        self._expect_reply(
            channel,
            b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8',
            b'250 OK\r\n')
2897db96d56Sopenharmony_ci
2907db96d56Sopenharmony_ci
2917db96d56Sopenharmony_ciclass SMTPDChannelTest(unittest.TestCase):
2927db96d56Sopenharmony_ci    def setUp(self):
2937db96d56Sopenharmony_ci        smtpd.socket = asyncore.socket = mock_socket
2947db96d56Sopenharmony_ci        self.old_debugstream = smtpd.DEBUGSTREAM
2957db96d56Sopenharmony_ci        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
2967db96d56Sopenharmony_ci        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
2977db96d56Sopenharmony_ci                                  decode_data=True)
2987db96d56Sopenharmony_ci        conn, addr = self.server.accept()
2997db96d56Sopenharmony_ci        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
3007db96d56Sopenharmony_ci                                         decode_data=True)
3017db96d56Sopenharmony_ci
3027db96d56Sopenharmony_ci    def tearDown(self):
3037db96d56Sopenharmony_ci        asyncore.close_all()
3047db96d56Sopenharmony_ci        asyncore.socket = smtpd.socket = socket
3057db96d56Sopenharmony_ci        smtpd.DEBUGSTREAM = self.old_debugstream
3067db96d56Sopenharmony_ci
3077db96d56Sopenharmony_ci    def write_line(self, line):
3087db96d56Sopenharmony_ci        self.channel.socket.queue_recv(line)
3097db96d56Sopenharmony_ci        self.channel.handle_read()
3107db96d56Sopenharmony_ci
3117db96d56Sopenharmony_ci    def test_broken_connect(self):
3127db96d56Sopenharmony_ci        self.assertRaises(
3137db96d56Sopenharmony_ci            DummyDispatcherBroken, BrokenDummyServer,
3147db96d56Sopenharmony_ci            (socket_helper.HOST, 0), ('b', 0), decode_data=True)
3157db96d56Sopenharmony_ci
3167db96d56Sopenharmony_ci    def test_decode_data_and_enable_SMTPUTF8_raises(self):
3177db96d56Sopenharmony_ci        self.assertRaises(
3187db96d56Sopenharmony_ci            ValueError, smtpd.SMTPChannel,
3197db96d56Sopenharmony_ci            self.server, self.channel.conn, self.channel.addr,
3207db96d56Sopenharmony_ci            enable_SMTPUTF8=True, decode_data=True)
3217db96d56Sopenharmony_ci
3227db96d56Sopenharmony_ci    def test_server_accept(self):
3237db96d56Sopenharmony_ci        self.server.handle_accept()
3247db96d56Sopenharmony_ci
3257db96d56Sopenharmony_ci    def test_missing_data(self):
3267db96d56Sopenharmony_ci        self.write_line(b'')
3277db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3287db96d56Sopenharmony_ci                         b'500 Error: bad syntax\r\n')
3297db96d56Sopenharmony_ci
3307db96d56Sopenharmony_ci    def test_EHLO(self):
3317db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
3327db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')
3337db96d56Sopenharmony_ci
3347db96d56Sopenharmony_ci    def test_EHLO_bad_syntax(self):
3357db96d56Sopenharmony_ci        self.write_line(b'EHLO')
3367db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3377db96d56Sopenharmony_ci                         b'501 Syntax: EHLO hostname\r\n')
3387db96d56Sopenharmony_ci
3397db96d56Sopenharmony_ci    def test_EHLO_duplicate(self):
3407db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
3417db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
3427db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3437db96d56Sopenharmony_ci                         b'503 Duplicate HELO/EHLO\r\n')
3447db96d56Sopenharmony_ci
3457db96d56Sopenharmony_ci    def test_EHLO_HELO_duplicate(self):
3467db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
3477db96d56Sopenharmony_ci        self.write_line(b'HELO example')
3487db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3497db96d56Sopenharmony_ci                         b'503 Duplicate HELO/EHLO\r\n')
3507db96d56Sopenharmony_ci
3517db96d56Sopenharmony_ci    def test_HELO(self):
3527db96d56Sopenharmony_ci        name = smtpd.socket.getfqdn()
3537db96d56Sopenharmony_ci        self.write_line(b'HELO example')
3547db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3557db96d56Sopenharmony_ci                         '250 {}\r\n'.format(name).encode('ascii'))
3567db96d56Sopenharmony_ci
3577db96d56Sopenharmony_ci    def test_HELO_EHLO_duplicate(self):
3587db96d56Sopenharmony_ci        self.write_line(b'HELO example')
3597db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
3607db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3617db96d56Sopenharmony_ci                         b'503 Duplicate HELO/EHLO\r\n')
3627db96d56Sopenharmony_ci
3637db96d56Sopenharmony_ci    def test_HELP(self):
3647db96d56Sopenharmony_ci        self.write_line(b'HELP')
3657db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3667db96d56Sopenharmony_ci                         b'250 Supported commands: EHLO HELO MAIL RCPT ' + \
3677db96d56Sopenharmony_ci                         b'DATA RSET NOOP QUIT VRFY\r\n')
3687db96d56Sopenharmony_ci
3697db96d56Sopenharmony_ci    def test_HELP_command(self):
3707db96d56Sopenharmony_ci        self.write_line(b'HELP MAIL')
3717db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3727db96d56Sopenharmony_ci                         b'250 Syntax: MAIL FROM: <address>\r\n')
3737db96d56Sopenharmony_ci
3747db96d56Sopenharmony_ci    def test_HELP_command_unknown(self):
3757db96d56Sopenharmony_ci        self.write_line(b'HELP SPAM')
3767db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3777db96d56Sopenharmony_ci                         b'501 Supported commands: EHLO HELO MAIL RCPT ' + \
3787db96d56Sopenharmony_ci                         b'DATA RSET NOOP QUIT VRFY\r\n')
3797db96d56Sopenharmony_ci
3807db96d56Sopenharmony_ci    def test_HELO_bad_syntax(self):
3817db96d56Sopenharmony_ci        self.write_line(b'HELO')
3827db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3837db96d56Sopenharmony_ci                         b'501 Syntax: HELO hostname\r\n')
3847db96d56Sopenharmony_ci
3857db96d56Sopenharmony_ci    def test_HELO_duplicate(self):
3867db96d56Sopenharmony_ci        self.write_line(b'HELO example')
3877db96d56Sopenharmony_ci        self.write_line(b'HELO example')
3887db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3897db96d56Sopenharmony_ci                         b'503 Duplicate HELO/EHLO\r\n')
3907db96d56Sopenharmony_ci
3917db96d56Sopenharmony_ci    def test_HELO_parameter_rejected_when_extensions_not_enabled(self):
3927db96d56Sopenharmony_ci        self.extended_smtp = False
3937db96d56Sopenharmony_ci        self.write_line(b'HELO example')
3947db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<foo@example.com> SIZE=1234')
3957db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
3967db96d56Sopenharmony_ci                         b'501 Syntax: MAIL FROM: <address>\r\n')
3977db96d56Sopenharmony_ci
3987db96d56Sopenharmony_ci    def test_MAIL_allows_space_after_colon(self):
3997db96d56Sopenharmony_ci        self.write_line(b'HELO example')
4007db96d56Sopenharmony_ci        self.write_line(b'MAIL from:   <foo@example.com>')
4017db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4027db96d56Sopenharmony_ci                         b'250 OK\r\n')
4037db96d56Sopenharmony_ci
4047db96d56Sopenharmony_ci    def test_extended_MAIL_allows_space_after_colon(self):
4057db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
4067db96d56Sopenharmony_ci        self.write_line(b'MAIL from:   <foo@example.com> size=20')
4077db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4087db96d56Sopenharmony_ci                         b'250 OK\r\n')
4097db96d56Sopenharmony_ci
4107db96d56Sopenharmony_ci    def test_NOOP(self):
4117db96d56Sopenharmony_ci        self.write_line(b'NOOP')
4127db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
4137db96d56Sopenharmony_ci
4147db96d56Sopenharmony_ci    def test_HELO_NOOP(self):
4157db96d56Sopenharmony_ci        self.write_line(b'HELO example')
4167db96d56Sopenharmony_ci        self.write_line(b'NOOP')
4177db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
4187db96d56Sopenharmony_ci
4197db96d56Sopenharmony_ci    def test_NOOP_bad_syntax(self):
4207db96d56Sopenharmony_ci        self.write_line(b'NOOP hi')
4217db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4227db96d56Sopenharmony_ci                         b'501 Syntax: NOOP\r\n')
4237db96d56Sopenharmony_ci
4247db96d56Sopenharmony_ci    def test_QUIT(self):
4257db96d56Sopenharmony_ci        self.write_line(b'QUIT')
4267db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
4277db96d56Sopenharmony_ci
4287db96d56Sopenharmony_ci    def test_HELO_QUIT(self):
4297db96d56Sopenharmony_ci        self.write_line(b'HELO example')
4307db96d56Sopenharmony_ci        self.write_line(b'QUIT')
4317db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
4327db96d56Sopenharmony_ci
4337db96d56Sopenharmony_ci    def test_QUIT_arg_ignored(self):
4347db96d56Sopenharmony_ci        self.write_line(b'QUIT bye bye')
4357db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')
4367db96d56Sopenharmony_ci
4377db96d56Sopenharmony_ci    def test_bad_state(self):
4387db96d56Sopenharmony_ci        self.channel.smtp_state = 'BAD STATE'
4397db96d56Sopenharmony_ci        self.write_line(b'HELO example')
4407db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4417db96d56Sopenharmony_ci                         b'451 Internal confusion\r\n')
4427db96d56Sopenharmony_ci
4437db96d56Sopenharmony_ci    def test_command_too_long(self):
4447db96d56Sopenharmony_ci        self.write_line(b'HELO example')
4457db96d56Sopenharmony_ci        self.write_line(b'MAIL from: ' +
4467db96d56Sopenharmony_ci                        b'a' * self.channel.command_size_limit +
4477db96d56Sopenharmony_ci                        b'@example')
4487db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4497db96d56Sopenharmony_ci                         b'500 Error: line too long\r\n')
4507db96d56Sopenharmony_ci
4517db96d56Sopenharmony_ci    def test_MAIL_command_limit_extended_with_SIZE(self):
4527db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
4537db96d56Sopenharmony_ci        fill_len = self.channel.command_size_limit - len('MAIL from:<@example>')
4547db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<' +
4557db96d56Sopenharmony_ci                        b'a' * fill_len +
4567db96d56Sopenharmony_ci                        b'@example> SIZE=1234')
4577db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
4587db96d56Sopenharmony_ci
4597db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<' +
4607db96d56Sopenharmony_ci                        b'a' * (fill_len + 26) +
4617db96d56Sopenharmony_ci                        b'@example> SIZE=1234')
4627db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4637db96d56Sopenharmony_ci                         b'500 Error: line too long\r\n')
4647db96d56Sopenharmony_ci
4657db96d56Sopenharmony_ci    def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
4667db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
4677db96d56Sopenharmony_ci        self.write_line(
4687db96d56Sopenharmony_ci            b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
4697db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last[0:1], b'5')
4707db96d56Sopenharmony_ci
4717db96d56Sopenharmony_ci    def test_data_longer_than_default_data_size_limit(self):
4727db96d56Sopenharmony_ci        # Hack the default so we don't have to generate so much data.
4737db96d56Sopenharmony_ci        self.channel.data_size_limit = 1048
4747db96d56Sopenharmony_ci        self.write_line(b'HELO example')
4757db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
4767db96d56Sopenharmony_ci        self.write_line(b'RCPT To:spam@example')
4777db96d56Sopenharmony_ci        self.write_line(b'DATA')
4787db96d56Sopenharmony_ci        self.write_line(b'A' * self.channel.data_size_limit +
4797db96d56Sopenharmony_ci                        b'A\r\n.')
4807db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4817db96d56Sopenharmony_ci                         b'552 Error: Too much mail data\r\n')
4827db96d56Sopenharmony_ci
4837db96d56Sopenharmony_ci    def test_MAIL_size_parameter(self):
4847db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
4857db96d56Sopenharmony_ci        self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')
4867db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4877db96d56Sopenharmony_ci                         b'250 OK\r\n')
4887db96d56Sopenharmony_ci
4897db96d56Sopenharmony_ci    def test_MAIL_invalid_size_parameter(self):
4907db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
4917db96d56Sopenharmony_ci        self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')
4927db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4937db96d56Sopenharmony_ci            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
4947db96d56Sopenharmony_ci
4957db96d56Sopenharmony_ci    def test_MAIL_RCPT_unknown_parameters(self):
4967db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
4977db96d56Sopenharmony_ci        self.write_line(b'MAIL FROM:<eggs@example> ham=green')
4987db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
4997db96d56Sopenharmony_ci            b'555 MAIL FROM parameters not recognized or not implemented\r\n')
5007db96d56Sopenharmony_ci
5017db96d56Sopenharmony_ci        self.write_line(b'MAIL FROM:<eggs@example>')
5027db96d56Sopenharmony_ci        self.write_line(b'RCPT TO:<eggs@example> ham=green')
5037db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5047db96d56Sopenharmony_ci            b'555 RCPT TO parameters not recognized or not implemented\r\n')
5057db96d56Sopenharmony_ci
5067db96d56Sopenharmony_ci    def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):
5077db96d56Sopenharmony_ci        self.channel.data_size_limit = 1048
5087db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
5097db96d56Sopenharmony_ci        self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')
5107db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5117db96d56Sopenharmony_ci            b'552 Error: message size exceeds fixed maximum message size\r\n')
5127db96d56Sopenharmony_ci
5137db96d56Sopenharmony_ci    def test_need_MAIL(self):
5147db96d56Sopenharmony_ci        self.write_line(b'HELO example')
5157db96d56Sopenharmony_ci        self.write_line(b'RCPT to:spam@example')
5167db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5177db96d56Sopenharmony_ci            b'503 Error: need MAIL command\r\n')
5187db96d56Sopenharmony_ci
5197db96d56Sopenharmony_ci    def test_MAIL_syntax_HELO(self):
5207db96d56Sopenharmony_ci        self.write_line(b'HELO example')
5217db96d56Sopenharmony_ci        self.write_line(b'MAIL from eggs@example')
5227db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5237db96d56Sopenharmony_ci            b'501 Syntax: MAIL FROM: <address>\r\n')
5247db96d56Sopenharmony_ci
5257db96d56Sopenharmony_ci    def test_MAIL_syntax_EHLO(self):
5267db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
5277db96d56Sopenharmony_ci        self.write_line(b'MAIL from eggs@example')
5287db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5297db96d56Sopenharmony_ci            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')
5307db96d56Sopenharmony_ci
5317db96d56Sopenharmony_ci    def test_MAIL_missing_address(self):
5327db96d56Sopenharmony_ci        self.write_line(b'HELO example')
5337db96d56Sopenharmony_ci        self.write_line(b'MAIL from:')
5347db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5357db96d56Sopenharmony_ci            b'501 Syntax: MAIL FROM: <address>\r\n')
5367db96d56Sopenharmony_ci
5377db96d56Sopenharmony_ci    def test_MAIL_chevrons(self):
5387db96d56Sopenharmony_ci        self.write_line(b'HELO example')
5397db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<eggs@example>')
5407db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
5417db96d56Sopenharmony_ci
5427db96d56Sopenharmony_ci    def test_MAIL_empty_chevrons(self):
5437db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
5447db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<>')
5457db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
5467db96d56Sopenharmony_ci
5477db96d56Sopenharmony_ci    def test_MAIL_quoted_localpart(self):
5487db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
5497db96d56Sopenharmony_ci        self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')
5507db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
5517db96d56Sopenharmony_ci        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
5527db96d56Sopenharmony_ci
5537db96d56Sopenharmony_ci    def test_MAIL_quoted_localpart_no_angles(self):
5547db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
5557db96d56Sopenharmony_ci        self.write_line(b'MAIL from: "Fred Blogs"@example.com')
5567db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
5577db96d56Sopenharmony_ci        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
5587db96d56Sopenharmony_ci
5597db96d56Sopenharmony_ci    def test_MAIL_quoted_localpart_with_size(self):
5607db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
5617db96d56Sopenharmony_ci        self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')
5627db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
5637db96d56Sopenharmony_ci        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
5647db96d56Sopenharmony_ci
5657db96d56Sopenharmony_ci    def test_MAIL_quoted_localpart_with_size_no_angles(self):
5667db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
5677db96d56Sopenharmony_ci        self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000')
5687db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
5697db96d56Sopenharmony_ci        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')
5707db96d56Sopenharmony_ci
5717db96d56Sopenharmony_ci    def test_nested_MAIL(self):
5727db96d56Sopenharmony_ci        self.write_line(b'HELO example')
5737db96d56Sopenharmony_ci        self.write_line(b'MAIL from:eggs@example')
5747db96d56Sopenharmony_ci        self.write_line(b'MAIL from:spam@example')
5757db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5767db96d56Sopenharmony_ci            b'503 Error: nested MAIL command\r\n')
5777db96d56Sopenharmony_ci
5787db96d56Sopenharmony_ci    def test_VRFY(self):
5797db96d56Sopenharmony_ci        self.write_line(b'VRFY eggs@example')
5807db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5817db96d56Sopenharmony_ci            b'252 Cannot VRFY user, but will accept message and attempt ' + \
5827db96d56Sopenharmony_ci            b'delivery\r\n')
5837db96d56Sopenharmony_ci
5847db96d56Sopenharmony_ci    def test_VRFY_syntax(self):
5857db96d56Sopenharmony_ci        self.write_line(b'VRFY')
5867db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5877db96d56Sopenharmony_ci            b'501 Syntax: VRFY <address>\r\n')
5887db96d56Sopenharmony_ci
5897db96d56Sopenharmony_ci    def test_EXPN_not_implemented(self):
5907db96d56Sopenharmony_ci        self.write_line(b'EXPN')
5917db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5927db96d56Sopenharmony_ci            b'502 EXPN not implemented\r\n')
5937db96d56Sopenharmony_ci
5947db96d56Sopenharmony_ci    def test_no_HELO_MAIL(self):
5957db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<foo@example.com>')
5967db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
5977db96d56Sopenharmony_ci                         b'503 Error: send HELO first\r\n')
5987db96d56Sopenharmony_ci
5997db96d56Sopenharmony_ci    def test_need_RCPT(self):
6007db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6017db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
6027db96d56Sopenharmony_ci        self.write_line(b'DATA')
6037db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
6047db96d56Sopenharmony_ci            b'503 Error: need RCPT command\r\n')
6057db96d56Sopenharmony_ci
6067db96d56Sopenharmony_ci    def test_RCPT_syntax_HELO(self):
6077db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6087db96d56Sopenharmony_ci        self.write_line(b'MAIL From: eggs@example')
6097db96d56Sopenharmony_ci        self.write_line(b'RCPT to eggs@example')
6107db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
6117db96d56Sopenharmony_ci            b'501 Syntax: RCPT TO: <address>\r\n')
6127db96d56Sopenharmony_ci
6137db96d56Sopenharmony_ci    def test_RCPT_syntax_EHLO(self):
6147db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
6157db96d56Sopenharmony_ci        self.write_line(b'MAIL From: eggs@example')
6167db96d56Sopenharmony_ci        self.write_line(b'RCPT to eggs@example')
6177db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
6187db96d56Sopenharmony_ci            b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')
6197db96d56Sopenharmony_ci
6207db96d56Sopenharmony_ci    def test_RCPT_lowercase_to_OK(self):
6217db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6227db96d56Sopenharmony_ci        self.write_line(b'MAIL From: eggs@example')
6237db96d56Sopenharmony_ci        self.write_line(b'RCPT to: <eggs@example>')
6247db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
6257db96d56Sopenharmony_ci
6267db96d56Sopenharmony_ci    def test_no_HELO_RCPT(self):
6277db96d56Sopenharmony_ci        self.write_line(b'RCPT to eggs@example')
6287db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
6297db96d56Sopenharmony_ci                         b'503 Error: send HELO first\r\n')
6307db96d56Sopenharmony_ci
6317db96d56Sopenharmony_ci    def test_data_dialog(self):
6327db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6337db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
6347db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
6357db96d56Sopenharmony_ci        self.write_line(b'RCPT To:spam@example')
6367db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
6377db96d56Sopenharmony_ci
6387db96d56Sopenharmony_ci        self.write_line(b'DATA')
6397db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
6407db96d56Sopenharmony_ci            b'354 End data with <CR><LF>.<CR><LF>\r\n')
6417db96d56Sopenharmony_ci        self.write_line(b'data\r\nmore\r\n.')
6427db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
6437db96d56Sopenharmony_ci        self.assertEqual(self.server.messages,
6447db96d56Sopenharmony_ci            [(('peer-address', 'peer-port'),
6457db96d56Sopenharmony_ci              'eggs@example',
6467db96d56Sopenharmony_ci              ['spam@example'],
6477db96d56Sopenharmony_ci              'data\nmore')])
6487db96d56Sopenharmony_ci
6497db96d56Sopenharmony_ci    def test_DATA_syntax(self):
6507db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6517db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
6527db96d56Sopenharmony_ci        self.write_line(b'RCPT To:spam@example')
6537db96d56Sopenharmony_ci        self.write_line(b'DATA spam')
6547db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')
6557db96d56Sopenharmony_ci
6567db96d56Sopenharmony_ci    def test_no_HELO_DATA(self):
6577db96d56Sopenharmony_ci        self.write_line(b'DATA spam')
6587db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
6597db96d56Sopenharmony_ci                         b'503 Error: send HELO first\r\n')
6607db96d56Sopenharmony_ci
6617db96d56Sopenharmony_ci    def test_data_transparency_section_4_5_2(self):
6627db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6637db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
6647db96d56Sopenharmony_ci        self.write_line(b'RCPT To:spam@example')
6657db96d56Sopenharmony_ci        self.write_line(b'DATA')
6667db96d56Sopenharmony_ci        self.write_line(b'..\r\n.\r\n')
6677db96d56Sopenharmony_ci        self.assertEqual(self.channel.received_data, '.')
6687db96d56Sopenharmony_ci
6697db96d56Sopenharmony_ci    def test_multiple_RCPT(self):
6707db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6717db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
6727db96d56Sopenharmony_ci        self.write_line(b'RCPT To:spam@example')
6737db96d56Sopenharmony_ci        self.write_line(b'RCPT To:ham@example')
6747db96d56Sopenharmony_ci        self.write_line(b'DATA')
6757db96d56Sopenharmony_ci        self.write_line(b'data\r\n.')
6767db96d56Sopenharmony_ci        self.assertEqual(self.server.messages,
6777db96d56Sopenharmony_ci            [(('peer-address', 'peer-port'),
6787db96d56Sopenharmony_ci              'eggs@example',
6797db96d56Sopenharmony_ci              ['spam@example','ham@example'],
6807db96d56Sopenharmony_ci              'data')])
6817db96d56Sopenharmony_ci
6827db96d56Sopenharmony_ci    def test_manual_status(self):
6837db96d56Sopenharmony_ci        # checks that the Channel is able to return a custom status message
6847db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6857db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
6867db96d56Sopenharmony_ci        self.write_line(b'RCPT To:spam@example')
6877db96d56Sopenharmony_ci        self.write_line(b'DATA')
6887db96d56Sopenharmony_ci        self.write_line(b'return status\r\n.')
6897db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')
6907db96d56Sopenharmony_ci
6917db96d56Sopenharmony_ci    def test_RSET(self):
6927db96d56Sopenharmony_ci        self.write_line(b'HELO example')
6937db96d56Sopenharmony_ci        self.write_line(b'MAIL From:eggs@example')
6947db96d56Sopenharmony_ci        self.write_line(b'RCPT To:spam@example')
6957db96d56Sopenharmony_ci        self.write_line(b'RSET')
6967db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
6977db96d56Sopenharmony_ci        self.write_line(b'MAIL From:foo@example')
6987db96d56Sopenharmony_ci        self.write_line(b'RCPT To:eggs@example')
6997db96d56Sopenharmony_ci        self.write_line(b'DATA')
7007db96d56Sopenharmony_ci        self.write_line(b'data\r\n.')
7017db96d56Sopenharmony_ci        self.assertEqual(self.server.messages,
7027db96d56Sopenharmony_ci            [(('peer-address', 'peer-port'),
7037db96d56Sopenharmony_ci               'foo@example',
7047db96d56Sopenharmony_ci               ['eggs@example'],
7057db96d56Sopenharmony_ci               'data')])
7067db96d56Sopenharmony_ci
7077db96d56Sopenharmony_ci    def test_HELO_RSET(self):
7087db96d56Sopenharmony_ci        self.write_line(b'HELO example')
7097db96d56Sopenharmony_ci        self.write_line(b'RSET')
7107db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
7117db96d56Sopenharmony_ci
7127db96d56Sopenharmony_ci    def test_RSET_syntax(self):
7137db96d56Sopenharmony_ci        self.write_line(b'RSET hi')
7147db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')
7157db96d56Sopenharmony_ci
7167db96d56Sopenharmony_ci    def test_unknown_command(self):
7177db96d56Sopenharmony_ci        self.write_line(b'UNKNOWN_CMD')
7187db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
7197db96d56Sopenharmony_ci                         b'500 Error: command "UNKNOWN_CMD" not ' + \
7207db96d56Sopenharmony_ci                         b'recognized\r\n')
7217db96d56Sopenharmony_ci
7227db96d56Sopenharmony_ci    def test_attribute_deprecations(self):
7237db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7247db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__server
7257db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7267db96d56Sopenharmony_ci            self.channel._SMTPChannel__server = 'spam'
7277db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7287db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__line
7297db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7307db96d56Sopenharmony_ci            self.channel._SMTPChannel__line = 'spam'
7317db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7327db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__state
7337db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7347db96d56Sopenharmony_ci            self.channel._SMTPChannel__state = 'spam'
7357db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7367db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__greeting
7377db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7387db96d56Sopenharmony_ci            self.channel._SMTPChannel__greeting = 'spam'
7397db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7407db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__mailfrom
7417db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7427db96d56Sopenharmony_ci            self.channel._SMTPChannel__mailfrom = 'spam'
7437db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7447db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__rcpttos
7457db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7467db96d56Sopenharmony_ci            self.channel._SMTPChannel__rcpttos = 'spam'
7477db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7487db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__data
7497db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7507db96d56Sopenharmony_ci            self.channel._SMTPChannel__data = 'spam'
7517db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7527db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__fqdn
7537db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7547db96d56Sopenharmony_ci            self.channel._SMTPChannel__fqdn = 'spam'
7557db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7567db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__peer
7577db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7587db96d56Sopenharmony_ci            self.channel._SMTPChannel__peer = 'spam'
7597db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7607db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__conn
7617db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7627db96d56Sopenharmony_ci            self.channel._SMTPChannel__conn = 'spam'
7637db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7647db96d56Sopenharmony_ci            spam = self.channel._SMTPChannel__addr
7657db96d56Sopenharmony_ci        with warnings_helper.check_warnings(('', DeprecationWarning)):
7667db96d56Sopenharmony_ci            self.channel._SMTPChannel__addr = 'spam'
7677db96d56Sopenharmony_ci
@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class SMTPDChannelIPv6Test(SMTPDChannelTest):
    """Run the inherited SMTPDChannelTest tests against socket_helper.HOSTv6."""

    def setUp(self):
        # Route smtpd and asyncore through the mock socket module.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket
        # Swap in a fresh debug stream, remembering the previous one.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_stream = io.StringIO()
        self.debug = debug_stream
        smtpd.DEBUGSTREAM = debug_stream
        # The only difference from a HOST-based fixture is the local address.
        local_addr = (socket_helper.HOSTv6, 0)
        self.server = DummyServer(local_addr, ('b', 0), decode_data=True)
        connection, peer = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, connection, peer,
                                         decode_data=True)
7797db96d56Sopenharmony_ci
class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
    """Exercise an SMTPChannel created with a DATA size limit of 32."""

    def setUp(self):
        # Route smtpd and asyncore through the mock socket module.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket
        # Swap in a fresh debug stream, remembering the previous one.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_stream = io.StringIO()
        self.debug = debug_stream
        smtpd.DEBUGSTREAM = debug_stream
        local_addr = (socket_helper.HOST, 0)
        self.server = DummyServer(local_addr, ('b', 0), decode_data=True)
        connection, peer = self.server.accept()
        # Set DATA size limit to 32 bytes for easy testing
        self.channel = smtpd.SMTPChannel(self.server, connection, peer, 32,
                                         decode_data=True)

    def tearDown(self):
        # Undo everything setUp patched at module level.
        asyncore.close_all()
        asyncore.socket = socket
        smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue the bytes on the channel's socket, then have the channel
        # process them.
        sock = self.channel.socket
        sock.queue_recv(line)
        self.channel.handle_read()

    def test_data_limit_dialog(self):
        ok = b'250 OK\r\n'
        self.write_line(b'HELO example')
        for command in (b'MAIL From:eggs@example', b'RCPT To:spam@example'):
            self.write_line(command)
            self.assertEqual(self.channel.socket.last, ok)

        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
            b'354 End data with <CR><LF>.<CR><LF>\r\n')
        # This body is short enough to be accepted.
        self.write_line(b'data\r\nmore\r\n.')
        self.assertEqual(self.channel.socket.last, ok)
        delivered = (('peer-address', 'peer-port'),
                     'eggs@example',
                     ['spam@example'],
                     'data\nmore')
        self.assertEqual(self.server.messages, [delivered])

    def test_data_limit_dialog_too_much_data(self):
        ok = b'250 OK\r\n'
        self.write_line(b'HELO example')
        for command in (b'MAIL From:eggs@example', b'RCPT To:spam@example'):
            self.write_line(command)
            self.assertEqual(self.channel.socket.last, ok)

        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
            b'354 End data with <CR><LF>.<CR><LF>\r\n')
        # This body exceeds the configured limit and must be refused.
        self.write_line(b'This message is longer than 32 bytes\r\n.')
        reply = self.channel.socket.last
        self.assertEqual(reply, b'552 Error: Too much mail data\r\n')
8337db96d56Sopenharmony_ci
8347db96d56Sopenharmony_ci
class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
    """Check received data on a channel created without decode_data.

    Both tests compare ``received_data`` against bytes objects.
    """

    def setUp(self):
        # Route smtpd and asyncore through the mock socket module.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket
        # Swap in a fresh debug stream, remembering the previous one.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_stream = io.StringIO()
        self.debug = debug_stream
        smtpd.DEBUGSTREAM = debug_stream
        # Neither the server nor the channel is given a decode_data argument.
        local_addr = (socket_helper.HOST, 0)
        self.server = DummyServer(local_addr, ('b', 0))
        connection, peer = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, connection, peer)

    def tearDown(self):
        # Undo everything setUp patched at module level.
        asyncore.close_all()
        asyncore.socket = socket
        smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue the bytes on the channel's socket, then have the channel
        # process them.
        sock = self.channel.socket
        sock.queue_recv(line)
        self.channel.handle_read()

    def test_ascii_data(self):
        for command in (b'HELO example',
                        b'MAIL From:eggs@example',
                        b'RCPT To:spam@example',
                        b'DATA',
                        b'plain ascii text',
                        b'.'):
            self.write_line(command)
        body = self.channel.received_data
        self.assertEqual(body, b'plain ascii text')

    def test_utf8_data(self):
        first_line = b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87'
        second_line = b'and some plain ascii'
        for command in (b'HELO example',
                        b'MAIL From:eggs@example',
                        b'RCPT To:spam@example',
                        b'DATA',
                        first_line,
                        second_line,
                        b'.'):
            self.write_line(command)
        # The raw bytes come back unchanged, joined by a single '\n'.
        self.assertEqual(
            self.channel.received_data,
            first_line + b'\n' + second_line)
8757db96d56Sopenharmony_ci
8767db96d56Sopenharmony_ci
class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
    """Check received data on a channel created with decode_data=True.

    Both tests compare ``received_data`` against str objects.
    """

    def setUp(self):
        # Route smtpd and asyncore through the mock socket module.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket
        # Swap in a fresh debug stream, remembering the previous one.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_stream = io.StringIO()
        self.debug = debug_stream
        smtpd.DEBUGSTREAM = debug_stream
        local_addr = (socket_helper.HOST, 0)
        self.server = DummyServer(local_addr, ('b', 0), decode_data=True)
        connection, peer = self.server.accept()
        # Set decode_data to True
        self.channel = smtpd.SMTPChannel(self.server, connection, peer,
                                         decode_data=True)

    def tearDown(self):
        # Undo everything setUp patched at module level.
        asyncore.close_all()
        asyncore.socket = socket
        smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue the bytes on the channel's socket, then have the channel
        # process them.
        sock = self.channel.socket
        sock.queue_recv(line)
        self.channel.handle_read()

    def test_ascii_data(self):
        for command in (b'HELO example',
                        b'MAIL From:eggs@example',
                        b'RCPT To:spam@example',
                        b'DATA',
                        b'plain ascii text',
                        b'.'):
            self.write_line(command)
        body = self.channel.received_data
        self.assertEqual(body, 'plain ascii text')

    def test_utf8_data(self):
        for command in (b'HELO example',
                        b'MAIL From:eggs@example',
                        b'RCPT To:spam@example',
                        b'DATA',
                        b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87',
                        b'and some plain ascii',
                        b'.'):
            self.write_line(command)
        # The UTF-8 bytes are expected back as the corresponding text.
        expected = 'utf8 enriched text: żźć\nand some plain ascii'
        self.assertEqual(self.channel.received_data, expected)
9197db96d56Sopenharmony_ci
9207db96d56Sopenharmony_ci
9217db96d56Sopenharmony_ciclass SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
9227db96d56Sopenharmony_ci    def setUp(self):
9237db96d56Sopenharmony_ci        smtpd.socket = asyncore.socket = mock_socket
9247db96d56Sopenharmony_ci        self.old_debugstream = smtpd.DEBUGSTREAM
9257db96d56Sopenharmony_ci        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
9267db96d56Sopenharmony_ci        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
9277db96d56Sopenharmony_ci                                  enable_SMTPUTF8=True)
9287db96d56Sopenharmony_ci        conn, addr = self.server.accept()
9297db96d56Sopenharmony_ci        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
9307db96d56Sopenharmony_ci                                         enable_SMTPUTF8=True)
9317db96d56Sopenharmony_ci
9327db96d56Sopenharmony_ci    def tearDown(self):
9337db96d56Sopenharmony_ci        asyncore.close_all()
9347db96d56Sopenharmony_ci        asyncore.socket = smtpd.socket = socket
9357db96d56Sopenharmony_ci        smtpd.DEBUGSTREAM = self.old_debugstream
9367db96d56Sopenharmony_ci
9377db96d56Sopenharmony_ci    def write_line(self, line):
9387db96d56Sopenharmony_ci        self.channel.socket.queue_recv(line)
9397db96d56Sopenharmony_ci        self.channel.handle_read()
9407db96d56Sopenharmony_ci
9417db96d56Sopenharmony_ci    def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
9427db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
9437db96d56Sopenharmony_ci        self.write_line(
9447db96d56Sopenharmony_ci            'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
9457db96d56Sopenharmony_ci                'utf-8')
9467db96d56Sopenharmony_ci        )
9477db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
9487db96d56Sopenharmony_ci
9497db96d56Sopenharmony_ci    def test_process_smtputf8_message(self):
9507db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
9517db96d56Sopenharmony_ci        for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
9527db96d56Sopenharmony_ci            self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
9537db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'250')
9547db96d56Sopenharmony_ci            self.write_line(b'rcpt to:<b@example.com>')
9557db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'250')
9567db96d56Sopenharmony_ci            self.write_line(b'data')
9577db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'354')
9587db96d56Sopenharmony_ci            self.write_line(b'c\r\n.')
9597db96d56Sopenharmony_ci            if mail_parameters == b'':
9607db96d56Sopenharmony_ci                self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
9617db96d56Sopenharmony_ci            else:
9627db96d56Sopenharmony_ci                self.assertEqual(self.channel.socket.last,
9637db96d56Sopenharmony_ci                                 b'250 SMTPUTF8 message okish\r\n')
9647db96d56Sopenharmony_ci
9657db96d56Sopenharmony_ci    def test_utf8_data(self):
9667db96d56Sopenharmony_ci        self.write_line(b'EHLO example')
9677db96d56Sopenharmony_ci        self.write_line(
9687db96d56Sopenharmony_ci            'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
9697db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last[0:3], b'250')
9707db96d56Sopenharmony_ci        self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
9717db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last[0:3], b'250')
9727db96d56Sopenharmony_ci        self.write_line(b'DATA')
9737db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last[0:3], b'354')
9747db96d56Sopenharmony_ci        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
9757db96d56Sopenharmony_ci        self.write_line(b'.')
9767db96d56Sopenharmony_ci        self.assertEqual(
9777db96d56Sopenharmony_ci            self.channel.received_data,
9787db96d56Sopenharmony_ci            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
9797db96d56Sopenharmony_ci
9807db96d56Sopenharmony_ci    def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
9817db96d56Sopenharmony_ci        self.write_line(b'ehlo example')
9827db96d56Sopenharmony_ci        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
9837db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<' +
9847db96d56Sopenharmony_ci                        b'a' * (fill_len + 1) +
9857db96d56Sopenharmony_ci                        b'@example>')
9867db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last,
9877db96d56Sopenharmony_ci                         b'500 Error: line too long\r\n')
9887db96d56Sopenharmony_ci        self.write_line(b'MAIL from:<' +
9897db96d56Sopenharmony_ci                        b'a' * fill_len +
9907db96d56Sopenharmony_ci                        b'@example>')
9917db96d56Sopenharmony_ci        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
9927db96d56Sopenharmony_ci
9937db96d56Sopenharmony_ci    def test_multiple_emails_with_extended_command_length(self):
9947db96d56Sopenharmony_ci        self.write_line(b'ehlo example')
9957db96d56Sopenharmony_ci        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
9967db96d56Sopenharmony_ci        for char in [b'a', b'b', b'c']:
9977db96d56Sopenharmony_ci            self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
9987db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'500')
9997db96d56Sopenharmony_ci            self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
10007db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'250')
10017db96d56Sopenharmony_ci            self.write_line(b'rcpt to:<hans@example.com>')
10027db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'250')
10037db96d56Sopenharmony_ci            self.write_line(b'data')
10047db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'354')
10057db96d56Sopenharmony_ci            self.write_line(b'test\r\n.')
10067db96d56Sopenharmony_ci            self.assertEqual(self.channel.socket.last[0:3], b'250')
10077db96d56Sopenharmony_ci
10087db96d56Sopenharmony_ci
class MiscTestCase(unittest.TestCase):
    """Checks on the smtpd module itself rather than on its classes."""

    def test__all__(self):
        # Module-level names handed to check__all__ as not_exported.
        excluded = {
            "program",
            "Devnull",
            "DEBUGSTREAM",
            "NEWLINE",
            "COMMASPACE",
            "DATA_SIZE_DEFAULT",
            "usage",
            "Options",
            "parseargs",
        }
        support.check__all__(self, smtpd, not_exported=excluded)
10167db96d56Sopenharmony_ci
10177db96d56Sopenharmony_ci
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1020