xref: /third_party/python/Lib/test/test_kqueue.py (revision 7db96d56)
"""
Tests for the select.kqueue wrapper.
"""
47db96d56Sopenharmony_ciimport errno
57db96d56Sopenharmony_ciimport os
67db96d56Sopenharmony_ciimport select
77db96d56Sopenharmony_ciimport socket
87db96d56Sopenharmony_ciimport time
97db96d56Sopenharmony_ciimport unittest
107db96d56Sopenharmony_ci
# Nothing in this module can run when the select module does not expose
# kqueue, so skip the whole module up front.
kqueue_available = hasattr(select, "kqueue")
if not kqueue_available:
    raise unittest.SkipTest("test works only on BSD")
137db96d56Sopenharmony_ci
@unittest.skipUnless(hasattr(select, "kqueue"), "test works only on BSD")
class TestKQueue(unittest.TestCase):
    """Tests for select.kqueue and select.kevent.

    Per-test notes are written as ``#`` comments rather than docstrings so
    that unittest keeps reporting each test by its method name.

    Every socket and kqueue created by a test is registered with
    addCleanup() right after creation, so a failing assertion cannot leak
    descriptors into later tests.
    """

    def _assert_kevent(self, ev, other, expected):
        # Check all six public fields of *ev* against *expected*, given as
        # (ident, filter, flags, fflags, data, udata), then check that *ev*
        # equals itself and differs from the unrelated event *other*.
        ident, filter_, flags, fflags, data, udata = expected
        self.assertEqual(ev.ident, ident)
        self.assertEqual(ev.filter, filter_)
        self.assertEqual(ev.flags, flags)
        self.assertEqual(ev.fflags, fflags)
        self.assertEqual(ev.data, data)
        self.assertEqual(ev.udata, udata)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    def test_create_queue(self):
        # A fresh kqueue has a usable descriptor and reports itself open;
        # after close() it is flagged closed and fileno() raises ValueError.
        kq = select.kqueue()
        # close() may be called more than once (see test_close), so this
        # is safe even though the test closes the queue itself.
        self.addCleanup(kq.close)
        self.assertGreater(kq.fileno(), 0)
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        # kevent construction: default field values, explicit values,
        # ordering/equality, and large flag values (Issue 11973).
        from operator import lt, le, gt, ge

        fd = os.open(os.devnull, os.O_WRONLY)
        self.addCleanup(os.close, fd)

        # Only the ident given: read filter, EV_ADD, everything else zero.
        ev = select.kevent(fd)
        other = select.kevent(1000)
        self._assert_kevent(
            ev, other,
            (fd, select.KQ_FILTER_READ, select.KQ_EV_ADD, 0, 0, 0))
        self.assertLess(ev, other)
        self.assertGreaterEqual(other, ev)
        # Ordering against non-kevent objects must raise, not return a bool.
        for op in lt, le, gt, ge:
            self.assertRaises(TypeError, op, ev, None)
            self.assertRaises(TypeError, op, ev, 1)
            self.assertRaises(TypeError, op, ev, "ev")

        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self._assert_kevent(
            ev, other,
            (fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD, 0, 0, 0))

        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self._assert_kevent(
            ev, other,
            (fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT, 0, 0, 0))

        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self._assert_kevent(ev, other, (1, 2, 3, 4, 5, 6))

        bignum = 0x7fff
        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
        self._assert_kevent(ev, other, (bignum, 1, 2, 3, bignum - 1, bignum))

        # Issue 11973
        bignum = 0xffff
        ev = select.kevent(0, 1, bignum)
        self._assert_kevent(ev, other, (0, 1, bignum, 0, 0, 0))

        # Issue 11973
        bignum = 0xffffffff
        ev = select.kevent(0, 1, 2, bignum)
        self._assert_kevent(ev, other, (0, 1, 2, bignum, 0, 0))

    def test_queue_event(self):
        # Register both ends of a connected TCP pair, check the reported
        # readiness before and after data is exchanged, then delete most
        # registrations and check that only the remaining one still fires.
        listener = socket.create_server(('127.0.0.1', 0))
        self.addCleanup(listener.close)
        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', listener.getsockname()[1]))
        except OSError as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # A connect() that does not raise is accepted too:
        # FreeBSD doesn't raise an exception here.
        server, _ = listener.accept()
        self.addCleanup(server.close)

        kq = select.kqueue()
        # kq2 is built from kq's descriptor: events registered through kq2
        # are expected to be reported by kq below.
        # NOTE(review): as in the original test, neither kq nor kq2 is
        # closed explicitly; both wrap the same descriptor number, so
        # confirm double-close behaviour before adding explicit closes.
        kq2 = select.kqueue.fromfd(kq.fileno())

        add_enabled = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        both_filters = (select.KQ_FILTER_WRITE, select.KQ_FILTER_READ)
        # Server events go through kq, client events through kq2.
        for queue, sock in ((kq, server), (kq2, client)):
            for filter_ in both_filters:
                ev = select.kevent(sock.fileno(), filter_, add_enabled)
                queue.control([ev], 0)

        # Nothing has been sent yet, so only the write filters can fire.
        events = kq.control(None, 4, 1)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_WRITE)})

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times
        for _ in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (client.fileno(), select.KQ_FILTER_WRITE),
            (client.fileno(), select.KQ_FILTER_READ),
            (server.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_READ)})

        # Remove completely client, and server read part
        for filter_ in both_filters:
            ev = select.kevent(client.fileno(), filter_, select.KQ_EV_DELETE)
            kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        events = kq.control([], 4, 0.99)
        events = {(e.ident, e.filter) for e in events}
        self.assertEqual(events, {
            (server.fileno(), select.KQ_FILTER_WRITE)})

    def testPair(self):
        # kevent accepts socket objects as idents; data written on one end
        # of a socketpair is reported as readable and can be received on
        # the other end using the event's data field as the byte count.
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        a.send(b'foo')
        add_enabled = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        event1 = select.kevent(a, select.KQ_FILTER_READ, add_enabled)
        event2 = select.kevent(b, select.KQ_FILTER_READ, add_enabled)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        self.assertEqual(b.recv(r[0].data), b'foo')

    def test_issue30058(self):
        # changelist must be an iterable
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        ev = select.kevent(a, select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)

        kq.control([ev], 0)
        # not a list
        kq.control((ev,), 0)

        # __len__ is not consistent with __iter__
        class BadList:
            def __len__(self):
                return 0

            def __iter__(self):
                for _ in range(100):
                    yield ev

        kq.control(BadList(), 0)
        # doesn't have __len__
        kq.control(iter([ev]), 0)

    def test_close(self):
        # close() marks the queue closed, may be repeated, and makes
        # fileno() and control() raise ValueError afterwards.
        open_file = open(__file__, "rb")
        self.addCleanup(open_file.close)
        fd = open_file.fileno()
        kqueue = select.kqueue()
        # Guards against a leak if an assertion before close() fails;
        # harmless otherwise because close() may be called repeatedly.
        self.addCleanup(kqueue.close)

        # test fileno() method and closed attribute
        self.assertIsInstance(kqueue.fileno(), int)
        self.assertFalse(kqueue.closed)

        # test close()
        kqueue.close()
        self.assertTrue(kqueue.closed)
        self.assertRaises(ValueError, kqueue.fileno)

        # close() can be called more than once
        kqueue.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, kqueue.control, None, 4)

    def test_fd_non_inheritable(self):
        # The descriptor behind a new kqueue must not be inheritable.
        kqueue = select.kqueue()
        self.addCleanup(kqueue.close)
        self.assertIs(os.get_inheritable(kqueue.fileno()), False)
2587db96d56Sopenharmony_ci
2597db96d56Sopenharmony_ci
# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
262