17db96d56Sopenharmony_ci"""Unit tests for socket timeout feature.""" 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciimport functools 47db96d56Sopenharmony_ciimport unittest 57db96d56Sopenharmony_cifrom test import support 67db96d56Sopenharmony_cifrom test.support import socket_helper 77db96d56Sopenharmony_ci 87db96d56Sopenharmony_ciimport time 97db96d56Sopenharmony_ciimport errno 107db96d56Sopenharmony_ciimport socket 117db96d56Sopenharmony_ci 127db96d56Sopenharmony_ci 137db96d56Sopenharmony_ci@functools.lru_cache() 147db96d56Sopenharmony_cidef resolve_address(host, port): 157db96d56Sopenharmony_ci """Resolve an (host, port) to an address. 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_ci We must perform name resolution before timeout tests, otherwise it will be 187db96d56Sopenharmony_ci performed by connect(). 197db96d56Sopenharmony_ci """ 207db96d56Sopenharmony_ci with socket_helper.transient_internet(host): 217db96d56Sopenharmony_ci return socket.getaddrinfo(host, port, socket.AF_INET, 227db96d56Sopenharmony_ci socket.SOCK_STREAM)[0][4] 237db96d56Sopenharmony_ci 247db96d56Sopenharmony_ci 257db96d56Sopenharmony_ciclass CreationTestCase(unittest.TestCase): 267db96d56Sopenharmony_ci """Test case for socket.gettimeout() and socket.settimeout()""" 277db96d56Sopenharmony_ci 287db96d56Sopenharmony_ci def setUp(self): 297db96d56Sopenharmony_ci self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci def tearDown(self): 327db96d56Sopenharmony_ci self.sock.close() 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ci def testObjectCreation(self): 357db96d56Sopenharmony_ci # Test Socket creation 367db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), None, 377db96d56Sopenharmony_ci "timeout not disabled by default") 387db96d56Sopenharmony_ci 397db96d56Sopenharmony_ci def testFloatReturnValue(self): 407db96d56Sopenharmony_ci # Test return value of gettimeout() 417db96d56Sopenharmony_ci self.sock.settimeout(7.345) 427db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), 7.345) 437db96d56Sopenharmony_ci 447db96d56Sopenharmony_ci self.sock.settimeout(3) 457db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), 3) 467db96d56Sopenharmony_ci 477db96d56Sopenharmony_ci self.sock.settimeout(None) 487db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), None) 497db96d56Sopenharmony_ci 507db96d56Sopenharmony_ci def testReturnType(self): 517db96d56Sopenharmony_ci # Test return type of gettimeout() 527db96d56Sopenharmony_ci self.sock.settimeout(1) 537db96d56Sopenharmony_ci self.assertEqual(type(self.sock.gettimeout()), type(1.0)) 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ci self.sock.settimeout(3.9) 567db96d56Sopenharmony_ci self.assertEqual(type(self.sock.gettimeout()), type(1.0)) 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_ci def testTypeCheck(self): 597db96d56Sopenharmony_ci # Test type checking by settimeout() 607db96d56Sopenharmony_ci self.sock.settimeout(0) 617db96d56Sopenharmony_ci self.sock.settimeout(0) 627db96d56Sopenharmony_ci self.sock.settimeout(0.0) 637db96d56Sopenharmony_ci self.sock.settimeout(None) 647db96d56Sopenharmony_ci self.assertRaises(TypeError, self.sock.settimeout, "") 657db96d56Sopenharmony_ci self.assertRaises(TypeError, self.sock.settimeout, "") 667db96d56Sopenharmony_ci self.assertRaises(TypeError, self.sock.settimeout, ()) 677db96d56Sopenharmony_ci self.assertRaises(TypeError, self.sock.settimeout, []) 687db96d56Sopenharmony_ci self.assertRaises(TypeError, self.sock.settimeout, {}) 697db96d56Sopenharmony_ci self.assertRaises(TypeError, self.sock.settimeout, 0j) 707db96d56Sopenharmony_ci 717db96d56Sopenharmony_ci def testRangeCheck(self): 727db96d56Sopenharmony_ci # Test range checking by settimeout() 737db96d56Sopenharmony_ci self.assertRaises(ValueError, self.sock.settimeout, -1) 747db96d56Sopenharmony_ci self.assertRaises(ValueError, self.sock.settimeout, -1) 757db96d56Sopenharmony_ci self.assertRaises(ValueError, self.sock.settimeout, -1.0) 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ci def testTimeoutThenBlocking(self): 787db96d56Sopenharmony_ci # Test settimeout() followed by setblocking() 797db96d56Sopenharmony_ci self.sock.settimeout(10) 807db96d56Sopenharmony_ci self.sock.setblocking(True) 817db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), None) 827db96d56Sopenharmony_ci self.sock.setblocking(False) 837db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), 0.0) 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci self.sock.settimeout(10) 867db96d56Sopenharmony_ci self.sock.setblocking(False) 877db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), 0.0) 887db96d56Sopenharmony_ci self.sock.setblocking(True) 897db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), None) 907db96d56Sopenharmony_ci 917db96d56Sopenharmony_ci def testBlockingThenTimeout(self): 927db96d56Sopenharmony_ci # Test setblocking() followed by settimeout() 937db96d56Sopenharmony_ci self.sock.setblocking(False) 947db96d56Sopenharmony_ci self.sock.settimeout(1) 957db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), 1) 967db96d56Sopenharmony_ci 977db96d56Sopenharmony_ci self.sock.setblocking(True) 987db96d56Sopenharmony_ci self.sock.settimeout(1) 997db96d56Sopenharmony_ci self.assertEqual(self.sock.gettimeout(), 1) 1007db96d56Sopenharmony_ci 1017db96d56Sopenharmony_ci 1027db96d56Sopenharmony_ciclass TimeoutTestCase(unittest.TestCase): 1037db96d56Sopenharmony_ci # There are a number of tests here trying to make sure that an operation 1047db96d56Sopenharmony_ci # doesn't take too much longer than expected. But competing machine 1057db96d56Sopenharmony_ci # activity makes it inevitable that such tests will fail at times. 1067db96d56Sopenharmony_ci # When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K 1077db96d56Sopenharmony_ci # and Win98SE. Boosting it to 2.0 helped a lot, but isn't a real 1087db96d56Sopenharmony_ci # solution. 1097db96d56Sopenharmony_ci fuzz = 2.0 1107db96d56Sopenharmony_ci 1117db96d56Sopenharmony_ci localhost = socket_helper.HOST 1127db96d56Sopenharmony_ci 1137db96d56Sopenharmony_ci def setUp(self): 1147db96d56Sopenharmony_ci raise NotImplementedError() 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci tearDown = setUp 1177db96d56Sopenharmony_ci 1187db96d56Sopenharmony_ci def _sock_operation(self, count, timeout, method, *args): 1197db96d56Sopenharmony_ci """ 1207db96d56Sopenharmony_ci Test the specified socket method. 1217db96d56Sopenharmony_ci 1227db96d56Sopenharmony_ci The method is run at most `count` times and must raise a TimeoutError 1237db96d56Sopenharmony_ci within `timeout` + self.fuzz seconds. 1247db96d56Sopenharmony_ci """ 1257db96d56Sopenharmony_ci self.sock.settimeout(timeout) 1267db96d56Sopenharmony_ci method = getattr(self.sock, method) 1277db96d56Sopenharmony_ci for i in range(count): 1287db96d56Sopenharmony_ci t1 = time.monotonic() 1297db96d56Sopenharmony_ci try: 1307db96d56Sopenharmony_ci method(*args) 1317db96d56Sopenharmony_ci except TimeoutError as e: 1327db96d56Sopenharmony_ci delta = time.monotonic() - t1 1337db96d56Sopenharmony_ci break 1347db96d56Sopenharmony_ci else: 1357db96d56Sopenharmony_ci self.fail('TimeoutError was not raised') 1367db96d56Sopenharmony_ci # These checks should account for timing unprecision 1377db96d56Sopenharmony_ci self.assertLess(delta, timeout + self.fuzz) 1387db96d56Sopenharmony_ci self.assertGreater(delta, timeout - 1.0) 1397db96d56Sopenharmony_ci 1407db96d56Sopenharmony_ci 1417db96d56Sopenharmony_ciclass TCPTimeoutTestCase(TimeoutTestCase): 1427db96d56Sopenharmony_ci """TCP test case for socket.socket() timeout functions""" 1437db96d56Sopenharmony_ci 1447db96d56Sopenharmony_ci def setUp(self): 1457db96d56Sopenharmony_ci self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1467db96d56Sopenharmony_ci self.addr_remote = resolve_address('www.python.org.', 80) 1477db96d56Sopenharmony_ci 1487db96d56Sopenharmony_ci def tearDown(self): 1497db96d56Sopenharmony_ci self.sock.close() 1507db96d56Sopenharmony_ci 1517db96d56Sopenharmony_ci @unittest.skipIf(True, 'need to replace these hosts; see bpo-35518') 1527db96d56Sopenharmony_ci def testConnectTimeout(self): 1537db96d56Sopenharmony_ci # Testing connect timeout is tricky: we need to have IP connectivity 1547db96d56Sopenharmony_ci # to a host that silently drops our packets. We can't simulate this 1557db96d56Sopenharmony_ci # from Python because it's a function of the underlying TCP/IP stack. 1567db96d56Sopenharmony_ci # So, the following Snakebite host has been defined: 1577db96d56Sopenharmony_ci blackhole = resolve_address('blackhole.snakebite.net', 56666) 1587db96d56Sopenharmony_ci 1597db96d56Sopenharmony_ci # Blackhole has been configured to silently drop any incoming packets. 1607db96d56Sopenharmony_ci # No RSTs (for TCP) or ICMP UNREACH (for UDP/ICMP) will be sent back 1617db96d56Sopenharmony_ci # to hosts that attempt to connect to this address: which is exactly 1627db96d56Sopenharmony_ci # what we need to confidently test connect timeout. 1637db96d56Sopenharmony_ci 1647db96d56Sopenharmony_ci # However, we want to prevent false positives. It's not unreasonable 1657db96d56Sopenharmony_ci # to expect certain hosts may not be able to reach the blackhole, due 1667db96d56Sopenharmony_ci # to firewalling or general network configuration. In order to improve 1677db96d56Sopenharmony_ci # our confidence in testing the blackhole, a corresponding 'whitehole' 1687db96d56Sopenharmony_ci # has also been set up using one port higher: 1697db96d56Sopenharmony_ci whitehole = resolve_address('whitehole.snakebite.net', 56667) 1707db96d56Sopenharmony_ci 1717db96d56Sopenharmony_ci # This address has been configured to immediately drop any incoming 1727db96d56Sopenharmony_ci # packets as well, but it does it respectfully with regards to the 1737db96d56Sopenharmony_ci # incoming protocol. RSTs are sent for TCP packets, and ICMP UNREACH 1747db96d56Sopenharmony_ci # is sent for UDP/ICMP packets. This means our attempts to connect to 1757db96d56Sopenharmony_ci # it should be met immediately with ECONNREFUSED. The test case has 1767db96d56Sopenharmony_ci # been structured around this premise: if we get an ECONNREFUSED from 1777db96d56Sopenharmony_ci # the whitehole, we proceed with testing connect timeout against the 1787db96d56Sopenharmony_ci # blackhole. If we don't, we skip the test (with a message about not 1797db96d56Sopenharmony_ci # getting the required RST from the whitehole within the required 1807db96d56Sopenharmony_ci # timeframe). 1817db96d56Sopenharmony_ci 1827db96d56Sopenharmony_ci # For the records, the whitehole/blackhole configuration has been set 1837db96d56Sopenharmony_ci # up using the 'pf' firewall (available on BSDs), using the following: 1847db96d56Sopenharmony_ci # 1857db96d56Sopenharmony_ci # ext_if="bge0" 1867db96d56Sopenharmony_ci # 1877db96d56Sopenharmony_ci # blackhole_ip="35.8.247.6" 1887db96d56Sopenharmony_ci # whitehole_ip="35.8.247.6" 1897db96d56Sopenharmony_ci # blackhole_port="56666" 1907db96d56Sopenharmony_ci # whitehole_port="56667" 1917db96d56Sopenharmony_ci # 1927db96d56Sopenharmony_ci # block return in log quick on $ext_if proto { tcp udp } \ 1937db96d56Sopenharmony_ci # from any to $whitehole_ip port $whitehole_port 1947db96d56Sopenharmony_ci # block drop in log quick on $ext_if proto { tcp udp } \ 1957db96d56Sopenharmony_ci # from any to $blackhole_ip port $blackhole_port 1967db96d56Sopenharmony_ci # 1977db96d56Sopenharmony_ci 1987db96d56Sopenharmony_ci skip = True 1997db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 2007db96d56Sopenharmony_ci timeout = support.LOOPBACK_TIMEOUT 2017db96d56Sopenharmony_ci sock.settimeout(timeout) 2027db96d56Sopenharmony_ci try: 2037db96d56Sopenharmony_ci sock.connect((whitehole)) 2047db96d56Sopenharmony_ci except TimeoutError: 2057db96d56Sopenharmony_ci pass 2067db96d56Sopenharmony_ci except OSError as err: 2077db96d56Sopenharmony_ci if err.errno == errno.ECONNREFUSED: 2087db96d56Sopenharmony_ci skip = False 2097db96d56Sopenharmony_ci finally: 2107db96d56Sopenharmony_ci sock.close() 2117db96d56Sopenharmony_ci del sock 2127db96d56Sopenharmony_ci 2137db96d56Sopenharmony_ci if skip: 2147db96d56Sopenharmony_ci self.skipTest( 2157db96d56Sopenharmony_ci "We didn't receive a connection reset (RST) packet from " 2167db96d56Sopenharmony_ci "{}:{} within {} seconds, so we're unable to test connect " 2177db96d56Sopenharmony_ci "timeout against the corresponding {}:{} (which is " 2187db96d56Sopenharmony_ci "configured to silently drop packets)." 2197db96d56Sopenharmony_ci .format( 2207db96d56Sopenharmony_ci whitehole[0], 2217db96d56Sopenharmony_ci whitehole[1], 2227db96d56Sopenharmony_ci timeout, 2237db96d56Sopenharmony_ci blackhole[0], 2247db96d56Sopenharmony_ci blackhole[1], 2257db96d56Sopenharmony_ci ) 2267db96d56Sopenharmony_ci ) 2277db96d56Sopenharmony_ci 2287db96d56Sopenharmony_ci # All that hard work just to test if connect times out in 0.001s ;-) 2297db96d56Sopenharmony_ci self.addr_remote = blackhole 2307db96d56Sopenharmony_ci with socket_helper.transient_internet(self.addr_remote[0]): 2317db96d56Sopenharmony_ci self._sock_operation(1, 0.001, 'connect', self.addr_remote) 2327db96d56Sopenharmony_ci 2337db96d56Sopenharmony_ci def testRecvTimeout(self): 2347db96d56Sopenharmony_ci # Test recv() timeout 2357db96d56Sopenharmony_ci with socket_helper.transient_internet(self.addr_remote[0]): 2367db96d56Sopenharmony_ci self.sock.connect(self.addr_remote) 2377db96d56Sopenharmony_ci self._sock_operation(1, 1.5, 'recv', 1024) 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ci def testAcceptTimeout(self): 2407db96d56Sopenharmony_ci # Test accept() timeout 2417db96d56Sopenharmony_ci socket_helper.bind_port(self.sock, self.localhost) 2427db96d56Sopenharmony_ci self.sock.listen() 2437db96d56Sopenharmony_ci self._sock_operation(1, 1.5, 'accept') 2447db96d56Sopenharmony_ci 2457db96d56Sopenharmony_ci def testSend(self): 2467db96d56Sopenharmony_ci # Test send() timeout 2477db96d56Sopenharmony_ci with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: 2487db96d56Sopenharmony_ci socket_helper.bind_port(serv, self.localhost) 2497db96d56Sopenharmony_ci serv.listen() 2507db96d56Sopenharmony_ci self.sock.connect(serv.getsockname()) 2517db96d56Sopenharmony_ci # Send a lot of data in order to bypass buffering in the TCP stack. 2527db96d56Sopenharmony_ci self._sock_operation(100, 1.5, 'send', b"X" * 200000) 2537db96d56Sopenharmony_ci 2547db96d56Sopenharmony_ci def testSendto(self): 2557db96d56Sopenharmony_ci # Test sendto() timeout 2567db96d56Sopenharmony_ci with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: 2577db96d56Sopenharmony_ci socket_helper.bind_port(serv, self.localhost) 2587db96d56Sopenharmony_ci serv.listen() 2597db96d56Sopenharmony_ci self.sock.connect(serv.getsockname()) 2607db96d56Sopenharmony_ci # The address argument is ignored since we already connected. 2617db96d56Sopenharmony_ci self._sock_operation(100, 1.5, 'sendto', b"X" * 200000, 2627db96d56Sopenharmony_ci serv.getsockname()) 2637db96d56Sopenharmony_ci 2647db96d56Sopenharmony_ci def testSendall(self): 2657db96d56Sopenharmony_ci # Test sendall() timeout 2667db96d56Sopenharmony_ci with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv: 2677db96d56Sopenharmony_ci socket_helper.bind_port(serv, self.localhost) 2687db96d56Sopenharmony_ci serv.listen() 2697db96d56Sopenharmony_ci self.sock.connect(serv.getsockname()) 2707db96d56Sopenharmony_ci # Send a lot of data in order to bypass buffering in the TCP stack. 2717db96d56Sopenharmony_ci self._sock_operation(100, 1.5, 'sendall', b"X" * 200000) 2727db96d56Sopenharmony_ci 2737db96d56Sopenharmony_ci 2747db96d56Sopenharmony_ciclass UDPTimeoutTestCase(TimeoutTestCase): 2757db96d56Sopenharmony_ci """UDP test case for socket.socket() timeout functions""" 2767db96d56Sopenharmony_ci 2777db96d56Sopenharmony_ci def setUp(self): 2787db96d56Sopenharmony_ci self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 2797db96d56Sopenharmony_ci 2807db96d56Sopenharmony_ci def tearDown(self): 2817db96d56Sopenharmony_ci self.sock.close() 2827db96d56Sopenharmony_ci 2837db96d56Sopenharmony_ci def testRecvfromTimeout(self): 2847db96d56Sopenharmony_ci # Test recvfrom() timeout 2857db96d56Sopenharmony_ci # Prevent "Address already in use" socket exceptions 2867db96d56Sopenharmony_ci socket_helper.bind_port(self.sock, self.localhost) 2877db96d56Sopenharmony_ci self._sock_operation(1, 1.5, 'recvfrom', 1024) 2887db96d56Sopenharmony_ci 2897db96d56Sopenharmony_ci 2907db96d56Sopenharmony_cidef setUpModule(): 2917db96d56Sopenharmony_ci support.requires('network') 2927db96d56Sopenharmony_ci support.requires_working_socket(module=True) 2937db96d56Sopenharmony_ci 2947db96d56Sopenharmony_ci 2957db96d56Sopenharmony_ciif __name__ == "__main__": 2967db96d56Sopenharmony_ci unittest.main() 297