17db96d56Sopenharmony_ci# test asynchat
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_cifrom test import support
47db96d56Sopenharmony_cifrom test.support import socket_helper
57db96d56Sopenharmony_cifrom test.support import threading_helper
67db96d56Sopenharmony_cifrom test.support import warnings_helper
77db96d56Sopenharmony_ci
87db96d56Sopenharmony_ciimport errno
97db96d56Sopenharmony_ciimport socket
107db96d56Sopenharmony_ciimport sys
117db96d56Sopenharmony_ciimport threading
127db96d56Sopenharmony_ciimport time
137db96d56Sopenharmony_ciimport unittest
147db96d56Sopenharmony_ciimport unittest.mock
157db96d56Sopenharmony_ci
167db96d56Sopenharmony_ci
# asynchat and asyncore are loaded through the warnings helper rather than a
# plain import statement.
# NOTE(review): import_deprecated presumably silences the DeprecationWarning
# these modules raise on import -- confirm against test.support.
asynchat = warnings_helper.import_deprecated('asynchat')
asyncore = warnings_helper.import_deprecated('asyncore')

# NOTE(review): with module=True this is expected to skip the whole module
# on platforms without working sockets -- confirm against test.support.
support.requires_working_socket(module=True)

# Address the echo client connects to.
HOST = socket_helper.HOST
# Marker the client sends to tell the echo server to stop collecting data.
SERVER_QUIT = b'QUIT\n'
247db96d56Sopenharmony_ci
257db96d56Sopenharmony_ci
class echo_server(threading.Thread):
    """One-shot TCP echo server that runs on its own thread.

    The server accepts a single connection, collects bytes until it has
    seen SERVER_QUIT (or the peer closes the connection), strips the quit
    marker, and then sends the collected data back ``chunk_size`` bytes at
    a time.  Whatever has not been sent back stays in ``self.buffer``.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        # event: a threading.Event that run() sets once the socket listens.
        super().__init__()
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = socket_helper.bind_port(self.sock)
        # This will be set if the client wants us to wait before echoing
        # data back.
        self.start_resend_event = None

    def run(self):
        # Both sockets are closed on every exit path, so an unexpected
        # failure in accept/recv/send cannot leak a descriptor.
        try:
            self.sock.listen()
            self.event.set()
            conn, _client = self.sock.accept()
            try:
                self._receive_until_quit(conn)
                if self.start_resend_event:
                    self.start_resend_event.wait()
                self._echo_back(conn)
            finally:
                conn.close()
        finally:
            self.sock.close()

    def _receive_until_quit(self, conn):
        # Fill self.buffer from conn until the quit message is seen or the
        # peer closes the connection, then drop the quit message.
        self.buffer = b""
        while SERVER_QUIT not in self.buffer:
            data = conn.recv(1)
            if not data:
                break
            self.buffer = self.buffer + data

        # remove the SERVER_QUIT message
        self.buffer = self.buffer.replace(SERVER_QUIT, b'')

    def _echo_back(self, conn):
        # Re-send the entire set of collected data, chunk_size bytes per
        # send, removing from self.buffer what was actually sent.
        try:
            while self.buffer:
                n = conn.send(self.buffer[:self.chunk_size])
                # Short pause so the client receives the data in separate
                # chunks rather than one coalesced read.
                time.sleep(0.001)
                self.buffer = self.buffer[n:]
        except OSError:
            # this may fail on some tests, such as test_close_when_done,
            # since the client closes the channel when it's done sending.
            # Only socket errors are expected here; anything else is a
            # real bug and must not be hidden.
            pass
717db96d56Sopenharmony_ci
class echo_client(asynchat.async_chat):
    """async_chat client that records every terminated message it receives.

    Complete messages are appended to ``contents``; bytes received since
    the last terminator accumulate in ``buffer``.
    """

    def __init__(self, terminator, server_port):
        super().__init__()
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        address = (HOST, server_port)
        self.connect(address)
        self.set_terminator(terminator)
        self.buffer = b""

    def handle_connect(self):
        # Nothing to do when the connection is established.
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        # Accumulate the partial message until its terminator shows up.
        self.buffer = self.buffer + data

    def found_terminator(self):
        # Store the finished message and start collecting the next one.
        message, self.buffer = self.buffer, b""
        self.contents.append(message)
977db96d56Sopenharmony_ci
def start_echo_server():
    """Start an echo_server thread and return ``(server, event)``.

    Blocks until the server thread has signalled the event, then clears
    it, so the event is handed back unset.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.start()
    ready.wait()
    ready.clear()
    # Give server time to start accepting.
    time.sleep(0.01)
    return server, ready
