17db96d56Sopenharmony_ciimport unittest
27db96d56Sopenharmony_ciimport select
37db96d56Sopenharmony_ciimport os
47db96d56Sopenharmony_ciimport socket
57db96d56Sopenharmony_ciimport sys
67db96d56Sopenharmony_ciimport time
77db96d56Sopenharmony_ciimport errno
87db96d56Sopenharmony_ciimport struct
97db96d56Sopenharmony_ciimport threading
107db96d56Sopenharmony_ci
117db96d56Sopenharmony_cifrom test import support
127db96d56Sopenharmony_cifrom test.support import os_helper
137db96d56Sopenharmony_cifrom test.support import socket_helper
147db96d56Sopenharmony_cifrom test.support import threading_helper
157db96d56Sopenharmony_cifrom test.support import warnings_helper
167db96d56Sopenharmony_cifrom io import BytesIO
177db96d56Sopenharmony_ci
# Raising SkipTest at import time skips the whole module; per the message,
# these tests are not useful when support.PGO is set.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

# NOTE(review): presumably skips the module when sockets do not work on this
# platform -- confirm against test.support.requires_working_socket.
support.requires_working_socket(module=True)

# asyncore is loaded through the warnings helper rather than a plain import
# (presumably to silence its deprecation warning -- confirm in the helper).
asyncore = warnings_helper.import_deprecated('asyncore')


# True when the socket module on this platform exposes AF_UNIX.
HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')
277db96d56Sopenharmony_ci
class dummysocket:
    """Socket stand-in that only remembers whether close() was called."""

    def __init__(self):
        self.closed = False

    def fileno(self):
        """Return a fixed, made-up descriptor number."""
        return 42

    def close(self):
        """Record the close request; no real resource is involved."""
        self.closed = True
377db96d56Sopenharmony_ci
class dummychannel:
    """Channel stand-in holding a dummysocket in its ``socket`` attribute."""

    def __init__(self):
        self.socket = dummysocket()

    def close(self):
        """Forward the close request to the wrapped socket object."""
        wrapped = self.socket
        wrapped.close()
447db96d56Sopenharmony_ci
class exitingdummy:
    """Handler object whose every event callback raises asyncore.ExitNow."""

    def handle_read_event(self):
        raise asyncore.ExitNow()

    # The other callbacks are the very same function under different names.
    handle_write_event = handle_close = handle_expt_event = handle_read_event
557db96d56Sopenharmony_ci
class crashingdummy:
    """Handler object whose event callbacks raise a plain Exception.

    ``error_handled`` starts out False and becomes True once handle_error()
    has been invoked.
    """

    def __init__(self):
        self.error_handled = False

    def handle_error(self):
        """Remember that the error hook ran."""
        self.error_handled = True

    def handle_read_event(self):
        raise Exception()

    # The other callbacks are the very same function under different names.
    handle_write_event = handle_close = handle_expt_event = handle_read_event
697db96d56Sopenharmony_ci
# used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf, serv):
    """Accept one connection on *serv* and copy what it sends into *buf*.

    evt  -- object with a set() method; always signalled on exit.
    buf  -- object with a write() method receiving the captured bytes,
            with newline bytes stripped.
    serv -- bound socket-like object; it is put into listening mode here
            and always closed on exit.

    Collection stops at the first chunk containing a newline, when the peer
    closes the connection, after 200 successful reads, or after roughly
    three seconds.  A TimeoutError from listen()/accept() is swallowed so
    the function simply returns after cleaning up.
    """
    try:
        serv.listen()
        conn, _ = serv.accept()
    except TimeoutError:
        pass
    else:
        # Close the accepted connection even if select()/recv() raises.
        try:
            n = 200
            start = time.monotonic()
            while n > 0 and time.monotonic() - start < 3.0:
                readable, _, _ = select.select([conn], [], [], 0.1)
                if readable:
                    n -= 1
                    data = conn.recv(10)
                    if not data:
                        # Empty read: the peer closed the connection, so no
                        # newline will ever arrive.  Stop instead of spinning
                        # on a socket that stays "readable" forever.
                        break
                    # keep everything except for the newline terminator
                    buf.write(data.replace(b'\n', b''))
                    if b'\n' in data:
                        break
                time.sleep(0.01)
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()
957db96d56Sopenharmony_ci
def bind_af_aware(sock, addr):
    """Bind *sock* to *addr*, using the UNIX-socket helper for AF_UNIX."""
    is_unix = HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX
    if not is_unix:
        sock.bind(addr)
        return
    # Make sure nothing is left at the path before binding to it.
    os_helper.unlink(addr)
    socket_helper.bind_unix_socket(sock, addr)
