import unittest
import select
import os
import socket
import sys
import time
import errno
import struct
import threading

from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
from io import BytesIO

if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

support.requires_working_socket(module=True)

asyncore = warnings_helper.import_deprecated('asyncore')


HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')

class dummysocket:
    """Socket stand-in that only records whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def fileno(self):
        # Arbitrary fixed descriptor number; no real file is opened.
        return 42

class dummychannel:
    """Channel stand-in wrapping a dummysocket; close() closes that socket."""

    def __init__(self):
        self.socket = dummysocket()

    def close(self):
        self.socket.close()

class exitingdummy:
    """Handler object whose every event method raises asyncore.ExitNow."""

    def __init__(self):
        pass

    def handle_read_event(self):
        raise asyncore.ExitNow()

    # All other event entry points share the same raising implementation.
    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event

class crashingdummy:
    """Handler object whose event methods raise a plain Exception.

    handle_error() records that it was called in ``error_handled``.
    """

    def __init__(self):
        self.error_handled = False

    def handle_read_event(self):
        raise Exception()

    # All other event entry points share the same raising implementation.
    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event

    def handle_error(self):
        self.error_handled = True

# used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf, serv):
    """Accept one connection on *serv* and copy what it sends into *buf*.

    Newline bytes are stripped from the captured data.  Reading stops when a
    newline is received, after 200 successful reads, or once roughly three
    seconds have elapsed.  A TimeoutError from listen()/accept() is ignored.
    In every case *serv* is closed and *evt* is set before returning.
    """
    try:
        serv.listen()
        conn, addr = serv.accept()
    except TimeoutError:
        pass
    else:
        n = 200
        start = time.monotonic()
        while n > 0 and time.monotonic() - start < 3.0:
            r, w, e = select.select([conn], [], [], 0.1)
            if r:
                n -= 1
                data = conn.recv(10)
                # keep everything except for the newline terminator
                buf.write(data.replace(b'\n', b''))
                if b'\n' in data:
                    break
            time.sleep(0.01)

        conn.close()
    finally:
        serv.close()
        evt.set()

def bind_af_aware(sock, addr):
    """Helper function to bind a socket according to its family."""
    if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
        # Make sure the path doesn't exist.
        os_helper.unlink(addr)
        socket_helper.bind_unix_socket(sock, addr)
    else:
        sock.bind(addr)


class HelperFunctionTests(unittest.TestCase):
    def test_readwriteexc(self):
        # Check exception handling behavior of read, write and _exception

        # check that ExitNow exceptions in the object handler method
        # bubbles all the way up through asyncore read/write/_exception calls
        tr1 = exitingdummy()
        self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)

        # check that an exception other than ExitNow in the object handler
        # method causes the handle_error method to get called
        tr2 = crashingdummy()
        asyncore.read(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore.write(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore._exception(tr2)
        self.assertEqual(tr2.error_handled, True)

    # asyncore.readwrite uses constants in the select module that
    # are not present in Windows systems (see this thread:
    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
    # These constants should be present as long as poll is available

    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    def test_readwrite(self):
        # Check that correct methods are called by readwrite()

        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')

        expected = (
            (select.POLLIN, 'read'),
            (select.POLLPRI, 'expt'),
            (select.POLLOUT, 'write'),
            (select.POLLERR, 'closed'),
            (select.POLLHUP, 'closed'),
            (select.POLLNVAL, 'closed'),
            )

        class testobj:
            def __init__(self):
                self.read = False
                self.write = False
                self.closed = False
                self.expt = False
                self.error_handled = False

            def handle_read_event(self):
                self.read = True

            def handle_write_event(self):
                self.write = True

            def handle_close(self):
                self.closed = True

            def handle_expt_event(self):
                self.expt = True

            def handle_error(self):
                self.error_handled = True

        for flag, expectedattr in expected:
            tobj = testobj()
            self.assertEqual(getattr(tobj, expectedattr), False)
            asyncore.readwrite(tobj, flag)

            # Only the attribute modified by the routine we expect to be
            # called should be True.
            for attr in attributes:
                self.assertEqual(getattr(tobj, attr), attr==expectedattr)

            # check that ExitNow exceptions in the object handler method
            # bubbles all the way up through asyncore readwrite call
            tr1 = exitingdummy()
            self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)

            # check that an exception other than ExitNow in the object handler
            # method causes the handle_error method to get called
            tr2 = crashingdummy()
            self.assertEqual(tr2.error_handled, False)
            asyncore.readwrite(tr2, flag)
            self.assertEqual(tr2.error_handled, True)

    def test_closeall(self):
        self.closeall_check(False)

    def test_closeall_default(self):
        self.closeall_check(True)

    def closeall_check(self, usedefault):
        # Check that close_all() closes everything in a given map

        l = []
        testmap = {}
        for i in range(10):
            c = dummychannel()
            l.append(c)
            self.assertEqual(c.socket.closed, False)
            testmap[i] = c

        if usedefault:
            # Temporarily install testmap as the module-wide default map and
            # restore the real one afterwards, even if close_all() fails.
            socketmap = asyncore.socket_map
            try:
                asyncore.socket_map = testmap
                asyncore.close_all()
            finally:
                testmap, asyncore.socket_map = asyncore.socket_map, socketmap
        else:
            asyncore.close_all(testmap)

        self.assertEqual(len(testmap), 0)

        for c in l:
            self.assertEqual(c.socket.closed, True)

    def test_compact_traceback(self):
        try:
            raise Exception("I don't like spam!")
        except Exception:
            real_t, real_v, real_tb = sys.exc_info()
            r = asyncore.compact_traceback()
        else:
            self.fail("Expected exception")

        (f, function, line), t, v, info = r
        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
        self.assertEqual(function, 'test_compact_traceback')
        self.assertEqual(t, real_t)
        self.assertEqual(v, real_v)
        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))


class DispatcherTests(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    def test_basic(self):
        d = asyncore.dispatcher()
        self.assertEqual(d.readable(), True)
        self.assertEqual(d.writable(), True)

    def test_repr(self):
        d = asyncore.dispatcher()
        self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))

    def test_log(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log() (to stderr)
        l1 = "Lovely spam! Wonderful spam!"
        l2 = "I don't like spam!"
        with support.captured_stderr() as stderr:
            d.log(l1)
            d.log(l2)

        lines = stderr.getvalue().splitlines()
        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])

    def test_log_info(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log_info() (to stdout via print)
        l1 = "Have you got anything without spam?"
        l2 = "Why can't she have egg bacon spam and sausage?"
        l3 = "THAT'S got spam in it!"
        with support.captured_stdout() as stdout:
            d.log_info(l1, 'EGGS')
            d.log_info(l2)
            d.log_info(l3, 'SPAM')

        lines = stdout.getvalue().splitlines()
        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
        self.assertEqual(lines, expected)

    def test_unhandled(self):
        d = asyncore.dispatcher()
        d.ignore_log_types = ()

        # capture output of dispatcher.log_info() (to stdout via print)
        with support.captured_stdout() as stdout:
            d.handle_expt()
            d.handle_read()
            d.handle_write()
            d.handle_connect()

        lines = stdout.getvalue().splitlines()
        expected = ['warning: unhandled incoming priority event',
                    'warning: unhandled read event',
                    'warning: unhandled write event',
                    'warning: unhandled connect event']
        self.assertEqual(lines, expected)

    def test_strerror(self):
        # refers to bug #8573
        err = asyncore._strerror(errno.EPERM)
        if hasattr(os, 'strerror'):
            self.assertEqual(err, os.strerror(errno.EPERM))
        err = asyncore._strerror(-1)
        self.assertNotEqual(err, "")


class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    """dispatcher_with_send that never reports readable and ignores connect."""

    def readable(self):
        return False

    def handle_connect(self):
        pass


class DispatcherWithSendTests(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    @threading_helper.reap_threads
    def test_send(self):
        evt = threading.Event()
        sock = socket.socket()
        sock.settimeout(3)
        port = socket_helper.bind_port(sock)

        cap = BytesIO()
        args = (evt, cap, sock)
        t = threading.Thread(target=capture_server, args=args)
        t.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = b"Suppose there isn't a 16-ton weight?"
            d = dispatcherwithsend_noread()
            d.create_socket()
            d.connect((socket_helper.HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            d.send(data)
            d.send(data)
            d.send(b'\n')

            n = 1000
            while d.out_buffer and n > 0:
                asyncore.poll()
                n -= 1

            evt.wait()

            self.assertEqual(cap.getvalue(), data*2)
        finally:
            threading_helper.join_thread(t)


@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
                     'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
    def setUp(self):
        self.d = b"It's not dead, it's sleeping!"
        with open(os_helper.TESTFN, 'wb') as file:
            file.write(self.d)

    def tearDown(self):
        os_helper.unlink(os_helper.TESTFN)

    def test_recv(self):
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        self.assertNotEqual(w.fd, fd)
        self.assertNotEqual(w.fileno(), fd)
        self.assertEqual(w.recv(13), b"It's not dead")
        self.assertEqual(w.read(6), b", it's")
        w.close()
        self.assertRaises(OSError, w.read, 1)

    def test_send(self):
        d1 = b"Come again?"
        d2 = b"I want to buy some cheese."
        fd = os.open(os_helper.TESTFN, os.O_WRONLY | os.O_APPEND)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        w.write(d1)
        w.send(d2)
        w.close()
        with open(os_helper.TESTFN, 'rb') as file:
            self.assertEqual(file.read(), self.d + d1 + d2)

    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
                         'asyncore.file_dispatcher required')
    def test_dispatcher(self):
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        data = []
        class FileDispatcher(asyncore.file_dispatcher):
            def handle_read(self):
                data.append(self.recv(29))
        s = FileDispatcher(fd)
        os.close(fd)
        asyncore.loop(timeout=0.01, use_poll=True, count=2)
        self.assertEqual(b"".join(data), self.d)

    def test_resource_warning(self):
        # Issue #11453
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)

        os.close(fd)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            f = None
            support.gc_collect()

    def test_close_twice(self):
        fd = os.open(os_helper.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)
        os.close(fd)

        os.close(f.fd)  # file_wrapper dupped fd
        with self.assertRaises(OSError):
            f.close()

        self.assertEqual(f.fd, -1)
        # calling close twice should not fail
        f.close()


class BaseTestHandler(asyncore.dispatcher):
    """Dispatcher that fails loudly on any event a test did not expect.

    The accept/accepted/connect/expt/close handlers raise, and handle_error()
    re-raises the active exception.  ``flag`` starts out False and is set by
    test subclasses when the expected event occurs.
    """

    def __init__(self, sock=None):
        asyncore.dispatcher.__init__(self, sock)
        self.flag = False

    def handle_accept(self):
        raise Exception("handle_accept not supposed to be called")

    def handle_accepted(self):
        raise Exception("handle_accepted not supposed to be called")

    def handle_connect(self):
        raise Exception("handle_connect not supposed to be called")

    def handle_expt(self):
        raise Exception("handle_expt not supposed to be called")

    def handle_close(self):
        raise Exception("handle_close not supposed to be called")

    def handle_error(self):
        raise


class BaseServer(asyncore.dispatcher):
    """A server which listens on an address and dispatches the
    connection to a handler.
    """

    def __init__(self, family, addr, handler=BaseTestHandler):
        asyncore.dispatcher.__init__(self)
        self.create_socket(family)
        self.set_reuse_addr()
        bind_af_aware(self.socket, addr)
        self.listen(5)
        self.handler = handler

    @property
    def address(self):
        return self.socket.getsockname()

    def handle_accepted(self, sock, addr):
        self.handler(sock)

    def handle_error(self):
        raise


class BaseClient(BaseTestHandler):
    """Test handler that connects to *address* on construction."""

    def __init__(self, family, address):
        BaseTestHandler.__init__(self)
        self.create_socket(family)
        self.connect(address)

    def handle_connect(self):
        pass


class BaseTestAPI:
    # Reads self.family, self.addr and self.use_poll, none of which are set
    # here.
    # NOTE(review): presumably mixed into unittest.TestCase subclasses that
    # define them later in the file -- confirm.

    def tearDown(self):
        asyncore.close_all(ignore_all=True)

    def loop_waiting_for_flag(self, instance, timeout=5):
        # Run up to 100 single-pass loop iterations, sleeping timeout/100
        # seconds between them, and fail if instance.flag never becomes true.
        timeout = float(timeout) / 100
        count = 100
        while asyncore.socket_map and count > 0:
            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
            if instance.flag:
                return
            count -= 1
            time.sleep(timeout)
        self.fail("flag not set")

    def test_handle_connect(self):
        # make sure handle_connect is called on connect()

        class TestClient(BaseClient):
            def handle_connect(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_accept(self):
        # make sure handle_accept() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)

    def test_handle_accepted(self):
        # make sure handle_accepted() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                asyncore.dispatcher.handle_accept(self)

            def handle_accepted(self, sock, addr):
                sock.close()
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)


    def test_handle_read(self):
        # make sure handle_read is called on data received

        class TestClient(BaseClient):
            def handle_read(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.send(b'x' * 1024)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_write(self):
        # make sure handle_write is called

        class TestClient(BaseClient):
            def handle_write(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close(self):
        # make sure handle_close is called when the other end closes
        # the connection

        class TestClient(BaseClient):

            def handle_read(self):
                # in order to make handle_close be called we are supposed
                # to make at least one recv() call
                self.recv(1024)

            def handle_close(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.close()

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close_after_conn_broken(self):
        # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
        # #11265).

        data = b'\0' * 128

        class TestClient(BaseClient):

            def handle_write(self):
                self.send(data)

            def handle_close(self):
                self.flag = True
                self.close()

            def handle_expt(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):

            def handle_read(self):
                self.recv(len(data))
                self.close()

            def writable(self):
                return False

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    @unittest.skipIf(sys.platform.startswith("sunos"),
                     "OOB support is broken on Solaris")
    def test_handle_expt(self):
        # Make sure handle_expt is called on OOB data received.
        # Note: this might fail on some platforms as OOB data is
        # tenuously supported and rarely used.
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        if sys.platform == "darwin" and self.use_poll:
            self.skipTest("poll may fail on macOS; see issue #28087")

        class TestClient(BaseClient):
            def handle_expt(self):
                self.socket.recv(1024, socket.MSG_OOB)
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_error(self):

        class TestClient(BaseClient):
            def handle_write(self):
                1.0 / 0
            def handle_error(self):
                self.flag = True
                # Re-raise the exception being handled to check its type.
                try:
                    raise
                except ZeroDivisionError:
                    pass
                else:
                    raise Exception("exception not raised")

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)
7027db96d56Sopenharmony_ci def test_connection_attributes(self): 7037db96d56Sopenharmony_ci server = BaseServer(self.family, self.addr) 7047db96d56Sopenharmony_ci client = BaseClient(self.family, server.address) 7057db96d56Sopenharmony_ci 7067db96d56Sopenharmony_ci # we start disconnected 7077db96d56Sopenharmony_ci self.assertFalse(server.connected) 7087db96d56Sopenharmony_ci self.assertTrue(server.accepting) 7097db96d56Sopenharmony_ci # this can't be taken for granted across all platforms 7107db96d56Sopenharmony_ci #self.assertFalse(client.connected) 7117db96d56Sopenharmony_ci self.assertFalse(client.accepting) 7127db96d56Sopenharmony_ci 7137db96d56Sopenharmony_ci # execute some loops so that client connects to server 7147db96d56Sopenharmony_ci asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100) 7157db96d56Sopenharmony_ci self.assertFalse(server.connected) 7167db96d56Sopenharmony_ci self.assertTrue(server.accepting) 7177db96d56Sopenharmony_ci self.assertTrue(client.connected) 7187db96d56Sopenharmony_ci self.assertFalse(client.accepting) 7197db96d56Sopenharmony_ci 7207db96d56Sopenharmony_ci # disconnect the client 7217db96d56Sopenharmony_ci client.close() 7227db96d56Sopenharmony_ci self.assertFalse(server.connected) 7237db96d56Sopenharmony_ci self.assertTrue(server.accepting) 7247db96d56Sopenharmony_ci self.assertFalse(client.connected) 7257db96d56Sopenharmony_ci self.assertFalse(client.accepting) 7267db96d56Sopenharmony_ci 7277db96d56Sopenharmony_ci # stop serving 7287db96d56Sopenharmony_ci server.close() 7297db96d56Sopenharmony_ci self.assertFalse(server.connected) 7307db96d56Sopenharmony_ci self.assertFalse(server.accepting) 7317db96d56Sopenharmony_ci 7327db96d56Sopenharmony_ci def test_create_socket(self): 7337db96d56Sopenharmony_ci s = asyncore.dispatcher() 7347db96d56Sopenharmony_ci s.create_socket(self.family) 7357db96d56Sopenharmony_ci self.assertEqual(s.socket.type, socket.SOCK_STREAM) 7367db96d56Sopenharmony_ci self.assertEqual(s.socket.family, 
self.family) 7377db96d56Sopenharmony_ci self.assertEqual(s.socket.gettimeout(), 0) 7387db96d56Sopenharmony_ci self.assertFalse(s.socket.get_inheritable()) 7397db96d56Sopenharmony_ci 7407db96d56Sopenharmony_ci def test_bind(self): 7417db96d56Sopenharmony_ci if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: 7427db96d56Sopenharmony_ci self.skipTest("Not applicable to AF_UNIX sockets.") 7437db96d56Sopenharmony_ci s1 = asyncore.dispatcher() 7447db96d56Sopenharmony_ci s1.create_socket(self.family) 7457db96d56Sopenharmony_ci s1.bind(self.addr) 7467db96d56Sopenharmony_ci s1.listen(5) 7477db96d56Sopenharmony_ci port = s1.socket.getsockname()[1] 7487db96d56Sopenharmony_ci 7497db96d56Sopenharmony_ci s2 = asyncore.dispatcher() 7507db96d56Sopenharmony_ci s2.create_socket(self.family) 7517db96d56Sopenharmony_ci # EADDRINUSE indicates the socket was correctly bound 7527db96d56Sopenharmony_ci self.assertRaises(OSError, s2.bind, (self.addr[0], port)) 7537db96d56Sopenharmony_ci 7547db96d56Sopenharmony_ci def test_set_reuse_addr(self): 7557db96d56Sopenharmony_ci if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: 7567db96d56Sopenharmony_ci self.skipTest("Not applicable to AF_UNIX sockets.") 7577db96d56Sopenharmony_ci 7587db96d56Sopenharmony_ci with socket.socket(self.family) as sock: 7597db96d56Sopenharmony_ci try: 7607db96d56Sopenharmony_ci sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 7617db96d56Sopenharmony_ci except OSError: 7627db96d56Sopenharmony_ci unittest.skip("SO_REUSEADDR not supported on this platform") 7637db96d56Sopenharmony_ci else: 7647db96d56Sopenharmony_ci # if SO_REUSEADDR succeeded for sock we expect asyncore 7657db96d56Sopenharmony_ci # to do the same 7667db96d56Sopenharmony_ci s = asyncore.dispatcher(socket.socket(self.family)) 7677db96d56Sopenharmony_ci self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, 7687db96d56Sopenharmony_ci socket.SO_REUSEADDR)) 7697db96d56Sopenharmony_ci s.socket.close() 7707db96d56Sopenharmony_ci 
s.create_socket(self.family) 7717db96d56Sopenharmony_ci s.set_reuse_addr() 7727db96d56Sopenharmony_ci self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, 7737db96d56Sopenharmony_ci socket.SO_REUSEADDR)) 7747db96d56Sopenharmony_ci 7757db96d56Sopenharmony_ci @threading_helper.reap_threads 7767db96d56Sopenharmony_ci def test_quick_connect(self): 7777db96d56Sopenharmony_ci # see: http://bugs.python.org/issue10340 7787db96d56Sopenharmony_ci if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())): 7797db96d56Sopenharmony_ci self.skipTest("test specific to AF_INET and AF_INET6") 7807db96d56Sopenharmony_ci 7817db96d56Sopenharmony_ci server = BaseServer(self.family, self.addr) 7827db96d56Sopenharmony_ci # run the thread 500 ms: the socket should be connected in 200 ms 7837db96d56Sopenharmony_ci t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, 7847db96d56Sopenharmony_ci count=5)) 7857db96d56Sopenharmony_ci t.start() 7867db96d56Sopenharmony_ci try: 7877db96d56Sopenharmony_ci with socket.socket(self.family, socket.SOCK_STREAM) as s: 7887db96d56Sopenharmony_ci s.settimeout(.2) 7897db96d56Sopenharmony_ci s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 7907db96d56Sopenharmony_ci struct.pack('ii', 1, 0)) 7917db96d56Sopenharmony_ci 7927db96d56Sopenharmony_ci try: 7937db96d56Sopenharmony_ci s.connect(server.address) 7947db96d56Sopenharmony_ci except OSError: 7957db96d56Sopenharmony_ci pass 7967db96d56Sopenharmony_ci finally: 7977db96d56Sopenharmony_ci threading_helper.join_thread(t) 7987db96d56Sopenharmony_ci 7997db96d56Sopenharmony_ciclass TestAPI_UseIPv4Sockets(BaseTestAPI): 8007db96d56Sopenharmony_ci family = socket.AF_INET 8017db96d56Sopenharmony_ci addr = (socket_helper.HOST, 0) 8027db96d56Sopenharmony_ci 8037db96d56Sopenharmony_ci@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 support required') 8047db96d56Sopenharmony_ciclass TestAPI_UseIPv6Sockets(BaseTestAPI): 8057db96d56Sopenharmony_ci family = socket.AF_INET6 
8067db96d56Sopenharmony_ci addr = (socket_helper.HOSTv6, 0) 8077db96d56Sopenharmony_ci 8087db96d56Sopenharmony_ci@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') 8097db96d56Sopenharmony_ciclass TestAPI_UseUnixSockets(BaseTestAPI): 8107db96d56Sopenharmony_ci if HAS_UNIX_SOCKETS: 8117db96d56Sopenharmony_ci family = socket.AF_UNIX 8127db96d56Sopenharmony_ci addr = os_helper.TESTFN 8137db96d56Sopenharmony_ci 8147db96d56Sopenharmony_ci def tearDown(self): 8157db96d56Sopenharmony_ci os_helper.unlink(self.addr) 8167db96d56Sopenharmony_ci BaseTestAPI.tearDown(self) 8177db96d56Sopenharmony_ci 8187db96d56Sopenharmony_ciclass TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase): 8197db96d56Sopenharmony_ci use_poll = False 8207db96d56Sopenharmony_ci 8217db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 8227db96d56Sopenharmony_ciclass TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase): 8237db96d56Sopenharmony_ci use_poll = True 8247db96d56Sopenharmony_ci 8257db96d56Sopenharmony_ciclass TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase): 8267db96d56Sopenharmony_ci use_poll = False 8277db96d56Sopenharmony_ci 8287db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 8297db96d56Sopenharmony_ciclass TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase): 8307db96d56Sopenharmony_ci use_poll = True 8317db96d56Sopenharmony_ci 8327db96d56Sopenharmony_ciclass TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase): 8337db96d56Sopenharmony_ci use_poll = False 8347db96d56Sopenharmony_ci 8357db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 8367db96d56Sopenharmony_ciclass TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase): 8377db96d56Sopenharmony_ci use_poll = True 8387db96d56Sopenharmony_ci 8397db96d56Sopenharmony_ciif __name__ == "__main__": 8407db96d56Sopenharmony_ci 
unittest.main() 841