1067db96d56Sopenharmony_ci
1077db96d56Sopenharmony_ci
class TestAsynchat(unittest.TestCase):
    """End-to-end tests pairing an echo_client with a threaded echo_server.

    ``usepoll`` is forwarded as ``use_poll`` to asyncore.loop(); subclasses
    override it to run the same tests with the other setting.
    """

    usepoll = False

    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        threading_helper.threading_cleanup(*self._threads)

    def _connect_client(self, terminator, port):
        # Build an echo_client and guarantee its channel is closed when the
        # test ends.  Without this, a channel that is still open when the
        # bounded asyncore loop gives up would outlive the test.
        client = echo_client(terminator, port)
        self.addCleanup(client.close)
        return client

    def _run_loop(self):
        # Drive the asyncore loop; it is bounded by count so that a broken
        # exchange cannot hang the test run.
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def _run_and_join(self, server):
        # Run the client side to completion, then wait for the server
        # thread to finish.
        self._run_loop()
        threading_helper.join_thread(server)

    def line_terminator_check(self, term, server_chunk):
        server, _ = start_echo_server()
        # The server only reads chunk_size while echoing, which cannot
        # begin before the quit message pushed below has been sent, so it
        # is safe to set it after the thread has started.
        server.chunk_size = server_chunk
        client = self._connect_client(term, server.port)
        client.push(b"hello ")
        client.push(b"world" + term)
        client.push(b"I'm not dead yet!" + term)
        client.push(SERVER_QUIT)
        self._run_and_join(server)

        self.assertEqual(client.contents,
                         [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        server, _ = start_echo_server()
        client = self._connect_client(termlen, server.port)
        data = b"hello world, I'm not dead yet!\n"
        client.push(data)
        client.push(SERVER_QUIT)
        self._run_and_join(server)

        self.assertEqual(client.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # numeric terminator of a single byte
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        # numeric terminator spanning several bytes
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a terminator of None no message is ever completed: all of
        # the echoed data is collected and stays in the client's buffer.
        server, _ = start_echo_server()
        client = self._connect_client(None, server.port)
        data = b"hello world, I'm not dead yet!\n"
        client.push(data)
        client.push(SERVER_QUIT)
        self._run_and_join(server)

        self.assertEqual(client.contents, [])
        self.assertEqual(client.buffer, data)

    def test_simple_producer(self):
        # data is handed to the channel through a simple_producer
        server, _ = start_echo_server()
        client = self._connect_client(b'\n', server.port)
        data = b"hello world\nI'm not dead yet!\n"
        producer = asynchat.simple_producer(data + SERVER_QUIT, buffer_size=8)
        client.push_with_producer(producer)
        self._run_and_join(server)

        self.assertEqual(client.contents,
                         [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        # a plain bytes object is passed where a producer is expected
        server, _ = start_echo_server()
        client = self._connect_client(b'\n', server.port)
        data = b"hello world\nI'm not dead yet!\n"
        client.push_with_producer(data + SERVER_QUIT)
        self._run_and_join(server)

        self.assertEqual(client.contents,
                         [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        server, _ = start_echo_server()
        client = self._connect_client(b'\n', server.port)
        client.push(b"hello world\n\nI'm not dead yet!\n")
        client.push(SERVER_QUIT)
        self._run_and_join(server)

        self.assertEqual(client.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        server, _ = start_echo_server()
        server.start_resend_event = threading.Event()
        client = self._connect_client(b'\n', server.port)
        client.push(b"hello world\nI'm not dead yet!\n")
        client.push(SERVER_QUIT)
        client.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        server.start_resend_event.set()
        threading_helper.join_thread(server)

        self.assertEqual(client.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(server.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        server, _ = start_echo_server()
        client = self._connect_client(b'\n', server.port)
        data = b'bytes\n'
        client.push(data)
        client.push(bytearray(data))
        client.push(memoryview(data))
        self.assertRaises(TypeError, client.push, 10)
        self.assertRaises(TypeError, client.push, 'unicode')
        client.push(SERVER_QUIT)
        self._run_and_join(server)
        self.assertEqual(client.contents, [b'bytes', b'bytes', b'bytes'])
2577db96d56Sopenharmony_ci
2587db96d56Sopenharmony_ci
class TestAsynchat_WithPoll(TestAsynchat):
    # Re-runs every inherited TestAsynchat test with usepoll enabled, i.e.
    # with use_poll=True passed to asyncore.loop().
    usepoll = True
2617db96d56Sopenharmony_ci
2627db96d56Sopenharmony_ci
class TestAsynchatMocked(unittest.TestCase):
    """async_chat behaviour checked against a mocked socket."""

    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        fake_sock = unittest.mock.Mock()
        fake_sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        chat = asynchat.async_chat()
        chat.set_socket(fake_sock)
        # Unregister the channel again once the test is over.
        self.addCleanup(chat.del_channel)

        patcher = unittest.mock.patch.object(chat, 'handle_error')
        with patcher as handle_error:
            chat.handle_read()
        # The error handler must not have been invoked.
        self.assertFalse(handle_error.called)
2767db96d56Sopenharmony_ci
2777db96d56Sopenharmony_ci
class TestHelperFunctions(unittest.TestCase):
    """Tests for module-level helper functions of asynchat."""

    def test_find_prefix_at_end(self):
        find_prefix = asynchat.find_prefix_at_end
        # The trailing "\r" is the first character of the terminator.
        partial = find_prefix("qwerty\r", "\r\n")
        self.assertEqual(partial, 1)
        # Nothing at the end of the data starts the terminator.
        absent = find_prefix("qwertydkjf", "\r\n")
        self.assertEqual(absent, 0)
2827db96d56Sopenharmony_ci
2837db96d56Sopenharmony_ci
class TestNotConnected(unittest.TestCase):
    """Checks on an async_chat object that was never given a socket."""

    def test_disallow_negative_terminator(self):
        # Issue #11259
        chat = asynchat.async_chat()
        with self.assertRaises(ValueError):
            chat.set_terminator(-1)
2897db96d56Sopenharmony_ci
2907db96d56Sopenharmony_ci
2917db96d56Sopenharmony_ci
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
294