1047db96d56Sopenharmony_ci
1057db96d56Sopenharmony_ci
class HelperFunctionTests(unittest.TestCase):
    """Tests for asyncore's module-level helper functions."""

    def test_readwriteexc(self):
        # Exception handling behavior of read, write and _exception.
        helpers = (asyncore.read, asyncore.write, asyncore._exception)

        # An ExitNow raised by the object's handler method must bubble all
        # the way up through each helper.
        exiting = exitingdummy()
        for helper in helpers:
            self.assertRaises(asyncore.ExitNow, helper, exiting)

        # Any other exception raised by the handler method must instead
        # result in the object's handle_error method being called.
        for helper in helpers:
            crashing = crashingdummy()
            helper(crashing)
            self.assertEqual(crashing.error_handled, True)

    # asyncore.readwrite uses constants in the select module that
    # are not present in Windows systems (see this thread:
    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
    # These constants should be present as long as poll is available

    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    def test_readwrite(self):
        # readwrite() must dispatch each poll flag to the matching handler.

        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')

        # (poll flag, attribute the dispatched handler is expected to set)
        expected = (
            (select.POLLIN, 'read'),
            (select.POLLPRI, 'expt'),
            (select.POLLOUT, 'write'),
            (select.POLLERR, 'closed'),
            (select.POLLHUP, 'closed'),
            (select.POLLNVAL, 'closed'),
            )

        class EventRecorder:
            # Each handler flips the attribute named after the event.
            def __init__(self):
                for name in attributes:
                    setattr(self, name, False)

            def handle_read_event(self):
                self.read = True

            def handle_write_event(self):
                self.write = True

            def handle_close(self):
                self.closed = True

            def handle_expt_event(self):
                self.expt = True

            def handle_error(self):
                self.error_handled = True

        for flag, expectedattr in expected:
            recorder = EventRecorder()
            self.assertEqual(getattr(recorder, expectedattr), False)
            asyncore.readwrite(recorder, flag)

            # Only the attribute set by the handler we expect to have been
            # called may be True; every other one must still be False.
            for attr in attributes:
                should_be_set = (attr == expectedattr)
                self.assertEqual(getattr(recorder, attr), should_be_set)

            # An ExitNow raised by the handler method must bubble all the
            # way up through the readwrite call.
            exiting = exitingdummy()
            self.assertRaises(asyncore.ExitNow,
                              asyncore.readwrite, exiting, flag)

            # Any other exception raised by the handler method must cause
            # handle_error to be called.
            crashing = crashingdummy()
            self.assertEqual(crashing.error_handled, False)
            asyncore.readwrite(crashing, flag)
            self.assertEqual(crashing.error_handled, True)

    def test_closeall(self):
        self.closeall_check(False)

    def test_closeall_default(self):
        self.closeall_check(True)

    def closeall_check(self, usedefault):
        # close_all() must close everything in a map and leave it empty;
        # with usedefault the map is installed as asyncore.socket_map and
        # close_all() is called without arguments.

        channels = []
        testmap = {}
        for key in range(10):
            chan = dummychannel()
            channels.append(chan)
            self.assertEqual(chan.socket.closed, False)
            testmap[key] = chan

        if not usedefault:
            asyncore.close_all(testmap)
        else:
            saved_map = asyncore.socket_map
            try:
                asyncore.socket_map = testmap
                asyncore.close_all()
            finally:
                # Restore the real default map, and inspect whatever object
                # asyncore ended up holding as its default map.
                testmap, asyncore.socket_map = asyncore.socket_map, saved_map

        self.assertEqual(len(testmap), 0)

        for chan in channels:
            self.assertEqual(chan.socket.closed, True)

    def test_compact_traceback(self):
        # The raise has to stay in this function: the assertions below
        # compare the reported function name with this test's own name.
        try:
            raise Exception("I don't like spam!")
        except:
            exc_type, exc_value, exc_tb = sys.exc_info()
            summary = asyncore.compact_traceback()
        else:
            self.fail("Expected exception")

        location, reported_type, reported_value, info = summary
        f, function, line = location
        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
        self.assertEqual(function, 'test_compact_traceback')
        self.assertEqual(reported_type, exc_type)
        self.assertEqual(reported_value, exc_value)
        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
