# test asynchat

from test import support
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper

import errno
import socket
import sys
import threading
import time
import unittest
import unittest.mock


asynchat = warnings_helper.import_deprecated('asynchat')
asyncore = warnings_helper.import_deprecated('asyncore')

support.requires_working_socket(module=True)

HOST = socket_helper.HOST
# Message the client sends to tell the echo server to stop collecting data.
SERVER_QUIT = b'QUIT\n'


class echo_server(threading.Thread):
    """Single-connection echo server running in its own thread.

    The server accepts one connection, collects bytes until SERVER_QUIT is
    seen (or the peer closes the connection), strips SERVER_QUIT from the
    collected data and then sends the remainder back in pieces of
    ``chunk_size`` bytes.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        """Bind a listening socket; *event* is set by run() once listening."""
        super().__init__()
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = socket_helper.bind_port(self.sock)
        # This will be set if the client wants us to wait before echoing
        # data back.
        self.start_resend_event = None

    def run(self):
        """Accept one client, collect its data and echo it back."""
        try:
            self.sock.listen()
            self.event.set()
            conn, _ = self.sock.accept()
            # The with-block guarantees the connection is closed even if
            # collecting or echoing raises unexpectedly.
            with conn:
                self.buffer = b""
                # collect data until quit message is seen
                while SERVER_QUIT not in self.buffer:
                    data = conn.recv(1)
                    if not data:
                        break
                    self.buffer += data

                # remove the SERVER_QUIT message
                self.buffer = self.buffer.replace(SERVER_QUIT, b'')

                if self.start_resend_event:
                    self.start_resend_event.wait()

                # re-send entire set of collected data
                try:
                    # this may fail on some tests, such as
                    # test_close_when_done, since the client closes the
                    # channel when it's done sending
                    while self.buffer:
                        n = conn.send(self.buffer[:self.chunk_size])
                        time.sleep(0.001)
                        self.buffer = self.buffer[n:]
                except OSError:
                    # Best effort only: the peer may already be gone.  Only
                    # socket errors are ignored so that KeyboardInterrupt
                    # and SystemExit still propagate.
                    pass
        finally:
            self.sock.close()


class echo_client(asynchat.async_chat):
    """async_chat client that records every terminator-delimited message.

    Incoming data is accumulated in ``buffer``; each time the terminator is
    found the accumulated bytes are appended to ``contents`` and the buffer
    is reset.
    """

    def __init__(self, terminator, server_port):
        """Connect to (HOST, server_port) and install *terminator*."""
        super().__init__()
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, server_port))
        self.set_terminator(terminator)
        self.buffer = b""

    def handle_connect(self):
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        self.buffer += data

    def found_terminator(self):
        self.contents.append(self.buffer)
        self.buffer = b""


def start_echo_server(chunk_size=None):
    """Start an echo_server thread and wait until it is listening.

    If *chunk_size* is not None it overrides echo_server.chunk_size for the
    new server instance.  Returns a ``(server, event)`` pair; the event has
    already been cleared again.
    """
    event = threading.Event()
    s = echo_server(event)
    if chunk_size is not None:
        s.chunk_size = chunk_size
    s.start()
    event.wait()
    event.clear()
    time.sleep(0.01)   # Give server time to start accepting.
    return s, event


class TestAsynchat(unittest.TestCase):
    usepoll = False

    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        threading_helper.threading_cleanup(*self._threads)

    def line_terminator_check(self, term, server_chunk):
        # Push two messages ending in `term` and check that both come back
        # intact when the server echoes them `server_chunk` bytes at a time.
        s, event = start_echo_server(server_chunk)
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that an integer terminator works (the type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator no message boundary is ever found, so all
        # echoed data must stay in the client's buffer.
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, event = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        threading_helper.join_thread(s)

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b'bytes\n'
        c.push(data)
        c.push(bytearray(data))
        c.push(memoryview(data))
        self.assertRaises(TypeError, c.push, 10)
        self.assertRaises(TypeError, c.push, 'unicode')
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        threading_helper.join_thread(s)
        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])


class TestAsynchat_WithPoll(TestAsynchat):
    usepoll = True


class TestAsynchatMocked(unittest.TestCase):
    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        sock = unittest.mock.Mock()
        sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        dispatcher = asynchat.async_chat()
        dispatcher.set_socket(sock)
        self.addCleanup(dispatcher.del_channel)

        with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
            dispatcher.handle_read()
        self.assertFalse(error.called)


class TestHelperFunctions(unittest.TestCase):
    def test_find_prefix_at_end(self):
        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)


class TestNotConnected(unittest.TestCase):
    def test_disallow_negative_terminator(self):
        # Issue #11259
        client = asynchat.async_chat()
        self.assertRaises(ValueError, client.set_terminator, -1)



if __name__ == "__main__":
    unittest.main()