17db96d56Sopenharmony_ciimport unittest 27db96d56Sopenharmony_ciimport textwrap 37db96d56Sopenharmony_cifrom test import support, mock_socket 47db96d56Sopenharmony_cifrom test.support import socket_helper 57db96d56Sopenharmony_cifrom test.support import warnings_helper 67db96d56Sopenharmony_ciimport socket 77db96d56Sopenharmony_ciimport io 87db96d56Sopenharmony_ci 97db96d56Sopenharmony_ci 107db96d56Sopenharmony_cismtpd = warnings_helper.import_deprecated('smtpd') 117db96d56Sopenharmony_ciasyncore = warnings_helper.import_deprecated('asyncore') 127db96d56Sopenharmony_ci 137db96d56Sopenharmony_ciif not socket_helper.has_gethostname: 147db96d56Sopenharmony_ci raise unittest.SkipTest("test requires gethostname()") 157db96d56Sopenharmony_ci 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_ciclass DummyServer(smtpd.SMTPServer): 187db96d56Sopenharmony_ci def __init__(self, *args, **kwargs): 197db96d56Sopenharmony_ci smtpd.SMTPServer.__init__(self, *args, **kwargs) 207db96d56Sopenharmony_ci self.messages = [] 217db96d56Sopenharmony_ci if self._decode_data: 227db96d56Sopenharmony_ci self.return_status = 'return status' 237db96d56Sopenharmony_ci else: 247db96d56Sopenharmony_ci self.return_status = b'return status' 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_ci def process_message(self, peer, mailfrom, rcpttos, data, **kw): 277db96d56Sopenharmony_ci self.messages.append((peer, mailfrom, rcpttos, data)) 287db96d56Sopenharmony_ci if data == self.return_status: 297db96d56Sopenharmony_ci return '250 Okish' 307db96d56Sopenharmony_ci if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']: 317db96d56Sopenharmony_ci return '250 SMTPUTF8 message okish' 327db96d56Sopenharmony_ci 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ciclass DummyDispatcherBroken(Exception): 357db96d56Sopenharmony_ci pass 367db96d56Sopenharmony_ci 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_ciclass BrokenDummyServer(DummyServer): 397db96d56Sopenharmony_ci def listen(self, num): 407db96d56Sopenharmony_ci raise DummyDispatcherBroken() 417db96d56Sopenharmony_ci 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ciclass SMTPDServerTest(unittest.TestCase): 447db96d56Sopenharmony_ci def setUp(self): 457db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 467db96d56Sopenharmony_ci 477db96d56Sopenharmony_ci def test_process_message_unimplemented(self): 487db96d56Sopenharmony_ci server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0), 497db96d56Sopenharmony_ci decode_data=True) 507db96d56Sopenharmony_ci conn, addr = server.accept() 517db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 527db96d56Sopenharmony_ci 537db96d56Sopenharmony_ci def write_line(line): 547db96d56Sopenharmony_ci channel.socket.queue_recv(line) 557db96d56Sopenharmony_ci channel.handle_read() 567db96d56Sopenharmony_ci 577db96d56Sopenharmony_ci write_line(b'HELO example') 587db96d56Sopenharmony_ci write_line(b'MAIL From:eggs@example') 597db96d56Sopenharmony_ci write_line(b'RCPT To:spam@example') 607db96d56Sopenharmony_ci write_line(b'DATA') 617db96d56Sopenharmony_ci self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n') 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci def test_decode_data_and_enable_SMTPUTF8_raises(self): 647db96d56Sopenharmony_ci self.assertRaises( 657db96d56Sopenharmony_ci ValueError, 667db96d56Sopenharmony_ci smtpd.SMTPServer, 677db96d56Sopenharmony_ci (socket_helper.HOST, 0), 687db96d56Sopenharmony_ci ('b', 0), 697db96d56Sopenharmony_ci enable_SMTPUTF8=True, 707db96d56Sopenharmony_ci decode_data=True) 717db96d56Sopenharmony_ci 727db96d56Sopenharmony_ci def tearDown(self): 737db96d56Sopenharmony_ci asyncore.close_all() 747db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 757db96d56Sopenharmony_ci 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ciclass DebuggingServerTest(unittest.TestCase): 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ci def setUp(self): 807db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 817db96d56Sopenharmony_ci 827db96d56Sopenharmony_ci def send_data(self, channel, data, enable_SMTPUTF8=False): 837db96d56Sopenharmony_ci def write_line(line): 847db96d56Sopenharmony_ci channel.socket.queue_recv(line) 857db96d56Sopenharmony_ci channel.handle_read() 867db96d56Sopenharmony_ci write_line(b'EHLO example') 877db96d56Sopenharmony_ci if enable_SMTPUTF8: 887db96d56Sopenharmony_ci write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8') 897db96d56Sopenharmony_ci else: 907db96d56Sopenharmony_ci write_line(b'MAIL From:eggs@example') 917db96d56Sopenharmony_ci write_line(b'RCPT To:spam@example') 927db96d56Sopenharmony_ci write_line(b'DATA') 937db96d56Sopenharmony_ci write_line(data) 947db96d56Sopenharmony_ci write_line(b'.') 957db96d56Sopenharmony_ci 967db96d56Sopenharmony_ci def test_process_message_with_decode_data_true(self): 977db96d56Sopenharmony_ci server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 987db96d56Sopenharmony_ci decode_data=True) 997db96d56Sopenharmony_ci conn, addr = server.accept() 1007db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 1017db96d56Sopenharmony_ci with support.captured_stdout() as s: 1027db96d56Sopenharmony_ci self.send_data(channel, b'From: test\n\nhello\n') 1037db96d56Sopenharmony_ci stdout = s.getvalue() 1047db96d56Sopenharmony_ci self.assertEqual(stdout, textwrap.dedent("""\ 1057db96d56Sopenharmony_ci ---------- MESSAGE FOLLOWS ---------- 1067db96d56Sopenharmony_ci From: test 1077db96d56Sopenharmony_ci X-Peer: peer-address 1087db96d56Sopenharmony_ci 1097db96d56Sopenharmony_ci hello 1107db96d56Sopenharmony_ci ------------ END MESSAGE ------------ 1117db96d56Sopenharmony_ci """)) 1127db96d56Sopenharmony_ci 1137db96d56Sopenharmony_ci def test_process_message_with_decode_data_false(self): 1147db96d56Sopenharmony_ci server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0)) 1157db96d56Sopenharmony_ci conn, addr = server.accept() 1167db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr) 1177db96d56Sopenharmony_ci with support.captured_stdout() as s: 1187db96d56Sopenharmony_ci self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 1197db96d56Sopenharmony_ci stdout = s.getvalue() 1207db96d56Sopenharmony_ci self.assertEqual(stdout, textwrap.dedent("""\ 1217db96d56Sopenharmony_ci ---------- MESSAGE FOLLOWS ---------- 1227db96d56Sopenharmony_ci b'From: test' 1237db96d56Sopenharmony_ci b'X-Peer: peer-address' 1247db96d56Sopenharmony_ci b'' 1257db96d56Sopenharmony_ci b'h\\xc3\\xa9llo\\xff' 1267db96d56Sopenharmony_ci ------------ END MESSAGE ------------ 1277db96d56Sopenharmony_ci """)) 1287db96d56Sopenharmony_ci 1297db96d56Sopenharmony_ci def test_process_message_with_enable_SMTPUTF8_true(self): 1307db96d56Sopenharmony_ci server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 1317db96d56Sopenharmony_ci enable_SMTPUTF8=True) 1327db96d56Sopenharmony_ci conn, addr = server.accept() 1337db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 1347db96d56Sopenharmony_ci with support.captured_stdout() as s: 1357db96d56Sopenharmony_ci self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n') 1367db96d56Sopenharmony_ci stdout = s.getvalue() 1377db96d56Sopenharmony_ci self.assertEqual(stdout, textwrap.dedent("""\ 1387db96d56Sopenharmony_ci ---------- MESSAGE FOLLOWS ---------- 1397db96d56Sopenharmony_ci b'From: test' 1407db96d56Sopenharmony_ci b'X-Peer: peer-address' 1417db96d56Sopenharmony_ci b'' 1427db96d56Sopenharmony_ci b'h\\xc3\\xa9llo\\xff' 1437db96d56Sopenharmony_ci ------------ END MESSAGE ------------ 1447db96d56Sopenharmony_ci """)) 1457db96d56Sopenharmony_ci 1467db96d56Sopenharmony_ci def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self): 1477db96d56Sopenharmony_ci server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0), 1487db96d56Sopenharmony_ci enable_SMTPUTF8=True) 1497db96d56Sopenharmony_ci conn, addr = server.accept() 1507db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 1517db96d56Sopenharmony_ci with support.captured_stdout() as s: 1527db96d56Sopenharmony_ci self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n', 1537db96d56Sopenharmony_ci enable_SMTPUTF8=True) 1547db96d56Sopenharmony_ci stdout = s.getvalue() 1557db96d56Sopenharmony_ci self.assertEqual(stdout, textwrap.dedent("""\ 1567db96d56Sopenharmony_ci ---------- MESSAGE FOLLOWS ---------- 1577db96d56Sopenharmony_ci mail options: ['BODY=8BITMIME', 'SMTPUTF8'] 1587db96d56Sopenharmony_ci b'From: test' 1597db96d56Sopenharmony_ci b'X-Peer: peer-address' 1607db96d56Sopenharmony_ci b'' 1617db96d56Sopenharmony_ci b'h\\xc3\\xa9llo\\xff' 1627db96d56Sopenharmony_ci ------------ END MESSAGE ------------ 1637db96d56Sopenharmony_ci """)) 1647db96d56Sopenharmony_ci 1657db96d56Sopenharmony_ci def tearDown(self): 1667db96d56Sopenharmony_ci asyncore.close_all() 1677db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 1687db96d56Sopenharmony_ci 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_ciclass TestFamilyDetection(unittest.TestCase): 1717db96d56Sopenharmony_ci def setUp(self): 1727db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 1737db96d56Sopenharmony_ci 1747db96d56Sopenharmony_ci def tearDown(self): 1757db96d56Sopenharmony_ci asyncore.close_all() 1767db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 1777db96d56Sopenharmony_ci 1787db96d56Sopenharmony_ci @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") 1797db96d56Sopenharmony_ci def test_socket_uses_IPv6(self): 1807db96d56Sopenharmony_ci server = smtpd.SMTPServer((socket_helper.HOSTv6, 0), (socket_helper.HOSTv4, 0)) 1817db96d56Sopenharmony_ci self.assertEqual(server.socket.family, socket.AF_INET6) 1827db96d56Sopenharmony_ci 1837db96d56Sopenharmony_ci def test_socket_uses_IPv4(self): 1847db96d56Sopenharmony_ci server = smtpd.SMTPServer((socket_helper.HOSTv4, 0), (socket_helper.HOSTv6, 0)) 1857db96d56Sopenharmony_ci self.assertEqual(server.socket.family, socket.AF_INET) 1867db96d56Sopenharmony_ci 1877db96d56Sopenharmony_ci 1887db96d56Sopenharmony_ciclass TestRcptOptionParsing(unittest.TestCase): 1897db96d56Sopenharmony_ci error_response = (b'555 RCPT TO parameters not recognized or not ' 1907db96d56Sopenharmony_ci b'implemented\r\n') 1917db96d56Sopenharmony_ci 1927db96d56Sopenharmony_ci def setUp(self): 1937db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 1947db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 1957db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 1967db96d56Sopenharmony_ci 1977db96d56Sopenharmony_ci def tearDown(self): 1987db96d56Sopenharmony_ci asyncore.close_all() 1997db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 2007db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_debugstream 2017db96d56Sopenharmony_ci 2027db96d56Sopenharmony_ci def write_line(self, channel, line): 2037db96d56Sopenharmony_ci channel.socket.queue_recv(line) 2047db96d56Sopenharmony_ci channel.handle_read() 2057db96d56Sopenharmony_ci 2067db96d56Sopenharmony_ci def test_params_rejected(self): 2077db96d56Sopenharmony_ci server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 2087db96d56Sopenharmony_ci conn, addr = server.accept() 2097db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr) 2107db96d56Sopenharmony_ci self.write_line(channel, b'EHLO example') 2117db96d56Sopenharmony_ci self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 2127db96d56Sopenharmony_ci self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar') 2137db96d56Sopenharmony_ci self.assertEqual(channel.socket.last, self.error_response) 2147db96d56Sopenharmony_ci 2157db96d56Sopenharmony_ci def test_nothing_accepted(self): 2167db96d56Sopenharmony_ci server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 2177db96d56Sopenharmony_ci conn, addr = server.accept() 2187db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr) 2197db96d56Sopenharmony_ci self.write_line(channel, b'EHLO example') 2207db96d56Sopenharmony_ci self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 2217db96d56Sopenharmony_ci self.write_line(channel, b'RCPT to: <foo@example.com>') 2227db96d56Sopenharmony_ci self.assertEqual(channel.socket.last, b'250 OK\r\n') 2237db96d56Sopenharmony_ci 2247db96d56Sopenharmony_ci 2257db96d56Sopenharmony_ciclass TestMailOptionParsing(unittest.TestCase): 2267db96d56Sopenharmony_ci error_response = (b'555 MAIL FROM parameters not recognized or not ' 2277db96d56Sopenharmony_ci b'implemented\r\n') 2287db96d56Sopenharmony_ci 2297db96d56Sopenharmony_ci def setUp(self): 2307db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 2317db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 2327db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 2337db96d56Sopenharmony_ci 2347db96d56Sopenharmony_ci def tearDown(self): 2357db96d56Sopenharmony_ci asyncore.close_all() 2367db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 2377db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_debugstream 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ci def write_line(self, channel, line): 2407db96d56Sopenharmony_ci channel.socket.queue_recv(line) 2417db96d56Sopenharmony_ci channel.handle_read() 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ci def test_with_decode_data_true(self): 2447db96d56Sopenharmony_ci server = DummyServer((socket_helper.HOST, 0), ('b', 0), decode_data=True) 2457db96d56Sopenharmony_ci conn, addr = server.accept() 2467db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True) 2477db96d56Sopenharmony_ci self.write_line(channel, b'EHLO example') 2487db96d56Sopenharmony_ci for line in [ 2497db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 SMTPUTF8', 2507db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', 2517db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN', 2527db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 body=8bitmime', 2537db96d56Sopenharmony_ci ]: 2547db96d56Sopenharmony_ci self.write_line(channel, line) 2557db96d56Sopenharmony_ci self.assertEqual(channel.socket.last, self.error_response) 2567db96d56Sopenharmony_ci self.write_line(channel, b'MAIL from: <foo@example.com> size=20') 2577db96d56Sopenharmony_ci self.assertEqual(channel.socket.last, b'250 OK\r\n') 2587db96d56Sopenharmony_ci 2597db96d56Sopenharmony_ci def test_with_decode_data_false(self): 2607db96d56Sopenharmony_ci server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 2617db96d56Sopenharmony_ci conn, addr = server.accept() 2627db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr) 2637db96d56Sopenharmony_ci self.write_line(channel, b'EHLO example') 2647db96d56Sopenharmony_ci for line in [ 2657db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 SMTPUTF8', 2667db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME', 2677db96d56Sopenharmony_ci ]: 2687db96d56Sopenharmony_ci self.write_line(channel, line) 2697db96d56Sopenharmony_ci self.assertEqual(channel.socket.last, self.error_response) 2707db96d56Sopenharmony_ci self.write_line( 2717db96d56Sopenharmony_ci channel, 2727db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN') 2737db96d56Sopenharmony_ci self.assertEqual( 2747db96d56Sopenharmony_ci channel.socket.last, 2757db96d56Sopenharmony_ci b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n') 2767db96d56Sopenharmony_ci self.write_line( 2777db96d56Sopenharmony_ci channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime') 2787db96d56Sopenharmony_ci self.assertEqual(channel.socket.last, b'250 OK\r\n') 2797db96d56Sopenharmony_ci 2807db96d56Sopenharmony_ci def test_with_enable_smtputf8_true(self): 2817db96d56Sopenharmony_ci server = DummyServer((socket_helper.HOST, 0), ('b', 0), enable_SMTPUTF8=True) 2827db96d56Sopenharmony_ci conn, addr = server.accept() 2837db96d56Sopenharmony_ci channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True) 2847db96d56Sopenharmony_ci self.write_line(channel, b'EHLO example') 2857db96d56Sopenharmony_ci self.write_line( 2867db96d56Sopenharmony_ci channel, 2877db96d56Sopenharmony_ci b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8') 2887db96d56Sopenharmony_ci self.assertEqual(channel.socket.last, b'250 OK\r\n') 2897db96d56Sopenharmony_ci 2907db96d56Sopenharmony_ci 2917db96d56Sopenharmony_ciclass SMTPDChannelTest(unittest.TestCase): 2927db96d56Sopenharmony_ci def setUp(self): 2937db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 2947db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 2957db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 2967db96d56Sopenharmony_ci self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 2977db96d56Sopenharmony_ci decode_data=True) 2987db96d56Sopenharmony_ci conn, addr = self.server.accept() 2997db96d56Sopenharmony_ci self.channel = smtpd.SMTPChannel(self.server, conn, addr, 3007db96d56Sopenharmony_ci decode_data=True) 3017db96d56Sopenharmony_ci 3027db96d56Sopenharmony_ci def tearDown(self): 3037db96d56Sopenharmony_ci asyncore.close_all() 3047db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 3057db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_debugstream 3067db96d56Sopenharmony_ci 3077db96d56Sopenharmony_ci def write_line(self, line): 3087db96d56Sopenharmony_ci self.channel.socket.queue_recv(line) 3097db96d56Sopenharmony_ci self.channel.handle_read() 3107db96d56Sopenharmony_ci 3117db96d56Sopenharmony_ci def test_broken_connect(self): 3127db96d56Sopenharmony_ci self.assertRaises( 3137db96d56Sopenharmony_ci DummyDispatcherBroken, BrokenDummyServer, 3147db96d56Sopenharmony_ci (socket_helper.HOST, 0), ('b', 0), decode_data=True) 3157db96d56Sopenharmony_ci 3167db96d56Sopenharmony_ci def test_decode_data_and_enable_SMTPUTF8_raises(self): 3177db96d56Sopenharmony_ci self.assertRaises( 3187db96d56Sopenharmony_ci ValueError, smtpd.SMTPChannel, 3197db96d56Sopenharmony_ci self.server, self.channel.conn, self.channel.addr, 3207db96d56Sopenharmony_ci enable_SMTPUTF8=True, decode_data=True) 3217db96d56Sopenharmony_ci 3227db96d56Sopenharmony_ci def test_server_accept(self): 3237db96d56Sopenharmony_ci self.server.handle_accept() 3247db96d56Sopenharmony_ci 3257db96d56Sopenharmony_ci def test_missing_data(self): 3267db96d56Sopenharmony_ci self.write_line(b'') 3277db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3287db96d56Sopenharmony_ci b'500 Error: bad syntax\r\n') 3297db96d56Sopenharmony_ci 3307db96d56Sopenharmony_ci def test_EHLO(self): 3317db96d56Sopenharmony_ci self.write_line(b'EHLO example') 3327db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 HELP\r\n') 3337db96d56Sopenharmony_ci 3347db96d56Sopenharmony_ci def test_EHLO_bad_syntax(self): 3357db96d56Sopenharmony_ci self.write_line(b'EHLO') 3367db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3377db96d56Sopenharmony_ci b'501 Syntax: EHLO hostname\r\n') 3387db96d56Sopenharmony_ci 3397db96d56Sopenharmony_ci def test_EHLO_duplicate(self): 3407db96d56Sopenharmony_ci self.write_line(b'EHLO example') 3417db96d56Sopenharmony_ci self.write_line(b'EHLO example') 3427db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3437db96d56Sopenharmony_ci b'503 Duplicate HELO/EHLO\r\n') 3447db96d56Sopenharmony_ci 3457db96d56Sopenharmony_ci def test_EHLO_HELO_duplicate(self): 3467db96d56Sopenharmony_ci self.write_line(b'EHLO example') 3477db96d56Sopenharmony_ci self.write_line(b'HELO example') 3487db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3497db96d56Sopenharmony_ci b'503 Duplicate HELO/EHLO\r\n') 3507db96d56Sopenharmony_ci 3517db96d56Sopenharmony_ci def test_HELO(self): 3527db96d56Sopenharmony_ci name = smtpd.socket.getfqdn() 3537db96d56Sopenharmony_ci self.write_line(b'HELO example') 3547db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3557db96d56Sopenharmony_ci '250 {}\r\n'.format(name).encode('ascii')) 3567db96d56Sopenharmony_ci 3577db96d56Sopenharmony_ci def test_HELO_EHLO_duplicate(self): 3587db96d56Sopenharmony_ci self.write_line(b'HELO example') 3597db96d56Sopenharmony_ci self.write_line(b'EHLO example') 3607db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3617db96d56Sopenharmony_ci b'503 Duplicate HELO/EHLO\r\n') 3627db96d56Sopenharmony_ci 3637db96d56Sopenharmony_ci def test_HELP(self): 3647db96d56Sopenharmony_ci self.write_line(b'HELP') 3657db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3667db96d56Sopenharmony_ci b'250 Supported commands: EHLO HELO MAIL RCPT ' + \ 3677db96d56Sopenharmony_ci b'DATA RSET NOOP QUIT VRFY\r\n') 3687db96d56Sopenharmony_ci 3697db96d56Sopenharmony_ci def test_HELP_command(self): 3707db96d56Sopenharmony_ci self.write_line(b'HELP MAIL') 3717db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3727db96d56Sopenharmony_ci b'250 Syntax: MAIL FROM: <address>\r\n') 3737db96d56Sopenharmony_ci 3747db96d56Sopenharmony_ci def test_HELP_command_unknown(self): 3757db96d56Sopenharmony_ci self.write_line(b'HELP SPAM') 3767db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3777db96d56Sopenharmony_ci b'501 Supported commands: EHLO HELO MAIL RCPT ' + \ 3787db96d56Sopenharmony_ci b'DATA RSET NOOP QUIT VRFY\r\n') 3797db96d56Sopenharmony_ci 3807db96d56Sopenharmony_ci def test_HELO_bad_syntax(self): 3817db96d56Sopenharmony_ci self.write_line(b'HELO') 3827db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3837db96d56Sopenharmony_ci b'501 Syntax: HELO hostname\r\n') 3847db96d56Sopenharmony_ci 3857db96d56Sopenharmony_ci def test_HELO_duplicate(self): 3867db96d56Sopenharmony_ci self.write_line(b'HELO example') 3877db96d56Sopenharmony_ci self.write_line(b'HELO example') 3887db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3897db96d56Sopenharmony_ci b'503 Duplicate HELO/EHLO\r\n') 3907db96d56Sopenharmony_ci 3917db96d56Sopenharmony_ci def test_HELO_parameter_rejected_when_extensions_not_enabled(self): 3927db96d56Sopenharmony_ci self.extended_smtp = False 3937db96d56Sopenharmony_ci self.write_line(b'HELO example') 3947db96d56Sopenharmony_ci self.write_line(b'MAIL from:<foo@example.com> SIZE=1234') 3957db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 3967db96d56Sopenharmony_ci b'501 Syntax: MAIL FROM: <address>\r\n') 3977db96d56Sopenharmony_ci 3987db96d56Sopenharmony_ci def test_MAIL_allows_space_after_colon(self): 3997db96d56Sopenharmony_ci self.write_line(b'HELO example') 4007db96d56Sopenharmony_ci self.write_line(b'MAIL from: <foo@example.com>') 4017db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4027db96d56Sopenharmony_ci b'250 OK\r\n') 4037db96d56Sopenharmony_ci 4047db96d56Sopenharmony_ci def test_extended_MAIL_allows_space_after_colon(self): 4057db96d56Sopenharmony_ci self.write_line(b'EHLO example') 4067db96d56Sopenharmony_ci self.write_line(b'MAIL from: <foo@example.com> size=20') 4077db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4087db96d56Sopenharmony_ci b'250 OK\r\n') 4097db96d56Sopenharmony_ci 4107db96d56Sopenharmony_ci def test_NOOP(self): 4117db96d56Sopenharmony_ci self.write_line(b'NOOP') 4127db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 4137db96d56Sopenharmony_ci 4147db96d56Sopenharmony_ci def test_HELO_NOOP(self): 4157db96d56Sopenharmony_ci self.write_line(b'HELO example') 4167db96d56Sopenharmony_ci self.write_line(b'NOOP') 4177db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 4187db96d56Sopenharmony_ci 4197db96d56Sopenharmony_ci def test_NOOP_bad_syntax(self): 4207db96d56Sopenharmony_ci self.write_line(b'NOOP hi') 4217db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4227db96d56Sopenharmony_ci b'501 Syntax: NOOP\r\n') 4237db96d56Sopenharmony_ci 4247db96d56Sopenharmony_ci def test_QUIT(self): 4257db96d56Sopenharmony_ci self.write_line(b'QUIT') 4267db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 4277db96d56Sopenharmony_ci 4287db96d56Sopenharmony_ci def test_HELO_QUIT(self): 4297db96d56Sopenharmony_ci self.write_line(b'HELO example') 4307db96d56Sopenharmony_ci self.write_line(b'QUIT') 4317db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 4327db96d56Sopenharmony_ci 4337db96d56Sopenharmony_ci def test_QUIT_arg_ignored(self): 4347db96d56Sopenharmony_ci self.write_line(b'QUIT bye bye') 4357db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'221 Bye\r\n') 4367db96d56Sopenharmony_ci 4377db96d56Sopenharmony_ci def test_bad_state(self): 4387db96d56Sopenharmony_ci self.channel.smtp_state = 'BAD STATE' 4397db96d56Sopenharmony_ci self.write_line(b'HELO example') 4407db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4417db96d56Sopenharmony_ci b'451 Internal confusion\r\n') 4427db96d56Sopenharmony_ci 4437db96d56Sopenharmony_ci def test_command_too_long(self): 4447db96d56Sopenharmony_ci self.write_line(b'HELO example') 4457db96d56Sopenharmony_ci self.write_line(b'MAIL from: ' + 4467db96d56Sopenharmony_ci b'a' * self.channel.command_size_limit + 4477db96d56Sopenharmony_ci b'@example') 4487db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4497db96d56Sopenharmony_ci b'500 Error: line too long\r\n') 4507db96d56Sopenharmony_ci 4517db96d56Sopenharmony_ci def test_MAIL_command_limit_extended_with_SIZE(self): 4527db96d56Sopenharmony_ci self.write_line(b'EHLO example') 4537db96d56Sopenharmony_ci fill_len = self.channel.command_size_limit - len('MAIL from:<@example>') 4547db96d56Sopenharmony_ci self.write_line(b'MAIL from:<' + 4557db96d56Sopenharmony_ci b'a' * fill_len + 4567db96d56Sopenharmony_ci b'@example> SIZE=1234') 4577db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 4587db96d56Sopenharmony_ci 4597db96d56Sopenharmony_ci self.write_line(b'MAIL from:<' + 4607db96d56Sopenharmony_ci b'a' * (fill_len + 26) + 4617db96d56Sopenharmony_ci b'@example> SIZE=1234') 4627db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4637db96d56Sopenharmony_ci b'500 Error: line too long\r\n') 4647db96d56Sopenharmony_ci 4657db96d56Sopenharmony_ci def test_MAIL_command_rejects_SMTPUTF8_by_default(self): 4667db96d56Sopenharmony_ci self.write_line(b'EHLO example') 4677db96d56Sopenharmony_ci self.write_line( 4687db96d56Sopenharmony_ci b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8') 4697db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:1], b'5') 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci def test_data_longer_than_default_data_size_limit(self): 4727db96d56Sopenharmony_ci # Hack the default so we don't have to generate so much data. 4737db96d56Sopenharmony_ci self.channel.data_size_limit = 1048 4747db96d56Sopenharmony_ci self.write_line(b'HELO example') 4757db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 4767db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 4777db96d56Sopenharmony_ci self.write_line(b'DATA') 4787db96d56Sopenharmony_ci self.write_line(b'A' * self.channel.data_size_limit + 4797db96d56Sopenharmony_ci b'A\r\n.') 4807db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4817db96d56Sopenharmony_ci b'552 Error: Too much mail data\r\n') 4827db96d56Sopenharmony_ci 4837db96d56Sopenharmony_ci def test_MAIL_size_parameter(self): 4847db96d56Sopenharmony_ci self.write_line(b'EHLO example') 4857db96d56Sopenharmony_ci self.write_line(b'MAIL FROM:<eggs@example> SIZE=512') 4867db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4877db96d56Sopenharmony_ci b'250 OK\r\n') 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci def test_MAIL_invalid_size_parameter(self): 4907db96d56Sopenharmony_ci self.write_line(b'EHLO example') 4917db96d56Sopenharmony_ci self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid') 4927db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4937db96d56Sopenharmony_ci b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 4947db96d56Sopenharmony_ci 4957db96d56Sopenharmony_ci def test_MAIL_RCPT_unknown_parameters(self): 4967db96d56Sopenharmony_ci self.write_line(b'EHLO example') 4977db96d56Sopenharmony_ci self.write_line(b'MAIL FROM:<eggs@example> ham=green') 4987db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 4997db96d56Sopenharmony_ci b'555 MAIL FROM parameters not recognized or not implemented\r\n') 5007db96d56Sopenharmony_ci 5017db96d56Sopenharmony_ci self.write_line(b'MAIL FROM:<eggs@example>') 5027db96d56Sopenharmony_ci self.write_line(b'RCPT TO:<eggs@example> ham=green') 5037db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5047db96d56Sopenharmony_ci b'555 RCPT TO parameters not recognized or not implemented\r\n') 5057db96d56Sopenharmony_ci 5067db96d56Sopenharmony_ci def test_MAIL_size_parameter_larger_than_default_data_size_limit(self): 5077db96d56Sopenharmony_ci self.channel.data_size_limit = 1048 5087db96d56Sopenharmony_ci self.write_line(b'EHLO example') 5097db96d56Sopenharmony_ci self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096') 5107db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5117db96d56Sopenharmony_ci b'552 Error: message size exceeds fixed maximum message size\r\n') 5127db96d56Sopenharmony_ci 5137db96d56Sopenharmony_ci def test_need_MAIL(self): 5147db96d56Sopenharmony_ci self.write_line(b'HELO example') 5157db96d56Sopenharmony_ci self.write_line(b'RCPT to:spam@example') 5167db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5177db96d56Sopenharmony_ci b'503 Error: need MAIL command\r\n') 5187db96d56Sopenharmony_ci 5197db96d56Sopenharmony_ci def test_MAIL_syntax_HELO(self): 5207db96d56Sopenharmony_ci self.write_line(b'HELO example') 5217db96d56Sopenharmony_ci self.write_line(b'MAIL from eggs@example') 5227db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5237db96d56Sopenharmony_ci b'501 Syntax: MAIL FROM: <address>\r\n') 5247db96d56Sopenharmony_ci 5257db96d56Sopenharmony_ci def test_MAIL_syntax_EHLO(self): 5267db96d56Sopenharmony_ci self.write_line(b'EHLO example') 5277db96d56Sopenharmony_ci self.write_line(b'MAIL from eggs@example') 5287db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5297db96d56Sopenharmony_ci b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n') 5307db96d56Sopenharmony_ci 5317db96d56Sopenharmony_ci def test_MAIL_missing_address(self): 5327db96d56Sopenharmony_ci self.write_line(b'HELO example') 5337db96d56Sopenharmony_ci self.write_line(b'MAIL from:') 5347db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5357db96d56Sopenharmony_ci b'501 Syntax: MAIL FROM: <address>\r\n') 5367db96d56Sopenharmony_ci 5377db96d56Sopenharmony_ci def test_MAIL_chevrons(self): 5387db96d56Sopenharmony_ci self.write_line(b'HELO example') 5397db96d56Sopenharmony_ci self.write_line(b'MAIL from:<eggs@example>') 5407db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 5417db96d56Sopenharmony_ci 5427db96d56Sopenharmony_ci def test_MAIL_empty_chevrons(self): 5437db96d56Sopenharmony_ci self.write_line(b'EHLO example') 5447db96d56Sopenharmony_ci self.write_line(b'MAIL from:<>') 5457db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci def test_MAIL_quoted_localpart(self): 5487db96d56Sopenharmony_ci self.write_line(b'EHLO example') 5497db96d56Sopenharmony_ci self.write_line(b'MAIL from: <"Fred Blogs"@example.com>') 5507db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 5517db96d56Sopenharmony_ci self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 5527db96d56Sopenharmony_ci 5537db96d56Sopenharmony_ci def test_MAIL_quoted_localpart_no_angles(self): 5547db96d56Sopenharmony_ci self.write_line(b'EHLO example') 5557db96d56Sopenharmony_ci self.write_line(b'MAIL from: "Fred Blogs"@example.com') 5567db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 5577db96d56Sopenharmony_ci self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 5587db96d56Sopenharmony_ci 5597db96d56Sopenharmony_ci def test_MAIL_quoted_localpart_with_size(self): 5607db96d56Sopenharmony_ci self.write_line(b'EHLO example') 5617db96d56Sopenharmony_ci self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000') 5627db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 5637db96d56Sopenharmony_ci self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 5647db96d56Sopenharmony_ci 5657db96d56Sopenharmony_ci def test_MAIL_quoted_localpart_with_size_no_angles(self): 5667db96d56Sopenharmony_ci self.write_line(b'EHLO example') 5677db96d56Sopenharmony_ci self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000') 5687db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 5697db96d56Sopenharmony_ci self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com') 5707db96d56Sopenharmony_ci 5717db96d56Sopenharmony_ci def test_nested_MAIL(self): 5727db96d56Sopenharmony_ci self.write_line(b'HELO example') 5737db96d56Sopenharmony_ci self.write_line(b'MAIL from:eggs@example') 5747db96d56Sopenharmony_ci self.write_line(b'MAIL from:spam@example') 5757db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5767db96d56Sopenharmony_ci b'503 Error: nested MAIL command\r\n') 5777db96d56Sopenharmony_ci 5787db96d56Sopenharmony_ci def test_VRFY(self): 5797db96d56Sopenharmony_ci self.write_line(b'VRFY eggs@example') 5807db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5817db96d56Sopenharmony_ci b'252 Cannot VRFY user, but will accept message and attempt ' + \ 5827db96d56Sopenharmony_ci b'delivery\r\n') 5837db96d56Sopenharmony_ci 5847db96d56Sopenharmony_ci def test_VRFY_syntax(self): 5857db96d56Sopenharmony_ci self.write_line(b'VRFY') 5867db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5877db96d56Sopenharmony_ci b'501 Syntax: VRFY <address>\r\n') 5887db96d56Sopenharmony_ci 5897db96d56Sopenharmony_ci def test_EXPN_not_implemented(self): 5907db96d56Sopenharmony_ci self.write_line(b'EXPN') 5917db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5927db96d56Sopenharmony_ci b'502 EXPN not implemented\r\n') 5937db96d56Sopenharmony_ci 5947db96d56Sopenharmony_ci def test_no_HELO_MAIL(self): 5957db96d56Sopenharmony_ci self.write_line(b'MAIL from:<foo@example.com>') 5967db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 5977db96d56Sopenharmony_ci b'503 Error: send HELO first\r\n') 5987db96d56Sopenharmony_ci 5997db96d56Sopenharmony_ci def test_need_RCPT(self): 6007db96d56Sopenharmony_ci self.write_line(b'HELO example') 6017db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 6027db96d56Sopenharmony_ci self.write_line(b'DATA') 6037db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 6047db96d56Sopenharmony_ci b'503 Error: need RCPT command\r\n') 6057db96d56Sopenharmony_ci 6067db96d56Sopenharmony_ci def test_RCPT_syntax_HELO(self): 6077db96d56Sopenharmony_ci self.write_line(b'HELO example') 6087db96d56Sopenharmony_ci self.write_line(b'MAIL From: eggs@example') 6097db96d56Sopenharmony_ci self.write_line(b'RCPT to eggs@example') 6107db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 6117db96d56Sopenharmony_ci b'501 Syntax: RCPT TO: <address>\r\n') 6127db96d56Sopenharmony_ci 6137db96d56Sopenharmony_ci def test_RCPT_syntax_EHLO(self): 6147db96d56Sopenharmony_ci self.write_line(b'EHLO example') 6157db96d56Sopenharmony_ci self.write_line(b'MAIL From: eggs@example') 6167db96d56Sopenharmony_ci self.write_line(b'RCPT to eggs@example') 6177db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 6187db96d56Sopenharmony_ci b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n') 6197db96d56Sopenharmony_ci 6207db96d56Sopenharmony_ci def test_RCPT_lowercase_to_OK(self): 6217db96d56Sopenharmony_ci self.write_line(b'HELO example') 6227db96d56Sopenharmony_ci self.write_line(b'MAIL From: eggs@example') 6237db96d56Sopenharmony_ci self.write_line(b'RCPT to: <eggs@example>') 6247db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 6257db96d56Sopenharmony_ci 6267db96d56Sopenharmony_ci def test_no_HELO_RCPT(self): 6277db96d56Sopenharmony_ci self.write_line(b'RCPT to eggs@example') 6287db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 6297db96d56Sopenharmony_ci b'503 Error: send HELO first\r\n') 6307db96d56Sopenharmony_ci 6317db96d56Sopenharmony_ci def test_data_dialog(self): 6327db96d56Sopenharmony_ci self.write_line(b'HELO example') 6337db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 6347db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 6357db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 6367db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 6377db96d56Sopenharmony_ci 6387db96d56Sopenharmony_ci self.write_line(b'DATA') 6397db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 6407db96d56Sopenharmony_ci b'354 End data with <CR><LF>.<CR><LF>\r\n') 6417db96d56Sopenharmony_ci self.write_line(b'data\r\nmore\r\n.') 6427db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 6437db96d56Sopenharmony_ci self.assertEqual(self.server.messages, 6447db96d56Sopenharmony_ci [(('peer-address', 'peer-port'), 6457db96d56Sopenharmony_ci 'eggs@example', 6467db96d56Sopenharmony_ci ['spam@example'], 6477db96d56Sopenharmony_ci 'data\nmore')]) 6487db96d56Sopenharmony_ci 6497db96d56Sopenharmony_ci def test_DATA_syntax(self): 6507db96d56Sopenharmony_ci self.write_line(b'HELO example') 6517db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 6527db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 6537db96d56Sopenharmony_ci self.write_line(b'DATA spam') 6547db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n') 6557db96d56Sopenharmony_ci 6567db96d56Sopenharmony_ci def test_no_HELO_DATA(self): 6577db96d56Sopenharmony_ci self.write_line(b'DATA spam') 6587db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 6597db96d56Sopenharmony_ci b'503 Error: send HELO first\r\n') 6607db96d56Sopenharmony_ci 6617db96d56Sopenharmony_ci def test_data_transparency_section_4_5_2(self): 6627db96d56Sopenharmony_ci self.write_line(b'HELO example') 6637db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 6647db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 6657db96d56Sopenharmony_ci self.write_line(b'DATA') 6667db96d56Sopenharmony_ci self.write_line(b'..\r\n.\r\n') 6677db96d56Sopenharmony_ci self.assertEqual(self.channel.received_data, '.') 6687db96d56Sopenharmony_ci 6697db96d56Sopenharmony_ci def test_multiple_RCPT(self): 6707db96d56Sopenharmony_ci self.write_line(b'HELO example') 6717db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 6727db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 6737db96d56Sopenharmony_ci self.write_line(b'RCPT To:ham@example') 6747db96d56Sopenharmony_ci self.write_line(b'DATA') 6757db96d56Sopenharmony_ci self.write_line(b'data\r\n.') 6767db96d56Sopenharmony_ci self.assertEqual(self.server.messages, 6777db96d56Sopenharmony_ci [(('peer-address', 'peer-port'), 6787db96d56Sopenharmony_ci 'eggs@example', 6797db96d56Sopenharmony_ci ['spam@example','ham@example'], 6807db96d56Sopenharmony_ci 'data')]) 6817db96d56Sopenharmony_ci 6827db96d56Sopenharmony_ci def test_manual_status(self): 6837db96d56Sopenharmony_ci # checks that the Channel is able to return a custom status message 6847db96d56Sopenharmony_ci self.write_line(b'HELO example') 6857db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 6867db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 6877db96d56Sopenharmony_ci self.write_line(b'DATA') 6887db96d56Sopenharmony_ci self.write_line(b'return status\r\n.') 6897db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 Okish\r\n') 6907db96d56Sopenharmony_ci 6917db96d56Sopenharmony_ci def test_RSET(self): 6927db96d56Sopenharmony_ci self.write_line(b'HELO example') 6937db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 6947db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 6957db96d56Sopenharmony_ci self.write_line(b'RSET') 6967db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 6977db96d56Sopenharmony_ci self.write_line(b'MAIL From:foo@example') 6987db96d56Sopenharmony_ci self.write_line(b'RCPT To:eggs@example') 6997db96d56Sopenharmony_ci self.write_line(b'DATA') 7007db96d56Sopenharmony_ci self.write_line(b'data\r\n.') 7017db96d56Sopenharmony_ci self.assertEqual(self.server.messages, 7027db96d56Sopenharmony_ci [(('peer-address', 'peer-port'), 7037db96d56Sopenharmony_ci 'foo@example', 7047db96d56Sopenharmony_ci ['eggs@example'], 7057db96d56Sopenharmony_ci 'data')]) 7067db96d56Sopenharmony_ci 7077db96d56Sopenharmony_ci def test_HELO_RSET(self): 7087db96d56Sopenharmony_ci self.write_line(b'HELO example') 7097db96d56Sopenharmony_ci self.write_line(b'RSET') 7107db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 7117db96d56Sopenharmony_ci 7127db96d56Sopenharmony_ci def test_RSET_syntax(self): 7137db96d56Sopenharmony_ci self.write_line(b'RSET hi') 7147db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n') 7157db96d56Sopenharmony_ci 7167db96d56Sopenharmony_ci def test_unknown_command(self): 7177db96d56Sopenharmony_ci self.write_line(b'UNKNOWN_CMD') 7187db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 7197db96d56Sopenharmony_ci b'500 Error: command "UNKNOWN_CMD" not ' + \ 7207db96d56Sopenharmony_ci b'recognized\r\n') 7217db96d56Sopenharmony_ci 7227db96d56Sopenharmony_ci def test_attribute_deprecations(self): 7237db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7247db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__server 7257db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7267db96d56Sopenharmony_ci self.channel._SMTPChannel__server = 'spam' 7277db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7287db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__line 7297db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7307db96d56Sopenharmony_ci self.channel._SMTPChannel__line = 'spam' 7317db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7327db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__state 7337db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7347db96d56Sopenharmony_ci self.channel._SMTPChannel__state = 'spam' 7357db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7367db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__greeting 7377db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7387db96d56Sopenharmony_ci self.channel._SMTPChannel__greeting = 'spam' 7397db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7407db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__mailfrom 7417db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7427db96d56Sopenharmony_ci self.channel._SMTPChannel__mailfrom = 'spam' 7437db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7447db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__rcpttos 7457db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7467db96d56Sopenharmony_ci self.channel._SMTPChannel__rcpttos = 'spam' 7477db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7487db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__data 7497db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7507db96d56Sopenharmony_ci self.channel._SMTPChannel__data = 'spam' 7517db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7527db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__fqdn 7537db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7547db96d56Sopenharmony_ci self.channel._SMTPChannel__fqdn = 'spam' 7557db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7567db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__peer 7577db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7587db96d56Sopenharmony_ci self.channel._SMTPChannel__peer = 'spam' 7597db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7607db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__conn 7617db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7627db96d56Sopenharmony_ci self.channel._SMTPChannel__conn = 'spam' 7637db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7647db96d56Sopenharmony_ci spam = self.channel._SMTPChannel__addr 7657db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 7667db96d56Sopenharmony_ci self.channel._SMTPChannel__addr = 'spam' 7677db96d56Sopenharmony_ci 7687db96d56Sopenharmony_ci@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled") 7697db96d56Sopenharmony_ciclass SMTPDChannelIPv6Test(SMTPDChannelTest): 7707db96d56Sopenharmony_ci def setUp(self): 7717db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 7727db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 7737db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 7747db96d56Sopenharmony_ci self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0), 7757db96d56Sopenharmony_ci decode_data=True) 7767db96d56Sopenharmony_ci conn, addr = self.server.accept() 7777db96d56Sopenharmony_ci self.channel = smtpd.SMTPChannel(self.server, conn, addr, 7787db96d56Sopenharmony_ci decode_data=True) 7797db96d56Sopenharmony_ci 7807db96d56Sopenharmony_ciclass SMTPDChannelWithDataSizeLimitTest(unittest.TestCase): 7817db96d56Sopenharmony_ci 7827db96d56Sopenharmony_ci def setUp(self): 7837db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 7847db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 7857db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 7867db96d56Sopenharmony_ci self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 7877db96d56Sopenharmony_ci decode_data=True) 7887db96d56Sopenharmony_ci conn, addr = self.server.accept() 7897db96d56Sopenharmony_ci # Set DATA size limit to 32 bytes for easy testing 7907db96d56Sopenharmony_ci self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32, 7917db96d56Sopenharmony_ci decode_data=True) 7927db96d56Sopenharmony_ci 7937db96d56Sopenharmony_ci def tearDown(self): 7947db96d56Sopenharmony_ci asyncore.close_all() 7957db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 7967db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_debugstream 7977db96d56Sopenharmony_ci 7987db96d56Sopenharmony_ci def write_line(self, line): 7997db96d56Sopenharmony_ci self.channel.socket.queue_recv(line) 8007db96d56Sopenharmony_ci self.channel.handle_read() 8017db96d56Sopenharmony_ci 8027db96d56Sopenharmony_ci def test_data_limit_dialog(self): 8037db96d56Sopenharmony_ci self.write_line(b'HELO example') 8047db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 8057db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 8067db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 8077db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 8087db96d56Sopenharmony_ci 8097db96d56Sopenharmony_ci self.write_line(b'DATA') 8107db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 8117db96d56Sopenharmony_ci b'354 End data with <CR><LF>.<CR><LF>\r\n') 8127db96d56Sopenharmony_ci self.write_line(b'data\r\nmore\r\n.') 8137db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 8147db96d56Sopenharmony_ci self.assertEqual(self.server.messages, 8157db96d56Sopenharmony_ci [(('peer-address', 'peer-port'), 8167db96d56Sopenharmony_ci 'eggs@example', 8177db96d56Sopenharmony_ci ['spam@example'], 8187db96d56Sopenharmony_ci 'data\nmore')]) 8197db96d56Sopenharmony_ci 8207db96d56Sopenharmony_ci def test_data_limit_dialog_too_much_data(self): 8217db96d56Sopenharmony_ci self.write_line(b'HELO example') 8227db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 8237db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 8247db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 8257db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 8267db96d56Sopenharmony_ci 8277db96d56Sopenharmony_ci self.write_line(b'DATA') 8287db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 8297db96d56Sopenharmony_ci b'354 End data with <CR><LF>.<CR><LF>\r\n') 8307db96d56Sopenharmony_ci self.write_line(b'This message is longer than 32 bytes\r\n.') 8317db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 8327db96d56Sopenharmony_ci b'552 Error: Too much mail data\r\n') 8337db96d56Sopenharmony_ci 8347db96d56Sopenharmony_ci 8357db96d56Sopenharmony_ciclass SMTPDChannelWithDecodeDataFalse(unittest.TestCase): 8367db96d56Sopenharmony_ci 8377db96d56Sopenharmony_ci def setUp(self): 8387db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 8397db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 8407db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 8417db96d56Sopenharmony_ci self.server = DummyServer((socket_helper.HOST, 0), ('b', 0)) 8427db96d56Sopenharmony_ci conn, addr = self.server.accept() 8437db96d56Sopenharmony_ci self.channel = smtpd.SMTPChannel(self.server, conn, addr) 8447db96d56Sopenharmony_ci 8457db96d56Sopenharmony_ci def tearDown(self): 8467db96d56Sopenharmony_ci asyncore.close_all() 8477db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 8487db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_debugstream 8497db96d56Sopenharmony_ci 8507db96d56Sopenharmony_ci def write_line(self, line): 8517db96d56Sopenharmony_ci self.channel.socket.queue_recv(line) 8527db96d56Sopenharmony_ci self.channel.handle_read() 8537db96d56Sopenharmony_ci 8547db96d56Sopenharmony_ci def test_ascii_data(self): 8557db96d56Sopenharmony_ci self.write_line(b'HELO example') 8567db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 8577db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 8587db96d56Sopenharmony_ci self.write_line(b'DATA') 8597db96d56Sopenharmony_ci self.write_line(b'plain ascii text') 8607db96d56Sopenharmony_ci self.write_line(b'.') 8617db96d56Sopenharmony_ci self.assertEqual(self.channel.received_data, b'plain ascii text') 8627db96d56Sopenharmony_ci 8637db96d56Sopenharmony_ci def test_utf8_data(self): 8647db96d56Sopenharmony_ci self.write_line(b'HELO example') 8657db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 8667db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 8677db96d56Sopenharmony_ci self.write_line(b'DATA') 8687db96d56Sopenharmony_ci self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 8697db96d56Sopenharmony_ci self.write_line(b'and some plain ascii') 8707db96d56Sopenharmony_ci self.write_line(b'.') 8717db96d56Sopenharmony_ci self.assertEqual( 8727db96d56Sopenharmony_ci self.channel.received_data, 8737db96d56Sopenharmony_ci b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n' 8747db96d56Sopenharmony_ci b'and some plain ascii') 8757db96d56Sopenharmony_ci 8767db96d56Sopenharmony_ci 8777db96d56Sopenharmony_ciclass SMTPDChannelWithDecodeDataTrue(unittest.TestCase): 8787db96d56Sopenharmony_ci 8797db96d56Sopenharmony_ci def setUp(self): 8807db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 8817db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 8827db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 8837db96d56Sopenharmony_ci self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 8847db96d56Sopenharmony_ci decode_data=True) 8857db96d56Sopenharmony_ci conn, addr = self.server.accept() 8867db96d56Sopenharmony_ci # Set decode_data to True 8877db96d56Sopenharmony_ci self.channel = smtpd.SMTPChannel(self.server, conn, addr, 8887db96d56Sopenharmony_ci decode_data=True) 8897db96d56Sopenharmony_ci 8907db96d56Sopenharmony_ci def tearDown(self): 8917db96d56Sopenharmony_ci asyncore.close_all() 8927db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 8937db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_debugstream 8947db96d56Sopenharmony_ci 8957db96d56Sopenharmony_ci def write_line(self, line): 8967db96d56Sopenharmony_ci self.channel.socket.queue_recv(line) 8977db96d56Sopenharmony_ci self.channel.handle_read() 8987db96d56Sopenharmony_ci 8997db96d56Sopenharmony_ci def test_ascii_data(self): 9007db96d56Sopenharmony_ci self.write_line(b'HELO example') 9017db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 9027db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 9037db96d56Sopenharmony_ci self.write_line(b'DATA') 9047db96d56Sopenharmony_ci self.write_line(b'plain ascii text') 9057db96d56Sopenharmony_ci self.write_line(b'.') 9067db96d56Sopenharmony_ci self.assertEqual(self.channel.received_data, 'plain ascii text') 9077db96d56Sopenharmony_ci 9087db96d56Sopenharmony_ci def test_utf8_data(self): 9097db96d56Sopenharmony_ci self.write_line(b'HELO example') 9107db96d56Sopenharmony_ci self.write_line(b'MAIL From:eggs@example') 9117db96d56Sopenharmony_ci self.write_line(b'RCPT To:spam@example') 9127db96d56Sopenharmony_ci self.write_line(b'DATA') 9137db96d56Sopenharmony_ci self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 9147db96d56Sopenharmony_ci self.write_line(b'and some plain ascii') 9157db96d56Sopenharmony_ci self.write_line(b'.') 9167db96d56Sopenharmony_ci self.assertEqual( 9177db96d56Sopenharmony_ci self.channel.received_data, 9187db96d56Sopenharmony_ci 'utf8 enriched text: żźć\nand some plain ascii') 9197db96d56Sopenharmony_ci 9207db96d56Sopenharmony_ci 9217db96d56Sopenharmony_ciclass SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase): 9227db96d56Sopenharmony_ci def setUp(self): 9237db96d56Sopenharmony_ci smtpd.socket = asyncore.socket = mock_socket 9247db96d56Sopenharmony_ci self.old_debugstream = smtpd.DEBUGSTREAM 9257db96d56Sopenharmony_ci self.debug = smtpd.DEBUGSTREAM = io.StringIO() 9267db96d56Sopenharmony_ci self.server = DummyServer((socket_helper.HOST, 0), ('b', 0), 9277db96d56Sopenharmony_ci enable_SMTPUTF8=True) 9287db96d56Sopenharmony_ci conn, addr = self.server.accept() 9297db96d56Sopenharmony_ci self.channel = smtpd.SMTPChannel(self.server, conn, addr, 9307db96d56Sopenharmony_ci enable_SMTPUTF8=True) 9317db96d56Sopenharmony_ci 9327db96d56Sopenharmony_ci def tearDown(self): 9337db96d56Sopenharmony_ci asyncore.close_all() 9347db96d56Sopenharmony_ci asyncore.socket = smtpd.socket = socket 9357db96d56Sopenharmony_ci smtpd.DEBUGSTREAM = self.old_debugstream 9367db96d56Sopenharmony_ci 9377db96d56Sopenharmony_ci def write_line(self, line): 9387db96d56Sopenharmony_ci self.channel.socket.queue_recv(line) 9397db96d56Sopenharmony_ci self.channel.handle_read() 9407db96d56Sopenharmony_ci 9417db96d56Sopenharmony_ci def test_MAIL_command_accepts_SMTPUTF8_when_announced(self): 9427db96d56Sopenharmony_ci self.write_line(b'EHLO example') 9437db96d56Sopenharmony_ci self.write_line( 9447db96d56Sopenharmony_ci 'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode( 9457db96d56Sopenharmony_ci 'utf-8') 9467db96d56Sopenharmony_ci ) 9477db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 9487db96d56Sopenharmony_ci 9497db96d56Sopenharmony_ci def test_process_smtputf8_message(self): 9507db96d56Sopenharmony_ci self.write_line(b'EHLO example') 9517db96d56Sopenharmony_ci for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']: 9527db96d56Sopenharmony_ci self.write_line(b'MAIL from: <a@example> ' + mail_parameters) 9537db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'250') 9547db96d56Sopenharmony_ci self.write_line(b'rcpt to:<b@example.com>') 9557db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'250') 9567db96d56Sopenharmony_ci self.write_line(b'data') 9577db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'354') 9587db96d56Sopenharmony_ci self.write_line(b'c\r\n.') 9597db96d56Sopenharmony_ci if mail_parameters == b'': 9607db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 9617db96d56Sopenharmony_ci else: 9627db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 9637db96d56Sopenharmony_ci b'250 SMTPUTF8 message okish\r\n') 9647db96d56Sopenharmony_ci 9657db96d56Sopenharmony_ci def test_utf8_data(self): 9667db96d56Sopenharmony_ci self.write_line(b'EHLO example') 9677db96d56Sopenharmony_ci self.write_line( 9687db96d56Sopenharmony_ci 'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8')) 9697db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'250') 9707db96d56Sopenharmony_ci self.write_line('RCPT To:späm@examplé'.encode('utf-8')) 9717db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'250') 9727db96d56Sopenharmony_ci self.write_line(b'DATA') 9737db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'354') 9747db96d56Sopenharmony_ci self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 9757db96d56Sopenharmony_ci self.write_line(b'.') 9767db96d56Sopenharmony_ci self.assertEqual( 9777db96d56Sopenharmony_ci self.channel.received_data, 9787db96d56Sopenharmony_ci b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87') 9797db96d56Sopenharmony_ci 9807db96d56Sopenharmony_ci def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self): 9817db96d56Sopenharmony_ci self.write_line(b'ehlo example') 9827db96d56Sopenharmony_ci fill_len = (512 + 26 + 10) - len('mail from:<@example>') 9837db96d56Sopenharmony_ci self.write_line(b'MAIL from:<' + 9847db96d56Sopenharmony_ci b'a' * (fill_len + 1) + 9857db96d56Sopenharmony_ci b'@example>') 9867db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, 9877db96d56Sopenharmony_ci b'500 Error: line too long\r\n') 9887db96d56Sopenharmony_ci self.write_line(b'MAIL from:<' + 9897db96d56Sopenharmony_ci b'a' * fill_len + 9907db96d56Sopenharmony_ci b'@example>') 9917db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last, b'250 OK\r\n') 9927db96d56Sopenharmony_ci 9937db96d56Sopenharmony_ci def test_multiple_emails_with_extended_command_length(self): 9947db96d56Sopenharmony_ci self.write_line(b'ehlo example') 9957db96d56Sopenharmony_ci fill_len = (512 + 26 + 10) - len('mail from:<@example>') 9967db96d56Sopenharmony_ci for char in [b'a', b'b', b'c']: 9977db96d56Sopenharmony_ci self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>') 9987db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'500') 9997db96d56Sopenharmony_ci self.write_line(b'MAIL from:<' + char * fill_len + b'@example>') 10007db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'250') 10017db96d56Sopenharmony_ci self.write_line(b'rcpt to:<hans@example.com>') 10027db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'250') 10037db96d56Sopenharmony_ci self.write_line(b'data') 10047db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'354') 10057db96d56Sopenharmony_ci self.write_line(b'test\r\n.') 10067db96d56Sopenharmony_ci self.assertEqual(self.channel.socket.last[0:3], b'250') 10077db96d56Sopenharmony_ci 10087db96d56Sopenharmony_ci 10097db96d56Sopenharmony_ciclass MiscTestCase(unittest.TestCase): 10107db96d56Sopenharmony_ci def test__all__(self): 10117db96d56Sopenharmony_ci not_exported = { 10127db96d56Sopenharmony_ci "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE", 10137db96d56Sopenharmony_ci "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs", 10147db96d56Sopenharmony_ci } 10157db96d56Sopenharmony_ci support.check__all__(self, smtpd, not_exported=not_exported) 10167db96d56Sopenharmony_ci 10177db96d56Sopenharmony_ci 10187db96d56Sopenharmony_ciif __name__ == "__main__": 10197db96d56Sopenharmony_ci unittest.main() 1020