2437db96d56Sopenharmony_ci
2447db96d56Sopenharmony_ci
class DispatcherTests(unittest.TestCase):
    """Tests for a bare asyncore.dispatcher instance."""

    def setUp(self):
        # Nothing to prepare.
        pass

    def tearDown(self):
        asyncore.close_all()

    def test_basic(self):
        disp = asyncore.dispatcher()
        self.assertEqual(disp.readable(), True)
        self.assertEqual(disp.writable(), True)

    def test_repr(self):
        disp = asyncore.dispatcher()
        wanted = '<asyncore.dispatcher at %#x>' % id(disp)
        self.assertEqual(repr(disp), wanted)

    def test_log(self):
        disp = asyncore.dispatcher()

        # capture output of dispatcher.log() (to stderr)
        messages = ("Lovely spam! Wonderful spam!", "I don't like spam!")
        with support.captured_stderr() as stderr:
            for message in messages:
                disp.log(message)

        lines = stderr.getvalue().splitlines()
        self.assertEqual(lines, ['log: %s' % message for message in messages])

    def test_log_info(self):
        disp = asyncore.dispatcher()

        # (message, extra positional args for log_info, expected prefix)
        entries = (
            ("Have you got anything without spam?", ('EGGS',), 'EGGS'),
            ("Why can't she have egg bacon spam and sausage?", (), 'info'),
            ("THAT'S got spam in it!", ('SPAM',), 'SPAM'),
        )

        # capture output of dispatcher.log_info() (to stdout via print)
        with support.captured_stdout() as stdout:
            for message, extra, _ in entries:
                disp.log_info(message, *extra)

        lines = stdout.getvalue().splitlines()
        expected = ['%s: %s' % (prefix, message)
                    for message, _, prefix in entries]
        self.assertEqual(lines, expected)

    def test_unhandled(self):
        disp = asyncore.dispatcher()
        disp.ignore_log_types = ()

        # capture output of dispatcher.log_info() (to stdout via print)
        with support.captured_stdout() as stdout:
            for callback in (disp.handle_expt, disp.handle_read,
                             disp.handle_write, disp.handle_connect):
                callback()

        expected = ['warning: unhandled incoming priority event',
                    'warning: unhandled read event',
                    'warning: unhandled write event',
                    'warning: unhandled connect event']
        self.assertEqual(stdout.getvalue().splitlines(), expected)

    def test_strerror(self):
        # refers to bug #8573
        known = asyncore._strerror(errno.EPERM)
        if hasattr(os, 'strerror'):
            self.assertEqual(known, os.strerror(errno.EPERM))
        unknown = asyncore._strerror(-1)
        self.assertTrue(unknown != "")
3157db96d56Sopenharmony_ci
3167db96d56Sopenharmony_ci
class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    """dispatcher_with_send variant that never asks to be read from."""

    def handle_connect(self):
        """Do nothing when the connection is established."""

    def readable(self):
        """Always report the channel as not readable."""
        return False
