17db96d56Sopenharmony_ci""" 27db96d56Sopenharmony_ciTests for kqueue wrapper. 37db96d56Sopenharmony_ci""" 47db96d56Sopenharmony_ciimport errno 57db96d56Sopenharmony_ciimport os 67db96d56Sopenharmony_ciimport select 77db96d56Sopenharmony_ciimport socket 87db96d56Sopenharmony_ciimport time 97db96d56Sopenharmony_ciimport unittest 107db96d56Sopenharmony_ci 117db96d56Sopenharmony_ciif not hasattr(select, "kqueue"): 127db96d56Sopenharmony_ci raise unittest.SkipTest("test works only on BSD") 137db96d56Sopenharmony_ci 147db96d56Sopenharmony_ciclass TestKQueue(unittest.TestCase): 157db96d56Sopenharmony_ci def test_create_queue(self): 167db96d56Sopenharmony_ci kq = select.kqueue() 177db96d56Sopenharmony_ci self.assertTrue(kq.fileno() > 0, kq.fileno()) 187db96d56Sopenharmony_ci self.assertTrue(not kq.closed) 197db96d56Sopenharmony_ci kq.close() 207db96d56Sopenharmony_ci self.assertTrue(kq.closed) 217db96d56Sopenharmony_ci self.assertRaises(ValueError, kq.fileno) 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ci def test_create_event(self): 247db96d56Sopenharmony_ci from operator import lt, le, gt, ge 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_ci fd = os.open(os.devnull, os.O_WRONLY) 277db96d56Sopenharmony_ci self.addCleanup(os.close, fd) 287db96d56Sopenharmony_ci 297db96d56Sopenharmony_ci ev = select.kevent(fd) 307db96d56Sopenharmony_ci other = select.kevent(1000) 317db96d56Sopenharmony_ci self.assertEqual(ev.ident, fd) 327db96d56Sopenharmony_ci self.assertEqual(ev.filter, select.KQ_FILTER_READ) 337db96d56Sopenharmony_ci self.assertEqual(ev.flags, select.KQ_EV_ADD) 347db96d56Sopenharmony_ci self.assertEqual(ev.fflags, 0) 357db96d56Sopenharmony_ci self.assertEqual(ev.data, 0) 367db96d56Sopenharmony_ci self.assertEqual(ev.udata, 0) 377db96d56Sopenharmony_ci self.assertEqual(ev, ev) 387db96d56Sopenharmony_ci self.assertNotEqual(ev, other) 397db96d56Sopenharmony_ci self.assertTrue(ev < other) 407db96d56Sopenharmony_ci self.assertTrue(other >= ev) 417db96d56Sopenharmony_ci for op in lt, le, gt, ge: 427db96d56Sopenharmony_ci self.assertRaises(TypeError, op, ev, None) 437db96d56Sopenharmony_ci self.assertRaises(TypeError, op, ev, 1) 447db96d56Sopenharmony_ci self.assertRaises(TypeError, op, ev, "ev") 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ci ev = select.kevent(fd, select.KQ_FILTER_WRITE) 477db96d56Sopenharmony_ci self.assertEqual(ev.ident, fd) 487db96d56Sopenharmony_ci self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) 497db96d56Sopenharmony_ci self.assertEqual(ev.flags, select.KQ_EV_ADD) 507db96d56Sopenharmony_ci self.assertEqual(ev.fflags, 0) 517db96d56Sopenharmony_ci self.assertEqual(ev.data, 0) 527db96d56Sopenharmony_ci self.assertEqual(ev.udata, 0) 537db96d56Sopenharmony_ci self.assertEqual(ev, ev) 547db96d56Sopenharmony_ci self.assertNotEqual(ev, other) 557db96d56Sopenharmony_ci 567db96d56Sopenharmony_ci ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT) 577db96d56Sopenharmony_ci self.assertEqual(ev.ident, fd) 587db96d56Sopenharmony_ci self.assertEqual(ev.filter, select.KQ_FILTER_WRITE) 597db96d56Sopenharmony_ci self.assertEqual(ev.flags, select.KQ_EV_ONESHOT) 607db96d56Sopenharmony_ci self.assertEqual(ev.fflags, 0) 617db96d56Sopenharmony_ci self.assertEqual(ev.data, 0) 627db96d56Sopenharmony_ci self.assertEqual(ev.udata, 0) 637db96d56Sopenharmony_ci self.assertEqual(ev, ev) 647db96d56Sopenharmony_ci self.assertNotEqual(ev, other) 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ci ev = select.kevent(1, 2, 3, 4, 5, 6) 677db96d56Sopenharmony_ci self.assertEqual(ev.ident, 1) 687db96d56Sopenharmony_ci self.assertEqual(ev.filter, 2) 697db96d56Sopenharmony_ci self.assertEqual(ev.flags, 3) 707db96d56Sopenharmony_ci self.assertEqual(ev.fflags, 4) 717db96d56Sopenharmony_ci self.assertEqual(ev.data, 5) 727db96d56Sopenharmony_ci self.assertEqual(ev.udata, 6) 737db96d56Sopenharmony_ci self.assertEqual(ev, ev) 747db96d56Sopenharmony_ci self.assertNotEqual(ev, other) 757db96d56Sopenharmony_ci 767db96d56Sopenharmony_ci bignum = 0x7fff 777db96d56Sopenharmony_ci ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum) 787db96d56Sopenharmony_ci self.assertEqual(ev.ident, bignum) 797db96d56Sopenharmony_ci self.assertEqual(ev.filter, 1) 807db96d56Sopenharmony_ci self.assertEqual(ev.flags, 2) 817db96d56Sopenharmony_ci self.assertEqual(ev.fflags, 3) 827db96d56Sopenharmony_ci self.assertEqual(ev.data, bignum - 1) 837db96d56Sopenharmony_ci self.assertEqual(ev.udata, bignum) 847db96d56Sopenharmony_ci self.assertEqual(ev, ev) 857db96d56Sopenharmony_ci self.assertNotEqual(ev, other) 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_ci # Issue 11973 887db96d56Sopenharmony_ci bignum = 0xffff 897db96d56Sopenharmony_ci ev = select.kevent(0, 1, bignum) 907db96d56Sopenharmony_ci self.assertEqual(ev.ident, 0) 917db96d56Sopenharmony_ci self.assertEqual(ev.filter, 1) 927db96d56Sopenharmony_ci self.assertEqual(ev.flags, bignum) 937db96d56Sopenharmony_ci self.assertEqual(ev.fflags, 0) 947db96d56Sopenharmony_ci self.assertEqual(ev.data, 0) 957db96d56Sopenharmony_ci self.assertEqual(ev.udata, 0) 967db96d56Sopenharmony_ci self.assertEqual(ev, ev) 977db96d56Sopenharmony_ci self.assertNotEqual(ev, other) 987db96d56Sopenharmony_ci 997db96d56Sopenharmony_ci # Issue 11973 1007db96d56Sopenharmony_ci bignum = 0xffffffff 1017db96d56Sopenharmony_ci ev = select.kevent(0, 1, 2, bignum) 1027db96d56Sopenharmony_ci self.assertEqual(ev.ident, 0) 1037db96d56Sopenharmony_ci self.assertEqual(ev.filter, 1) 1047db96d56Sopenharmony_ci self.assertEqual(ev.flags, 2) 1057db96d56Sopenharmony_ci self.assertEqual(ev.fflags, bignum) 1067db96d56Sopenharmony_ci self.assertEqual(ev.data, 0) 1077db96d56Sopenharmony_ci self.assertEqual(ev.udata, 0) 1087db96d56Sopenharmony_ci self.assertEqual(ev, ev) 1097db96d56Sopenharmony_ci self.assertNotEqual(ev, other) 1107db96d56Sopenharmony_ci 1117db96d56Sopenharmony_ci 1127db96d56Sopenharmony_ci def test_queue_event(self): 1137db96d56Sopenharmony_ci serverSocket = socket.create_server(('127.0.0.1', 0)) 1147db96d56Sopenharmony_ci client = socket.socket() 1157db96d56Sopenharmony_ci client.setblocking(False) 1167db96d56Sopenharmony_ci try: 1177db96d56Sopenharmony_ci client.connect(('127.0.0.1', serverSocket.getsockname()[1])) 1187db96d56Sopenharmony_ci except OSError as e: 1197db96d56Sopenharmony_ci self.assertEqual(e.args[0], errno.EINPROGRESS) 1207db96d56Sopenharmony_ci else: 1217db96d56Sopenharmony_ci #raise AssertionError("Connect should have raised EINPROGRESS") 1227db96d56Sopenharmony_ci pass # FreeBSD doesn't raise an exception here 1237db96d56Sopenharmony_ci server, addr = serverSocket.accept() 1247db96d56Sopenharmony_ci 1257db96d56Sopenharmony_ci kq = select.kqueue() 1267db96d56Sopenharmony_ci kq2 = select.kqueue.fromfd(kq.fileno()) 1277db96d56Sopenharmony_ci 1287db96d56Sopenharmony_ci ev = select.kevent(server.fileno(), 1297db96d56Sopenharmony_ci select.KQ_FILTER_WRITE, 1307db96d56Sopenharmony_ci select.KQ_EV_ADD | select.KQ_EV_ENABLE) 1317db96d56Sopenharmony_ci kq.control([ev], 0) 1327db96d56Sopenharmony_ci ev = select.kevent(server.fileno(), 1337db96d56Sopenharmony_ci select.KQ_FILTER_READ, 1347db96d56Sopenharmony_ci select.KQ_EV_ADD | select.KQ_EV_ENABLE) 1357db96d56Sopenharmony_ci kq.control([ev], 0) 1367db96d56Sopenharmony_ci ev = select.kevent(client.fileno(), 1377db96d56Sopenharmony_ci select.KQ_FILTER_WRITE, 1387db96d56Sopenharmony_ci select.KQ_EV_ADD | select.KQ_EV_ENABLE) 1397db96d56Sopenharmony_ci kq2.control([ev], 0) 1407db96d56Sopenharmony_ci ev = select.kevent(client.fileno(), 1417db96d56Sopenharmony_ci select.KQ_FILTER_READ, 1427db96d56Sopenharmony_ci select.KQ_EV_ADD | select.KQ_EV_ENABLE) 1437db96d56Sopenharmony_ci kq2.control([ev], 0) 1447db96d56Sopenharmony_ci 1457db96d56Sopenharmony_ci events = kq.control(None, 4, 1) 1467db96d56Sopenharmony_ci events = set((e.ident, e.filter) for e in events) 1477db96d56Sopenharmony_ci self.assertEqual(events, set([ 1487db96d56Sopenharmony_ci (client.fileno(), select.KQ_FILTER_WRITE), 1497db96d56Sopenharmony_ci (server.fileno(), select.KQ_FILTER_WRITE)])) 1507db96d56Sopenharmony_ci 1517db96d56Sopenharmony_ci client.send(b"Hello!") 1527db96d56Sopenharmony_ci server.send(b"world!!!") 1537db96d56Sopenharmony_ci 1547db96d56Sopenharmony_ci # We may need to call it several times 1557db96d56Sopenharmony_ci for i in range(10): 1567db96d56Sopenharmony_ci events = kq.control(None, 4, 1) 1577db96d56Sopenharmony_ci if len(events) == 4: 1587db96d56Sopenharmony_ci break 1597db96d56Sopenharmony_ci time.sleep(1.0) 1607db96d56Sopenharmony_ci else: 1617db96d56Sopenharmony_ci self.fail('timeout waiting for event notifications') 1627db96d56Sopenharmony_ci 1637db96d56Sopenharmony_ci events = set((e.ident, e.filter) for e in events) 1647db96d56Sopenharmony_ci self.assertEqual(events, set([ 1657db96d56Sopenharmony_ci (client.fileno(), select.KQ_FILTER_WRITE), 1667db96d56Sopenharmony_ci (client.fileno(), select.KQ_FILTER_READ), 1677db96d56Sopenharmony_ci (server.fileno(), select.KQ_FILTER_WRITE), 1687db96d56Sopenharmony_ci (server.fileno(), select.KQ_FILTER_READ)])) 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_ci # Remove completely client, and server read part 1717db96d56Sopenharmony_ci ev = select.kevent(client.fileno(), 1727db96d56Sopenharmony_ci select.KQ_FILTER_WRITE, 1737db96d56Sopenharmony_ci select.KQ_EV_DELETE) 1747db96d56Sopenharmony_ci kq.control([ev], 0) 1757db96d56Sopenharmony_ci ev = select.kevent(client.fileno(), 1767db96d56Sopenharmony_ci select.KQ_FILTER_READ, 1777db96d56Sopenharmony_ci select.KQ_EV_DELETE) 1787db96d56Sopenharmony_ci kq.control([ev], 0) 1797db96d56Sopenharmony_ci ev = select.kevent(server.fileno(), 1807db96d56Sopenharmony_ci select.KQ_FILTER_READ, 1817db96d56Sopenharmony_ci select.KQ_EV_DELETE) 1827db96d56Sopenharmony_ci kq.control([ev], 0, 0) 1837db96d56Sopenharmony_ci 1847db96d56Sopenharmony_ci events = kq.control([], 4, 0.99) 1857db96d56Sopenharmony_ci events = set((e.ident, e.filter) for e in events) 1867db96d56Sopenharmony_ci self.assertEqual(events, set([ 1877db96d56Sopenharmony_ci (server.fileno(), select.KQ_FILTER_WRITE)])) 1887db96d56Sopenharmony_ci 1897db96d56Sopenharmony_ci client.close() 1907db96d56Sopenharmony_ci server.close() 1917db96d56Sopenharmony_ci serverSocket.close() 1927db96d56Sopenharmony_ci 1937db96d56Sopenharmony_ci def testPair(self): 1947db96d56Sopenharmony_ci kq = select.kqueue() 1957db96d56Sopenharmony_ci a, b = socket.socketpair() 1967db96d56Sopenharmony_ci 1977db96d56Sopenharmony_ci a.send(b'foo') 1987db96d56Sopenharmony_ci event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 1997db96d56Sopenharmony_ci event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 2007db96d56Sopenharmony_ci r = kq.control([event1, event2], 1, 1) 2017db96d56Sopenharmony_ci self.assertTrue(r) 2027db96d56Sopenharmony_ci self.assertFalse(r[0].flags & select.KQ_EV_ERROR) 2037db96d56Sopenharmony_ci self.assertEqual(b.recv(r[0].data), b'foo') 2047db96d56Sopenharmony_ci 2057db96d56Sopenharmony_ci a.close() 2067db96d56Sopenharmony_ci b.close() 2077db96d56Sopenharmony_ci kq.close() 2087db96d56Sopenharmony_ci 2097db96d56Sopenharmony_ci def test_issue30058(self): 2107db96d56Sopenharmony_ci # changelist must be an iterable 2117db96d56Sopenharmony_ci kq = select.kqueue() 2127db96d56Sopenharmony_ci a, b = socket.socketpair() 2137db96d56Sopenharmony_ci ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE) 2147db96d56Sopenharmony_ci 2157db96d56Sopenharmony_ci kq.control([ev], 0) 2167db96d56Sopenharmony_ci # not a list 2177db96d56Sopenharmony_ci kq.control((ev,), 0) 2187db96d56Sopenharmony_ci # __len__ is not consistent with __iter__ 2197db96d56Sopenharmony_ci class BadList: 2207db96d56Sopenharmony_ci def __len__(self): 2217db96d56Sopenharmony_ci return 0 2227db96d56Sopenharmony_ci def __iter__(self): 2237db96d56Sopenharmony_ci for i in range(100): 2247db96d56Sopenharmony_ci yield ev 2257db96d56Sopenharmony_ci kq.control(BadList(), 0) 2267db96d56Sopenharmony_ci # doesn't have __len__ 2277db96d56Sopenharmony_ci kq.control(iter([ev]), 0) 2287db96d56Sopenharmony_ci 2297db96d56Sopenharmony_ci a.close() 2307db96d56Sopenharmony_ci b.close() 2317db96d56Sopenharmony_ci kq.close() 2327db96d56Sopenharmony_ci 2337db96d56Sopenharmony_ci def test_close(self): 2347db96d56Sopenharmony_ci open_file = open(__file__, "rb") 2357db96d56Sopenharmony_ci self.addCleanup(open_file.close) 2367db96d56Sopenharmony_ci fd = open_file.fileno() 2377db96d56Sopenharmony_ci kqueue = select.kqueue() 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ci # test fileno() method and closed attribute 2407db96d56Sopenharmony_ci self.assertIsInstance(kqueue.fileno(), int) 2417db96d56Sopenharmony_ci self.assertFalse(kqueue.closed) 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ci # test close() 2447db96d56Sopenharmony_ci kqueue.close() 2457db96d56Sopenharmony_ci self.assertTrue(kqueue.closed) 2467db96d56Sopenharmony_ci self.assertRaises(ValueError, kqueue.fileno) 2477db96d56Sopenharmony_ci 2487db96d56Sopenharmony_ci # close() can be called more than once 2497db96d56Sopenharmony_ci kqueue.close() 2507db96d56Sopenharmony_ci 2517db96d56Sopenharmony_ci # operations must fail with ValueError("I/O operation on closed ...") 2527db96d56Sopenharmony_ci self.assertRaises(ValueError, kqueue.control, None, 4) 2537db96d56Sopenharmony_ci 2547db96d56Sopenharmony_ci def test_fd_non_inheritable(self): 2557db96d56Sopenharmony_ci kqueue = select.kqueue() 2567db96d56Sopenharmony_ci self.addCleanup(kqueue.close) 2577db96d56Sopenharmony_ci self.assertEqual(os.get_inheritable(kqueue.fileno()), False) 2587db96d56Sopenharmony_ci 2597db96d56Sopenharmony_ci 2607db96d56Sopenharmony_ciif __name__ == "__main__": 2617db96d56Sopenharmony_ci unittest.main() 262