import contextlib
import email.message
import os
import socket
import time
import unittest
import urllib.parse
import urllib.request

from test import support
from test.support import os_helper
from test.support import socket_helper
137db96d56Sopenharmony_ci
147db96d56Sopenharmony_ci
# Everything below talks to a real server, so declare the 'network' test
# resource up front; test.support rejects the module when it is not enabled.
support.requires('network')
167db96d56Sopenharmony_ci
177db96d56Sopenharmony_ci
class URLTimeoutTest(unittest.TestCase):
    """Smoke test: fetch a page over HTTP with a default socket timeout set."""
    # XXX this test doesn't seem to test anything useful.

    def setUp(self):
        # Remember the process-wide default so tearDown can put it back
        # rather than unconditionally clearing it.
        self._saved_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(support.INTERNET_TIMEOUT)

    def tearDown(self):
        socket.setdefaulttimeout(self._saved_timeout)

    def testURLread(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
        with socket_helper.transient_internet(domain):
            # Use the response as a context manager so it is closed even if
            # read() raises.
            with urllib.request.urlopen(support.TEST_HTTP_URL) as f:
                f.read()
357db96d56Sopenharmony_ci
367db96d56Sopenharmony_ci
class urlopenNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.

    setUp is not used for always constructing a connection to
    http://www.pythontest.net/ since there are a few tests that don't use that
    address and making a connection is expensive enough to warrant minimizing
    unneeded connections.

    """

    url = 'http://www.pythontest.net/'

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    @contextlib.contextmanager
    def urlopen(self, *args, **kwargs):
        """Open a URL inside transient_internet() and close it on exit.

        All arguments are forwarded to urllib.request.urlopen().  The first
        positional argument (the URL) is also used as the resource name
        passed to socket_helper.transient_internet().
        """
        resource = args[0]
        with socket_helper.transient_internet(resource):
            response = urllib.request.urlopen(*args, **kwargs)
            # closing() guarantees close() runs however the caller's
            # with-body exits.
            with contextlib.closing(response) as r:
                yield r

    def test_basic(self):
        # Simple test expected to pass.
        with self.urlopen(self.url) as open_url:
            for attr in ("read", "readline", "readlines", "fileno", "close",
                         "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_readlines(self):
        # Test both readline and readlines.
        with self.urlopen(self.url) as open_url:
            self.assertIsInstance(open_url.readline(), bytes,
                                  "readline did not return bytes")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")

    def test_info(self):
        # Test 'info'.
        with self.urlopen(self.url) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "html")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        with self.urlopen(self.url) as open_url:
            gotten_url = open_url.geturl()
            self.assertEqual(gotten_url, self.url)

    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = self.url + "XXXinvalidXXX"
        with socket_helper.transient_internet(URL):
            # Instantiating FancyURLopener is what is expected to warn.
            with self.assertWarns(DeprecationWarning):
                open_url = urllib.request.FancyURLopener().open(URL)
            try:
                code = open_url.getcode()
            finally:
                open_url.close()
            self.assertEqual(code, 404)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # Given that both VeriSign and various ISPs have in
        # the past or are presently hijacking various invalid
        # domain name requests in an attempt to boost traffic
        # to their own sites, finding a domain name to use
        # for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character
        # TLDs are likely to remain invalid, so this seems to
        # be the best choice. The trailing '.' prevents a
        # related problem: The normal DNS resolver appends
        # the domain names from the search path if there is
        # no '.' at the end, and if one of those domains
        # implements a '*' rule a result is returned.
        # However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain
        # requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        bogus_domain = "sadflkjsasf.i.nvali.d."
        try:
            socket.gethostbyname(bogus_domain)
        except OSError:
            # socket.gaierror is too narrow, since getaddrinfo() may also
            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
            # i.e. Python's TimeoutError.
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        failure_explanation = ('opening an invalid URL did not raise OSError; '
                               'can be caused by a broken DNS server '
                               '(e.g. returns 404 or hijacks page)')
        with self.assertRaises(OSError, msg=failure_explanation):
            # If the open unexpectedly succeeds, the inner with-block still
            # closes the response before assertRaises reports the failure.
            with urllib.request.urlopen("http://{}/".format(bogus_domain)):
                pass
1497db96d56Sopenharmony_ci
1507db96d56Sopenharmony_ci
class urlretrieveNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlretrieve using the network."""

    # URL fetched by every test in this class.
    logo = "http://www.pythontest.net/"

    def setUp(self):
        # remove temporary files created by urlretrieve()
        self.addCleanup(urllib.request.urlcleanup)

    @contextlib.contextmanager
    def urlretrieve(self, *args, **kwargs):
        """Retrieve a URL inside transient_internet(); unlink the file on exit.

        All arguments are forwarded to urllib.request.urlretrieve().  The
        first positional argument (the URL) is also used as the resource name
        passed to socket_helper.transient_internet().  Yields the
        (file_location, headers) pair returned by urlretrieve().
        """
        resource = args[0]
        with socket_helper.transient_internet(resource):
            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
            try:
                yield file_location, info
            finally:
                os_helper.unlink(file_location)

    def test_basic(self):
        # Test basic functionality.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertTrue(os.path.exists(file_location), "file location returned by"
                            " urlretrieve is not a valid path")
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from the file location returned"
                                " by urlretrieve failed")

    def test_specified_path(self):
        # Make sure that specifying the location of the file to write to works.
        with self.urlretrieve(self.logo,
                              os_helper.TESTFN) as (file_location, info):
            self.assertEqual(file_location, os_helper.TESTFN)
            self.assertTrue(os.path.exists(file_location))
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from temporary file failed")

    def test_header(self):
        # Make sure header returned as 2nd value from urlretrieve is good.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertIsInstance(info, email.message.Message,
                                  "info is not an instance of email.message.Message")

    def test_data_header(self):
        # The Date header must be present and in the expected date format.
        with self.urlretrieve(self.logo) as (file_location, fileheaders):
            datevalue = fileheaders.get('Date')
            # Without this check a missing header reaches strptime() as None
            # and raises TypeError, which the except clause below misses.
            self.assertIsNotNone(datevalue, "response has no Date header")
            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
            try:
                time.strptime(datevalue, dateformat)
            except ValueError:
                self.fail('Date value not in %r format' % dateformat)

    def test_reporthook(self):
        # The report hook must be called with consistent block bookkeeping.
        records = []

        def recording_reporthook(blocks, block_size, total_size):
            records.append((blocks, block_size, total_size))

        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
                file_location, fileheaders):
            content_length = fileheaders['Content-Length']
            # Message lookup yields None for an absent header; fail cleanly
            # instead of letting int(None) raise TypeError.
            self.assertIsNotNone(content_length,
                                 "response has no Content-Length header")
            expected_size = int(content_length)

        records_repr = repr(records)  # For use in error messages.
        self.assertGreater(len(records), 1, msg="There should always be two "
                           "calls; the first one before the transfer starts.")
        self.assertEqual(records[0][0], 0)
        self.assertGreater(records[0][1], 0,
                           msg="block size can't be 0 in %s" % records_repr)
        self.assertEqual(records[0][2], expected_size)
        self.assertEqual(records[-1][2], expected_size)

        block_sizes = {block_size for _, block_size, _ in records}
        self.assertEqual({records[0][1]}, block_sizes,
                         msg="block sizes in %s must be equal" % records_repr)
        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
                                msg="number of blocks * block size must be"
                                " >= total size in %s" % records_repr)
2287db96d56Sopenharmony_ci
2297db96d56Sopenharmony_ci
# Allow running this file directly as a script.
if __name__ == "__main__":
    unittest.main()
232