3237db96d56Sopenharmony_ci
3247db96d56Sopenharmony_ci
class DispatcherWithSendTests(unittest.TestCase):
    """Checks dispatcher_with_send against a capturing server thread."""

    def setUp(self):
        # Nothing to prepare.
        pass

    def tearDown(self):
        asyncore.close_all()

    @threading_helper.reap_threads
    def test_send(self):
        done = threading.Event()
        listener = socket.socket()
        listener.settimeout(3)
        port = socket_helper.bind_port(listener)

        captured = BytesIO()
        server_thread = threading.Thread(target=capture_server,
                                         args=(done, captured, listener))
        server_thread.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = b"Suppose there isn't a 16-ton weight?"
            sender = dispatcherwithsend_noread()
            sender.create_socket()
            sender.connect((socket_helper.HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            # two copies of the payload, then the terminating newline
            for chunk in (data, data, b'\n'):
                sender.send(chunk)

            # poll until the outgoing buffer drains, at most 1000 times
            for _ in range(1000):
                if not sender.out_buffer:
                    break
                asyncore.poll()

            done.wait()

            self.assertEqual(captured.getvalue(), data*2)
        finally:
            threading_helper.join_thread(server_thread)
3707db96d56Sopenharmony_ci
3717db96d56Sopenharmony_ci
@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
                     'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
    """Tests for asyncore.file_wrapper and file_dispatcher on TESTFN."""

    def setUp(self):
        # Every test starts from a file holding exactly self.d.
        self.d = b"It's not dead, it's sleeping!"
        with open(os_helper.TESTFN, 'wb') as out:
            out.write(self.d)

    def tearDown(self):
        os_helper.unlink(os_helper.TESTFN)

    def _wrap_testfn(self, flags):
        # Open TESTFN with *flags*, wrap the descriptor in a file_wrapper,
        # close the original descriptor, and return (wrapper, original_fd).
        fd = os.open(os_helper.TESTFN, flags)
        wrapper = asyncore.file_wrapper(fd)
        os.close(fd)
        return wrapper, fd

    def test_recv(self):
        wrapper, fd = self._wrap_testfn(os.O_RDONLY)

        # the wrapper must hold its own descriptor, not the one passed in
        self.assertNotEqual(wrapper.fd, fd)
        self.assertNotEqual(wrapper.fileno(), fd)
        self.assertEqual(wrapper.recv(13), b"It's not dead")
        self.assertEqual(wrapper.read(6), b", it's")
        wrapper.close()
        with self.assertRaises(OSError):
            wrapper.read(1)

    def test_send(self):
        first = b"Come again?"
        second = b"I want to buy some cheese."
        wrapper, _ = self._wrap_testfn(os.O_WRONLY | os.O_APPEND)

        wrapper.write(first)
        wrapper.send(second)
        wrapper.close()
        with open(os_helper.TESTFN, 'rb') as written:
            contents = written.read()
        self.assertEqual(contents, self.d + first + second)

    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
                         'asyncore.file_dispatcher required')
    def test_dispatcher(self):
        chunks = []

        class FileDispatcher(asyncore.file_dispatcher):
            def handle_read(self):
                chunks.append(self.recv(29))

        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        s = FileDispatcher(fd)
        os.close(fd)
        asyncore.loop(timeout=0.01, use_poll=True, count=2)
        self.assertEqual(b"".join(chunks), self.d)

    def test_resource_warning(self):
        # Issue #11453
        # Kept inline on purpose: the local name below must be the only
        # reference to the wrapper when it is dropped.
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        wrapper = asyncore.file_wrapper(fd)

        os.close(fd)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            wrapper = None
            support.gc_collect()

    def test_close_twice(self):
        wrapper, _ = self._wrap_testfn(os.O_RDONLY)

        os.close(wrapper.fd)  # file_wrapper dupped fd
        self.assertRaises(OSError, wrapper.close)

        self.assertEqual(wrapper.fd, -1)
        # calling close twice should not fail
        wrapper.close()
4437db96d56Sopenharmony_ci
4447db96d56Sopenharmony_ci
class BaseTestHandler(asyncore.dispatcher):
    """Dispatcher whose event callbacks fail unless a subclass overrides them.

    ``flag`` starts out False; subclasses set it from the callback under
    test.
    """

    def __init__(self, sock=None):
        super().__init__(sock)
        self.flag = False

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise

    def handle_accept(self):
        raise Exception("handle_accept not supposed to be called")

    def handle_accepted(self):
        raise Exception("handle_accepted not supposed to be called")

    def handle_connect(self):
        raise Exception("handle_connect not supposed to be called")

    def handle_expt(self):
        raise Exception("handle_expt not supposed to be called")

    def handle_close(self):
        raise Exception("handle_close not supposed to be called")
4687db96d56Sopenharmony_ci
4697db96d56Sopenharmony_ci
class BaseServer(asyncore.dispatcher):
    """Listening dispatcher that hands each accepted socket to a handler.

    The handler is any callable taking the accepted socket; it defaults to
    BaseTestHandler.
    """

    def __init__(self, family, addr, handler=BaseTestHandler):
        super().__init__()
        self.create_socket(family)
        self.set_reuse_addr()
        bind_af_aware(self.socket, addr)
        self.listen(5)
        self.handler = handler

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise

    def handle_accepted(self, sock, addr):
        """Build a handler around the freshly accepted socket."""
        make_handler = self.handler
        make_handler(sock)

    @property
    def address(self):
        """The address the listening socket is bound to."""
        bound_to = self.socket.getsockname()
        return bound_to
4927db96d56Sopenharmony_ci
4937db96d56Sopenharmony_ci
class BaseClient(BaseTestHandler):
    """Test handler that creates a socket and connects it on construction."""

    def __init__(self, family, address):
        super().__init__()
        self.create_socket(family)
        self.connect(address)

    def handle_connect(self):
        """Accept the connect event silently instead of raising."""
5037db96d56Sopenharmony_ci
5047db96d56Sopenharmony_ci
5057db96d56Sopenharmony_ciclass BaseTestAPI:
5067db96d56Sopenharmony_ci
    def tearDown(self):
        # Close whatever is left in asyncore's default map after each test;
        # ignore_all=True is passed so that, presumably, errors raised while
        # closing are ignored (confirm against asyncore.close_all).
        asyncore.close_all(ignore_all=True)
5097db96d56Sopenharmony_ci
5107db96d56Sopenharmony_ci    def loop_waiting_for_flag(self, instance, timeout=5):
5117db96d56Sopenharmony_ci        timeout = float(timeout) / 100
5127db96d56Sopenharmony_ci        count = 100
5137db96d56Sopenharmony_ci        while asyncore.socket_map and count > 0:
5147db96d56Sopenharmony_ci            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
5157db96d56Sopenharmony_ci            if instance.flag:
5167db96d56Sopenharmony_ci                return
5177db96d56Sopenharmony_ci            count -= 1
5187db96d56Sopenharmony_ci            time.sleep(timeout)
5197db96d56Sopenharmony_ci        self.fail("flag not set")
5207db96d56Sopenharmony_ci
5217db96d56Sopenharmony_ci    def test_handle_connect(self):
5227db96d56Sopenharmony_ci        # make sure handle_connect is called on connect()
5237db96d56Sopenharmony_ci
5247db96d56Sopenharmony_ci        class TestClient(BaseClient):
5257db96d56Sopenharmony_ci            def handle_connect(self):
5267db96d56Sopenharmony_ci                self.flag = True
5277db96d56Sopenharmony_ci
5287db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr)
5297db96d56Sopenharmony_ci        client = TestClient(self.family, server.address)
5307db96d56Sopenharmony_ci        self.loop_waiting_for_flag(client)
5317db96d56Sopenharmony_ci
5327db96d56Sopenharmony_ci    def test_handle_accept(self):
5337db96d56Sopenharmony_ci        # make sure handle_accept() is called when a client connects
5347db96d56Sopenharmony_ci
5357db96d56Sopenharmony_ci        class TestListener(BaseTestHandler):
5367db96d56Sopenharmony_ci
5377db96d56Sopenharmony_ci            def __init__(self, family, addr):
5387db96d56Sopenharmony_ci                BaseTestHandler.__init__(self)
5397db96d56Sopenharmony_ci                self.create_socket(family)
5407db96d56Sopenharmony_ci                bind_af_aware(self.socket, addr)
5417db96d56Sopenharmony_ci                self.listen(5)
5427db96d56Sopenharmony_ci                self.address = self.socket.getsockname()
5437db96d56Sopenharmony_ci
5447db96d56Sopenharmony_ci            def handle_accept(self):
5457db96d56Sopenharmony_ci                self.flag = True
5467db96d56Sopenharmony_ci
5477db96d56Sopenharmony_ci        server = TestListener(self.family, self.addr)
5487db96d56Sopenharmony_ci        client = BaseClient(self.family, server.address)
5497db96d56Sopenharmony_ci        self.loop_waiting_for_flag(server)
5507db96d56Sopenharmony_ci
5517db96d56Sopenharmony_ci    def test_handle_accepted(self):
5527db96d56Sopenharmony_ci        # make sure handle_accepted() is called when a client connects
5537db96d56Sopenharmony_ci
5547db96d56Sopenharmony_ci        class TestListener(BaseTestHandler):
5557db96d56Sopenharmony_ci
5567db96d56Sopenharmony_ci            def __init__(self, family, addr):
5577db96d56Sopenharmony_ci                BaseTestHandler.__init__(self)
5587db96d56Sopenharmony_ci                self.create_socket(family)
5597db96d56Sopenharmony_ci                bind_af_aware(self.socket, addr)
5607db96d56Sopenharmony_ci                self.listen(5)
5617db96d56Sopenharmony_ci                self.address = self.socket.getsockname()
5627db96d56Sopenharmony_ci
5637db96d56Sopenharmony_ci            def handle_accept(self):
5647db96d56Sopenharmony_ci                asyncore.dispatcher.handle_accept(self)
5657db96d56Sopenharmony_ci
5667db96d56Sopenharmony_ci            def handle_accepted(self, sock, addr):
5677db96d56Sopenharmony_ci                sock.close()
5687db96d56Sopenharmony_ci                self.flag = True
5697db96d56Sopenharmony_ci
5707db96d56Sopenharmony_ci        server = TestListener(self.family, self.addr)
5717db96d56Sopenharmony_ci        client = BaseClient(self.family, server.address)
5727db96d56Sopenharmony_ci        self.loop_waiting_for_flag(server)
5737db96d56Sopenharmony_ci
5747db96d56Sopenharmony_ci
5757db96d56Sopenharmony_ci    def test_handle_read(self):
5767db96d56Sopenharmony_ci        # make sure handle_read is called on data received
5777db96d56Sopenharmony_ci
5787db96d56Sopenharmony_ci        class TestClient(BaseClient):
5797db96d56Sopenharmony_ci            def handle_read(self):
5807db96d56Sopenharmony_ci                self.flag = True
5817db96d56Sopenharmony_ci
5827db96d56Sopenharmony_ci        class TestHandler(BaseTestHandler):
5837db96d56Sopenharmony_ci            def __init__(self, conn):
5847db96d56Sopenharmony_ci                BaseTestHandler.__init__(self, conn)
5857db96d56Sopenharmony_ci                self.send(b'x' * 1024)
5867db96d56Sopenharmony_ci
5877db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr, TestHandler)
5887db96d56Sopenharmony_ci        client = TestClient(self.family, server.address)
5897db96d56Sopenharmony_ci        self.loop_waiting_for_flag(client)
5907db96d56Sopenharmony_ci
5917db96d56Sopenharmony_ci    def test_handle_write(self):
5927db96d56Sopenharmony_ci        # make sure handle_write is called
5937db96d56Sopenharmony_ci
5947db96d56Sopenharmony_ci        class TestClient(BaseClient):
5957db96d56Sopenharmony_ci            def handle_write(self):
5967db96d56Sopenharmony_ci                self.flag = True
5977db96d56Sopenharmony_ci
5987db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr)
5997db96d56Sopenharmony_ci        client = TestClient(self.family, server.address)
6007db96d56Sopenharmony_ci        self.loop_waiting_for_flag(client)
6017db96d56Sopenharmony_ci
6027db96d56Sopenharmony_ci    def test_handle_close(self):
6037db96d56Sopenharmony_ci        # make sure handle_close is called when the other end closes
6047db96d56Sopenharmony_ci        # the connection
6057db96d56Sopenharmony_ci
6067db96d56Sopenharmony_ci        class TestClient(BaseClient):
6077db96d56Sopenharmony_ci
6087db96d56Sopenharmony_ci            def handle_read(self):
6097db96d56Sopenharmony_ci                # in order to make handle_close be called we are supposed
6107db96d56Sopenharmony_ci                # to make at least one recv() call
6117db96d56Sopenharmony_ci                self.recv(1024)
6127db96d56Sopenharmony_ci
6137db96d56Sopenharmony_ci            def handle_close(self):
6147db96d56Sopenharmony_ci                self.flag = True
6157db96d56Sopenharmony_ci                self.close()
6167db96d56Sopenharmony_ci
6177db96d56Sopenharmony_ci        class TestHandler(BaseTestHandler):
6187db96d56Sopenharmony_ci            def __init__(self, conn):
6197db96d56Sopenharmony_ci                BaseTestHandler.__init__(self, conn)
6207db96d56Sopenharmony_ci                self.close()
6217db96d56Sopenharmony_ci
6227db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr, TestHandler)
6237db96d56Sopenharmony_ci        client = TestClient(self.family, server.address)
6247db96d56Sopenharmony_ci        self.loop_waiting_for_flag(client)
6257db96d56Sopenharmony_ci
6267db96d56Sopenharmony_ci    def test_handle_close_after_conn_broken(self):
6277db96d56Sopenharmony_ci        # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
6287db96d56Sopenharmony_ci        # #11265).
6297db96d56Sopenharmony_ci
6307db96d56Sopenharmony_ci        data = b'\0' * 128
6317db96d56Sopenharmony_ci
6327db96d56Sopenharmony_ci        class TestClient(BaseClient):
6337db96d56Sopenharmony_ci
6347db96d56Sopenharmony_ci            def handle_write(self):
6357db96d56Sopenharmony_ci                self.send(data)
6367db96d56Sopenharmony_ci
6377db96d56Sopenharmony_ci            def handle_close(self):
6387db96d56Sopenharmony_ci                self.flag = True
6397db96d56Sopenharmony_ci                self.close()
6407db96d56Sopenharmony_ci
6417db96d56Sopenharmony_ci            def handle_expt(self):
6427db96d56Sopenharmony_ci                self.flag = True
6437db96d56Sopenharmony_ci                self.close()
6447db96d56Sopenharmony_ci
6457db96d56Sopenharmony_ci        class TestHandler(BaseTestHandler):
6467db96d56Sopenharmony_ci
6477db96d56Sopenharmony_ci            def handle_read(self):
6487db96d56Sopenharmony_ci                self.recv(len(data))
6497db96d56Sopenharmony_ci                self.close()
6507db96d56Sopenharmony_ci
6517db96d56Sopenharmony_ci            def writable(self):
6527db96d56Sopenharmony_ci                return False
6537db96d56Sopenharmony_ci
6547db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr, TestHandler)
6557db96d56Sopenharmony_ci        client = TestClient(self.family, server.address)
6567db96d56Sopenharmony_ci        self.loop_waiting_for_flag(client)
6577db96d56Sopenharmony_ci
6587db96d56Sopenharmony_ci    @unittest.skipIf(sys.platform.startswith("sunos"),
6597db96d56Sopenharmony_ci                     "OOB support is broken on Solaris")
6607db96d56Sopenharmony_ci    def test_handle_expt(self):
6617db96d56Sopenharmony_ci        # Make sure handle_expt is called on OOB data received.
6627db96d56Sopenharmony_ci        # Note: this might fail on some platforms as OOB data is
6637db96d56Sopenharmony_ci        # tenuously supported and rarely used.
6647db96d56Sopenharmony_ci        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
6657db96d56Sopenharmony_ci            self.skipTest("Not applicable to AF_UNIX sockets.")
6667db96d56Sopenharmony_ci
6677db96d56Sopenharmony_ci        if sys.platform == "darwin" and self.use_poll:
6687db96d56Sopenharmony_ci            self.skipTest("poll may fail on macOS; see issue #28087")
6697db96d56Sopenharmony_ci
6707db96d56Sopenharmony_ci        class TestClient(BaseClient):
6717db96d56Sopenharmony_ci            def handle_expt(self):
6727db96d56Sopenharmony_ci                self.socket.recv(1024, socket.MSG_OOB)
6737db96d56Sopenharmony_ci                self.flag = True
6747db96d56Sopenharmony_ci
6757db96d56Sopenharmony_ci        class TestHandler(BaseTestHandler):
6767db96d56Sopenharmony_ci            def __init__(self, conn):
6777db96d56Sopenharmony_ci                BaseTestHandler.__init__(self, conn)
6787db96d56Sopenharmony_ci                self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
6797db96d56Sopenharmony_ci
6807db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr, TestHandler)
6817db96d56Sopenharmony_ci        client = TestClient(self.family, server.address)
6827db96d56Sopenharmony_ci        self.loop_waiting_for_flag(client)
6837db96d56Sopenharmony_ci
6847db96d56Sopenharmony_ci    def test_handle_error(self):
6857db96d56Sopenharmony_ci
6867db96d56Sopenharmony_ci        class TestClient(BaseClient):
6877db96d56Sopenharmony_ci            def handle_write(self):
6887db96d56Sopenharmony_ci                1.0 / 0
6897db96d56Sopenharmony_ci            def handle_error(self):
6907db96d56Sopenharmony_ci                self.flag = True
6917db96d56Sopenharmony_ci                try:
6927db96d56Sopenharmony_ci                    raise
6937db96d56Sopenharmony_ci                except ZeroDivisionError:
6947db96d56Sopenharmony_ci                    pass
6957db96d56Sopenharmony_ci                else:
6967db96d56Sopenharmony_ci                    raise Exception("exception not raised")
6977db96d56Sopenharmony_ci
6987db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr)
6997db96d56Sopenharmony_ci        client = TestClient(self.family, server.address)
7007db96d56Sopenharmony_ci        self.loop_waiting_for_flag(client)
7017db96d56Sopenharmony_ci
7027db96d56Sopenharmony_ci    def test_connection_attributes(self):
7037db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr)
7047db96d56Sopenharmony_ci        client = BaseClient(self.family, server.address)
7057db96d56Sopenharmony_ci
7067db96d56Sopenharmony_ci        # we start disconnected
7077db96d56Sopenharmony_ci        self.assertFalse(server.connected)
7087db96d56Sopenharmony_ci        self.assertTrue(server.accepting)
7097db96d56Sopenharmony_ci        # this can't be taken for granted across all platforms
7107db96d56Sopenharmony_ci        #self.assertFalse(client.connected)
7117db96d56Sopenharmony_ci        self.assertFalse(client.accepting)
7127db96d56Sopenharmony_ci
7137db96d56Sopenharmony_ci        # execute some loops so that client connects to server
7147db96d56Sopenharmony_ci        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
7157db96d56Sopenharmony_ci        self.assertFalse(server.connected)
7167db96d56Sopenharmony_ci        self.assertTrue(server.accepting)
7177db96d56Sopenharmony_ci        self.assertTrue(client.connected)
7187db96d56Sopenharmony_ci        self.assertFalse(client.accepting)
7197db96d56Sopenharmony_ci
7207db96d56Sopenharmony_ci        # disconnect the client
7217db96d56Sopenharmony_ci        client.close()
7227db96d56Sopenharmony_ci        self.assertFalse(server.connected)
7237db96d56Sopenharmony_ci        self.assertTrue(server.accepting)
7247db96d56Sopenharmony_ci        self.assertFalse(client.connected)
7257db96d56Sopenharmony_ci        self.assertFalse(client.accepting)
7267db96d56Sopenharmony_ci
7277db96d56Sopenharmony_ci        # stop serving
7287db96d56Sopenharmony_ci        server.close()
7297db96d56Sopenharmony_ci        self.assertFalse(server.connected)
7307db96d56Sopenharmony_ci        self.assertFalse(server.accepting)
7317db96d56Sopenharmony_ci
7327db96d56Sopenharmony_ci    def test_create_socket(self):
7337db96d56Sopenharmony_ci        s = asyncore.dispatcher()
7347db96d56Sopenharmony_ci        s.create_socket(self.family)
7357db96d56Sopenharmony_ci        self.assertEqual(s.socket.type, socket.SOCK_STREAM)
7367db96d56Sopenharmony_ci        self.assertEqual(s.socket.family, self.family)
7377db96d56Sopenharmony_ci        self.assertEqual(s.socket.gettimeout(), 0)
7387db96d56Sopenharmony_ci        self.assertFalse(s.socket.get_inheritable())
7397db96d56Sopenharmony_ci
7407db96d56Sopenharmony_ci    def test_bind(self):
7417db96d56Sopenharmony_ci        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
7427db96d56Sopenharmony_ci            self.skipTest("Not applicable to AF_UNIX sockets.")
7437db96d56Sopenharmony_ci        s1 = asyncore.dispatcher()
7447db96d56Sopenharmony_ci        s1.create_socket(self.family)
7457db96d56Sopenharmony_ci        s1.bind(self.addr)
7467db96d56Sopenharmony_ci        s1.listen(5)
7477db96d56Sopenharmony_ci        port = s1.socket.getsockname()[1]
7487db96d56Sopenharmony_ci
7497db96d56Sopenharmony_ci        s2 = asyncore.dispatcher()
7507db96d56Sopenharmony_ci        s2.create_socket(self.family)
7517db96d56Sopenharmony_ci        # EADDRINUSE indicates the socket was correctly bound
7527db96d56Sopenharmony_ci        self.assertRaises(OSError, s2.bind, (self.addr[0], port))
7537db96d56Sopenharmony_ci
7547db96d56Sopenharmony_ci    def test_set_reuse_addr(self):
7557db96d56Sopenharmony_ci        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
7567db96d56Sopenharmony_ci            self.skipTest("Not applicable to AF_UNIX sockets.")
7577db96d56Sopenharmony_ci
7587db96d56Sopenharmony_ci        with socket.socket(self.family) as sock:
7597db96d56Sopenharmony_ci            try:
7607db96d56Sopenharmony_ci                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
7617db96d56Sopenharmony_ci            except OSError:
7627db96d56Sopenharmony_ci                unittest.skip("SO_REUSEADDR not supported on this platform")
7637db96d56Sopenharmony_ci            else:
7647db96d56Sopenharmony_ci                # if SO_REUSEADDR succeeded for sock we expect asyncore
7657db96d56Sopenharmony_ci                # to do the same
7667db96d56Sopenharmony_ci                s = asyncore.dispatcher(socket.socket(self.family))
7677db96d56Sopenharmony_ci                self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
7687db96d56Sopenharmony_ci                                                     socket.SO_REUSEADDR))
7697db96d56Sopenharmony_ci                s.socket.close()
7707db96d56Sopenharmony_ci                s.create_socket(self.family)
7717db96d56Sopenharmony_ci                s.set_reuse_addr()
7727db96d56Sopenharmony_ci                self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
7737db96d56Sopenharmony_ci                                                     socket.SO_REUSEADDR))
7747db96d56Sopenharmony_ci
7757db96d56Sopenharmony_ci    @threading_helper.reap_threads
7767db96d56Sopenharmony_ci    def test_quick_connect(self):
7777db96d56Sopenharmony_ci        # see: http://bugs.python.org/issue10340
7787db96d56Sopenharmony_ci        if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
7797db96d56Sopenharmony_ci            self.skipTest("test specific to AF_INET and AF_INET6")
7807db96d56Sopenharmony_ci
7817db96d56Sopenharmony_ci        server = BaseServer(self.family, self.addr)
7827db96d56Sopenharmony_ci        # run the thread 500 ms: the socket should be connected in 200 ms
7837db96d56Sopenharmony_ci        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
7847db96d56Sopenharmony_ci                                                          count=5))
7857db96d56Sopenharmony_ci        t.start()
7867db96d56Sopenharmony_ci        try:
7877db96d56Sopenharmony_ci            with socket.socket(self.family, socket.SOCK_STREAM) as s:
7887db96d56Sopenharmony_ci                s.settimeout(.2)
7897db96d56Sopenharmony_ci                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
7907db96d56Sopenharmony_ci                             struct.pack('ii', 1, 0))
7917db96d56Sopenharmony_ci
7927db96d56Sopenharmony_ci                try:
7937db96d56Sopenharmony_ci                    s.connect(server.address)
7947db96d56Sopenharmony_ci                except OSError:
7957db96d56Sopenharmony_ci                    pass
7967db96d56Sopenharmony_ci        finally:
7977db96d56Sopenharmony_ci            threading_helper.join_thread(t)
7987db96d56Sopenharmony_ci
class TestAPI_UseIPv4Sockets(BaseTestAPI):
    """Run the BaseTestAPI tests over AF_INET sockets."""

    family = socket.AF_INET
    # port 0: the actual port is chosen when the socket is bound
    addr = (socket_helper.HOST, 0)
