17db96d56Sopenharmony_ciimport unittest 27db96d56Sopenharmony_cifrom test import support 37db96d56Sopenharmony_cifrom test.support import os_helper 47db96d56Sopenharmony_cifrom test.support import socket_helper 57db96d56Sopenharmony_ci 67db96d56Sopenharmony_ciimport contextlib 77db96d56Sopenharmony_ciimport socket 87db96d56Sopenharmony_ciimport urllib.parse 97db96d56Sopenharmony_ciimport urllib.request 107db96d56Sopenharmony_ciimport os 117db96d56Sopenharmony_ciimport email.message 127db96d56Sopenharmony_ciimport time 137db96d56Sopenharmony_ci 147db96d56Sopenharmony_ci 157db96d56Sopenharmony_cisupport.requires('network') 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_ci 187db96d56Sopenharmony_ciclass URLTimeoutTest(unittest.TestCase): 197db96d56Sopenharmony_ci # XXX this test doesn't seem to test anything useful. 207db96d56Sopenharmony_ci 217db96d56Sopenharmony_ci def setUp(self): 227db96d56Sopenharmony_ci socket.setdefaulttimeout(support.INTERNET_TIMEOUT) 237db96d56Sopenharmony_ci 247db96d56Sopenharmony_ci def tearDown(self): 257db96d56Sopenharmony_ci socket.setdefaulttimeout(None) 267db96d56Sopenharmony_ci 277db96d56Sopenharmony_ci def testURLread(self): 287db96d56Sopenharmony_ci # clear _opener global variable 297db96d56Sopenharmony_ci self.addCleanup(urllib.request.urlcleanup) 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc 327db96d56Sopenharmony_ci with socket_helper.transient_internet(domain): 337db96d56Sopenharmony_ci f = urllib.request.urlopen(support.TEST_HTTP_URL) 347db96d56Sopenharmony_ci f.read() 357db96d56Sopenharmony_ci 367db96d56Sopenharmony_ci 377db96d56Sopenharmony_ciclass urlopenNetworkTests(unittest.TestCase): 387db96d56Sopenharmony_ci """Tests urllib.request.urlopen using the network. 397db96d56Sopenharmony_ci 407db96d56Sopenharmony_ci These tests are not exhaustive. Assuming that testing using files does a 417db96d56Sopenharmony_ci good job overall of some of the basic interface features. There are no 427db96d56Sopenharmony_ci tests exercising the optional 'data' and 'proxies' arguments. No tests 437db96d56Sopenharmony_ci for transparent redirection have been written. 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ci setUp is not used for always constructing a connection to 467db96d56Sopenharmony_ci http://www.pythontest.net/ since there a few tests that don't use that address 477db96d56Sopenharmony_ci and making a connection is expensive enough to warrant minimizing unneeded 487db96d56Sopenharmony_ci connections. 497db96d56Sopenharmony_ci 507db96d56Sopenharmony_ci """ 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_ci url = 'http://www.pythontest.net/' 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci def setUp(self): 557db96d56Sopenharmony_ci # clear _opener global variable 567db96d56Sopenharmony_ci self.addCleanup(urllib.request.urlcleanup) 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_ci @contextlib.contextmanager 597db96d56Sopenharmony_ci def urlopen(self, *args, **kwargs): 607db96d56Sopenharmony_ci resource = args[0] 617db96d56Sopenharmony_ci with socket_helper.transient_internet(resource): 627db96d56Sopenharmony_ci r = urllib.request.urlopen(*args, **kwargs) 637db96d56Sopenharmony_ci try: 647db96d56Sopenharmony_ci yield r 657db96d56Sopenharmony_ci finally: 667db96d56Sopenharmony_ci r.close() 677db96d56Sopenharmony_ci 687db96d56Sopenharmony_ci def test_basic(self): 697db96d56Sopenharmony_ci # Simple test expected to pass. 707db96d56Sopenharmony_ci with self.urlopen(self.url) as open_url: 717db96d56Sopenharmony_ci for attr in ("read", "readline", "readlines", "fileno", "close", 727db96d56Sopenharmony_ci "info", "geturl"): 737db96d56Sopenharmony_ci self.assertTrue(hasattr(open_url, attr), "object returned from " 747db96d56Sopenharmony_ci "urlopen lacks the %s attribute" % attr) 757db96d56Sopenharmony_ci self.assertTrue(open_url.read(), "calling 'read' failed") 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ci def test_readlines(self): 787db96d56Sopenharmony_ci # Test both readline and readlines. 797db96d56Sopenharmony_ci with self.urlopen(self.url) as open_url: 807db96d56Sopenharmony_ci self.assertIsInstance(open_url.readline(), bytes, 817db96d56Sopenharmony_ci "readline did not return a string") 827db96d56Sopenharmony_ci self.assertIsInstance(open_url.readlines(), list, 837db96d56Sopenharmony_ci "readlines did not return a list") 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci def test_info(self): 867db96d56Sopenharmony_ci # Test 'info'. 877db96d56Sopenharmony_ci with self.urlopen(self.url) as open_url: 887db96d56Sopenharmony_ci info_obj = open_url.info() 897db96d56Sopenharmony_ci self.assertIsInstance(info_obj, email.message.Message, 907db96d56Sopenharmony_ci "object returned by 'info' is not an " 917db96d56Sopenharmony_ci "instance of email.message.Message") 927db96d56Sopenharmony_ci self.assertEqual(info_obj.get_content_subtype(), "html") 937db96d56Sopenharmony_ci 947db96d56Sopenharmony_ci def test_geturl(self): 957db96d56Sopenharmony_ci # Make sure same URL as opened is returned by geturl. 967db96d56Sopenharmony_ci with self.urlopen(self.url) as open_url: 977db96d56Sopenharmony_ci gotten_url = open_url.geturl() 987db96d56Sopenharmony_ci self.assertEqual(gotten_url, self.url) 997db96d56Sopenharmony_ci 1007db96d56Sopenharmony_ci def test_getcode(self): 1017db96d56Sopenharmony_ci # test getcode() with the fancy opener to get 404 error codes 1027db96d56Sopenharmony_ci URL = self.url + "XXXinvalidXXX" 1037db96d56Sopenharmony_ci with socket_helper.transient_internet(URL): 1047db96d56Sopenharmony_ci with self.assertWarns(DeprecationWarning): 1057db96d56Sopenharmony_ci open_url = urllib.request.FancyURLopener().open(URL) 1067db96d56Sopenharmony_ci try: 1077db96d56Sopenharmony_ci code = open_url.getcode() 1087db96d56Sopenharmony_ci finally: 1097db96d56Sopenharmony_ci open_url.close() 1107db96d56Sopenharmony_ci self.assertEqual(code, 404) 1117db96d56Sopenharmony_ci 1127db96d56Sopenharmony_ci def test_bad_address(self): 1137db96d56Sopenharmony_ci # Make sure proper exception is raised when connecting to a bogus 1147db96d56Sopenharmony_ci # address. 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci # Given that both VeriSign and various ISPs have in 1177db96d56Sopenharmony_ci # the past or are presently hijacking various invalid 1187db96d56Sopenharmony_ci # domain name requests in an attempt to boost traffic 1197db96d56Sopenharmony_ci # to their own sites, finding a domain name to use 1207db96d56Sopenharmony_ci # for this test is difficult. RFC2606 leads one to 1217db96d56Sopenharmony_ci # believe that '.invalid' should work, but experience 1227db96d56Sopenharmony_ci # seemed to indicate otherwise. Single character 1237db96d56Sopenharmony_ci # TLDs are likely to remain invalid, so this seems to 1247db96d56Sopenharmony_ci # be the best choice. The trailing '.' prevents a 1257db96d56Sopenharmony_ci # related problem: The normal DNS resolver appends 1267db96d56Sopenharmony_ci # the domain names from the search path if there is 1277db96d56Sopenharmony_ci # no '.' the end and, and if one of those domains 1287db96d56Sopenharmony_ci # implements a '*' rule a result is returned. 1297db96d56Sopenharmony_ci # However, none of this will prevent the test from 1307db96d56Sopenharmony_ci # failing if the ISP hijacks all invalid domain 1317db96d56Sopenharmony_ci # requests. The real solution would be to be able to 1327db96d56Sopenharmony_ci # parameterize the framework with a mock resolver. 1337db96d56Sopenharmony_ci bogus_domain = "sadflkjsasf.i.nvali.d." 1347db96d56Sopenharmony_ci try: 1357db96d56Sopenharmony_ci socket.gethostbyname(bogus_domain) 1367db96d56Sopenharmony_ci except OSError: 1377db96d56Sopenharmony_ci # socket.gaierror is too narrow, since getaddrinfo() may also 1387db96d56Sopenharmony_ci # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04), 1397db96d56Sopenharmony_ci # i.e. Python's TimeoutError. 1407db96d56Sopenharmony_ci pass 1417db96d56Sopenharmony_ci else: 1427db96d56Sopenharmony_ci # This happens with some overzealous DNS providers such as OpenDNS 1437db96d56Sopenharmony_ci self.skipTest("%r should not resolve for test to work" % bogus_domain) 1447db96d56Sopenharmony_ci failure_explanation = ('opening an invalid URL did not raise OSError; ' 1457db96d56Sopenharmony_ci 'can be caused by a broken DNS server ' 1467db96d56Sopenharmony_ci '(e.g. returns 404 or hijacks page)') 1477db96d56Sopenharmony_ci with self.assertRaises(OSError, msg=failure_explanation): 1487db96d56Sopenharmony_ci urllib.request.urlopen("http://{}/".format(bogus_domain)) 1497db96d56Sopenharmony_ci 1507db96d56Sopenharmony_ci 1517db96d56Sopenharmony_ciclass urlretrieveNetworkTests(unittest.TestCase): 1527db96d56Sopenharmony_ci """Tests urllib.request.urlretrieve using the network.""" 1537db96d56Sopenharmony_ci 1547db96d56Sopenharmony_ci def setUp(self): 1557db96d56Sopenharmony_ci # remove temporary files created by urlretrieve() 1567db96d56Sopenharmony_ci self.addCleanup(urllib.request.urlcleanup) 1577db96d56Sopenharmony_ci 1587db96d56Sopenharmony_ci @contextlib.contextmanager 1597db96d56Sopenharmony_ci def urlretrieve(self, *args, **kwargs): 1607db96d56Sopenharmony_ci resource = args[0] 1617db96d56Sopenharmony_ci with socket_helper.transient_internet(resource): 1627db96d56Sopenharmony_ci file_location, info = urllib.request.urlretrieve(*args, **kwargs) 1637db96d56Sopenharmony_ci try: 1647db96d56Sopenharmony_ci yield file_location, info 1657db96d56Sopenharmony_ci finally: 1667db96d56Sopenharmony_ci os_helper.unlink(file_location) 1677db96d56Sopenharmony_ci 1687db96d56Sopenharmony_ci def test_basic(self): 1697db96d56Sopenharmony_ci # Test basic functionality. 1707db96d56Sopenharmony_ci with self.urlretrieve(self.logo) as (file_location, info): 1717db96d56Sopenharmony_ci self.assertTrue(os.path.exists(file_location), "file location returned by" 1727db96d56Sopenharmony_ci " urlretrieve is not a valid path") 1737db96d56Sopenharmony_ci with open(file_location, 'rb') as f: 1747db96d56Sopenharmony_ci self.assertTrue(f.read(), "reading from the file location returned" 1757db96d56Sopenharmony_ci " by urlretrieve failed") 1767db96d56Sopenharmony_ci 1777db96d56Sopenharmony_ci def test_specified_path(self): 1787db96d56Sopenharmony_ci # Make sure that specifying the location of the file to write to works. 1797db96d56Sopenharmony_ci with self.urlretrieve(self.logo, 1807db96d56Sopenharmony_ci os_helper.TESTFN) as (file_location, info): 1817db96d56Sopenharmony_ci self.assertEqual(file_location, os_helper.TESTFN) 1827db96d56Sopenharmony_ci self.assertTrue(os.path.exists(file_location)) 1837db96d56Sopenharmony_ci with open(file_location, 'rb') as f: 1847db96d56Sopenharmony_ci self.assertTrue(f.read(), "reading from temporary file failed") 1857db96d56Sopenharmony_ci 1867db96d56Sopenharmony_ci def test_header(self): 1877db96d56Sopenharmony_ci # Make sure header returned as 2nd value from urlretrieve is good. 1887db96d56Sopenharmony_ci with self.urlretrieve(self.logo) as (file_location, info): 1897db96d56Sopenharmony_ci self.assertIsInstance(info, email.message.Message, 1907db96d56Sopenharmony_ci "info is not an instance of email.message.Message") 1917db96d56Sopenharmony_ci 1927db96d56Sopenharmony_ci logo = "http://www.pythontest.net/" 1937db96d56Sopenharmony_ci 1947db96d56Sopenharmony_ci def test_data_header(self): 1957db96d56Sopenharmony_ci with self.urlretrieve(self.logo) as (file_location, fileheaders): 1967db96d56Sopenharmony_ci datevalue = fileheaders.get('Date') 1977db96d56Sopenharmony_ci dateformat = '%a, %d %b %Y %H:%M:%S GMT' 1987db96d56Sopenharmony_ci try: 1997db96d56Sopenharmony_ci time.strptime(datevalue, dateformat) 2007db96d56Sopenharmony_ci except ValueError: 2017db96d56Sopenharmony_ci self.fail('Date value not in %r format' % dateformat) 2027db96d56Sopenharmony_ci 2037db96d56Sopenharmony_ci def test_reporthook(self): 2047db96d56Sopenharmony_ci records = [] 2057db96d56Sopenharmony_ci 2067db96d56Sopenharmony_ci def recording_reporthook(blocks, block_size, total_size): 2077db96d56Sopenharmony_ci records.append((blocks, block_size, total_size)) 2087db96d56Sopenharmony_ci 2097db96d56Sopenharmony_ci with self.urlretrieve(self.logo, reporthook=recording_reporthook) as ( 2107db96d56Sopenharmony_ci file_location, fileheaders): 2117db96d56Sopenharmony_ci expected_size = int(fileheaders['Content-Length']) 2127db96d56Sopenharmony_ci 2137db96d56Sopenharmony_ci records_repr = repr(records) # For use in error messages. 2147db96d56Sopenharmony_ci self.assertGreater(len(records), 1, msg="There should always be two " 2157db96d56Sopenharmony_ci "calls; the first one before the transfer starts.") 2167db96d56Sopenharmony_ci self.assertEqual(records[0][0], 0) 2177db96d56Sopenharmony_ci self.assertGreater(records[0][1], 0, 2187db96d56Sopenharmony_ci msg="block size can't be 0 in %s" % records_repr) 2197db96d56Sopenharmony_ci self.assertEqual(records[0][2], expected_size) 2207db96d56Sopenharmony_ci self.assertEqual(records[-1][2], expected_size) 2217db96d56Sopenharmony_ci 2227db96d56Sopenharmony_ci block_sizes = {block_size for _, block_size, _ in records} 2237db96d56Sopenharmony_ci self.assertEqual({records[0][1]}, block_sizes, 2247db96d56Sopenharmony_ci msg="block sizes in %s must be equal" % records_repr) 2257db96d56Sopenharmony_ci self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size, 2267db96d56Sopenharmony_ci msg="number of blocks * block size must be" 2277db96d56Sopenharmony_ci " >= total size in %s" % records_repr) 2287db96d56Sopenharmony_ci 2297db96d56Sopenharmony_ci 2307db96d56Sopenharmony_ciif __name__ == "__main__": 2317db96d56Sopenharmony_ci unittest.main() 232