8027db96d56Sopenharmony_ci
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI):
    """Run the BaseTestAPI tests over AF_INET6 sockets."""

    family = socket.AF_INET6
    # port 0: the actual port is chosen when the socket is bound
    addr = (socket_helper.HOSTv6, 0)
8077db96d56Sopenharmony_ci
@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI):
    """Run the BaseTestAPI tests over AF_UNIX sockets bound to TESTFN."""

    # socket.AF_UNIX is only looked up where it exists: the class body is
    # executed at import time even when the class ends up being skipped
    if HAS_UNIX_SOCKETS:
        family = socket.AF_UNIX
    addr = os_helper.TESTFN

    def tearDown(self):
        # remove the socket path first, then run the shared cleanup
        os_helper.unlink(self.addr)
        super().tearDown()
8177db96d56Sopenharmony_ci
class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
    """IPv4 API tests run with use_poll switched off."""

    use_poll = False
8207db96d56Sopenharmony_ci
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
    """IPv4 API tests run with use_poll switched on (needs select.poll)."""

    use_poll = True
8247db96d56Sopenharmony_ci
class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
    """IPv6 API tests run with use_poll switched off."""

    use_poll = False
8277db96d56Sopenharmony_ci
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
    """IPv6 API tests run with use_poll switched on (needs select.poll)."""

    use_poll = True
8317db96d56Sopenharmony_ci
class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
    """Unix-socket API tests run with use_poll switched off."""

    use_poll = False
8347db96d56Sopenharmony_ci
@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
    """Unix-socket API tests run with use_poll switched on (needs select.poll)."""

    use_poll = True
8387db96d56Sopenharmony_ci
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
841