17db96d56Sopenharmony_ciimport base64 27db96d56Sopenharmony_ciimport os 37db96d56Sopenharmony_ciimport email 47db96d56Sopenharmony_ciimport urllib.parse 57db96d56Sopenharmony_ciimport urllib.request 67db96d56Sopenharmony_ciimport http.server 77db96d56Sopenharmony_ciimport threading 87db96d56Sopenharmony_ciimport unittest 97db96d56Sopenharmony_ciimport hashlib 107db96d56Sopenharmony_ci 117db96d56Sopenharmony_cifrom test import support 127db96d56Sopenharmony_cifrom test.support import hashlib_helper 137db96d56Sopenharmony_cifrom test.support import threading_helper 147db96d56Sopenharmony_cifrom test.support import warnings_helper 157db96d56Sopenharmony_ci 167db96d56Sopenharmony_citry: 177db96d56Sopenharmony_ci import ssl 187db96d56Sopenharmony_ciexcept ImportError: 197db96d56Sopenharmony_ci ssl = None 207db96d56Sopenharmony_ci 217db96d56Sopenharmony_cisupport.requires_working_socket(module=True) 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_cihere = os.path.dirname(__file__) 247db96d56Sopenharmony_ci# Self-signed cert file for 'localhost' 257db96d56Sopenharmony_ciCERT_localhost = os.path.join(here, 'keycert.pem') 267db96d56Sopenharmony_ci# Self-signed cert file for 'fakehostname' 277db96d56Sopenharmony_ciCERT_fakehostname = os.path.join(here, 'keycert2.pem') 287db96d56Sopenharmony_ci 297db96d56Sopenharmony_ci 307db96d56Sopenharmony_ci# Loopback http server infrastructure 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_ciclass LoopbackHttpServer(http.server.HTTPServer): 337db96d56Sopenharmony_ci """HTTP server w/ a few modifications that make it useful for 347db96d56Sopenharmony_ci loopback testing purposes. 357db96d56Sopenharmony_ci """ 367db96d56Sopenharmony_ci 377db96d56Sopenharmony_ci def __init__(self, server_address, RequestHandlerClass): 387db96d56Sopenharmony_ci http.server.HTTPServer.__init__(self, 397db96d56Sopenharmony_ci server_address, 407db96d56Sopenharmony_ci RequestHandlerClass) 417db96d56Sopenharmony_ci 427db96d56Sopenharmony_ci # Set the timeout of our listening socket really low so 437db96d56Sopenharmony_ci # that we can stop the server easily. 447db96d56Sopenharmony_ci self.socket.settimeout(0.1) 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ci def get_request(self): 477db96d56Sopenharmony_ci """HTTPServer method, overridden.""" 487db96d56Sopenharmony_ci 497db96d56Sopenharmony_ci request, client_address = self.socket.accept() 507db96d56Sopenharmony_ci 517db96d56Sopenharmony_ci # It's a loopback connection, so setting the timeout 527db96d56Sopenharmony_ci # really low shouldn't affect anything, but should make 537db96d56Sopenharmony_ci # deadlocks less likely to occur. 547db96d56Sopenharmony_ci request.settimeout(10.0) 557db96d56Sopenharmony_ci 567db96d56Sopenharmony_ci return (request, client_address) 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_ciclass LoopbackHttpServerThread(threading.Thread): 597db96d56Sopenharmony_ci """Stoppable thread that runs a loopback http server.""" 607db96d56Sopenharmony_ci 617db96d56Sopenharmony_ci def __init__(self, request_handler): 627db96d56Sopenharmony_ci threading.Thread.__init__(self) 637db96d56Sopenharmony_ci self._stop_server = False 647db96d56Sopenharmony_ci self.ready = threading.Event() 657db96d56Sopenharmony_ci request_handler.protocol_version = "HTTP/1.0" 667db96d56Sopenharmony_ci self.httpd = LoopbackHttpServer(("127.0.0.1", 0), 677db96d56Sopenharmony_ci request_handler) 687db96d56Sopenharmony_ci self.port = self.httpd.server_port 697db96d56Sopenharmony_ci 707db96d56Sopenharmony_ci def stop(self): 717db96d56Sopenharmony_ci """Stops the webserver if it's currently running.""" 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_ci self._stop_server = True 747db96d56Sopenharmony_ci 757db96d56Sopenharmony_ci self.join() 767db96d56Sopenharmony_ci self.httpd.server_close() 777db96d56Sopenharmony_ci 787db96d56Sopenharmony_ci def run(self): 797db96d56Sopenharmony_ci self.ready.set() 807db96d56Sopenharmony_ci while not self._stop_server: 817db96d56Sopenharmony_ci self.httpd.handle_request() 827db96d56Sopenharmony_ci 837db96d56Sopenharmony_ci# Authentication infrastructure 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ciclass DigestAuthHandler: 867db96d56Sopenharmony_ci """Handler for performing digest authentication.""" 877db96d56Sopenharmony_ci 887db96d56Sopenharmony_ci def __init__(self): 897db96d56Sopenharmony_ci self._request_num = 0 907db96d56Sopenharmony_ci self._nonces = [] 917db96d56Sopenharmony_ci self._users = {} 927db96d56Sopenharmony_ci self._realm_name = "Test Realm" 937db96d56Sopenharmony_ci self._qop = "auth" 947db96d56Sopenharmony_ci 957db96d56Sopenharmony_ci def set_qop(self, qop): 967db96d56Sopenharmony_ci self._qop = qop 977db96d56Sopenharmony_ci 987db96d56Sopenharmony_ci def set_users(self, users): 997db96d56Sopenharmony_ci assert isinstance(users, dict) 1007db96d56Sopenharmony_ci self._users = users 1017db96d56Sopenharmony_ci 1027db96d56Sopenharmony_ci def set_realm(self, realm): 1037db96d56Sopenharmony_ci self._realm_name = realm 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_ci def _generate_nonce(self): 1067db96d56Sopenharmony_ci self._request_num += 1 1077db96d56Sopenharmony_ci nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest() 1087db96d56Sopenharmony_ci self._nonces.append(nonce) 1097db96d56Sopenharmony_ci return nonce 1107db96d56Sopenharmony_ci 1117db96d56Sopenharmony_ci def _create_auth_dict(self, auth_str): 1127db96d56Sopenharmony_ci first_space_index = auth_str.find(" ") 1137db96d56Sopenharmony_ci auth_str = auth_str[first_space_index+1:] 1147db96d56Sopenharmony_ci 1157db96d56Sopenharmony_ci parts = auth_str.split(",") 1167db96d56Sopenharmony_ci 1177db96d56Sopenharmony_ci auth_dict = {} 1187db96d56Sopenharmony_ci for part in parts: 1197db96d56Sopenharmony_ci name, value = part.split("=") 1207db96d56Sopenharmony_ci name = name.strip() 1217db96d56Sopenharmony_ci if value[0] == '"' and value[-1] == '"': 1227db96d56Sopenharmony_ci value = value[1:-1] 1237db96d56Sopenharmony_ci else: 1247db96d56Sopenharmony_ci value = value.strip() 1257db96d56Sopenharmony_ci auth_dict[name] = value 1267db96d56Sopenharmony_ci return auth_dict 1277db96d56Sopenharmony_ci 1287db96d56Sopenharmony_ci def _validate_auth(self, auth_dict, password, method, uri): 1297db96d56Sopenharmony_ci final_dict = {} 1307db96d56Sopenharmony_ci final_dict.update(auth_dict) 1317db96d56Sopenharmony_ci final_dict["password"] = password 1327db96d56Sopenharmony_ci final_dict["method"] = method 1337db96d56Sopenharmony_ci final_dict["uri"] = uri 1347db96d56Sopenharmony_ci HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict 1357db96d56Sopenharmony_ci HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest() 1367db96d56Sopenharmony_ci HA2_str = "%(method)s:%(uri)s" % final_dict 1377db96d56Sopenharmony_ci HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest() 1387db96d56Sopenharmony_ci final_dict["HA1"] = HA1 1397db96d56Sopenharmony_ci final_dict["HA2"] = HA2 1407db96d56Sopenharmony_ci response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ 1417db96d56Sopenharmony_ci "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict 1427db96d56Sopenharmony_ci response = hashlib.md5(response_str.encode("ascii")).hexdigest() 1437db96d56Sopenharmony_ci 1447db96d56Sopenharmony_ci return response == auth_dict["response"] 1457db96d56Sopenharmony_ci 1467db96d56Sopenharmony_ci def _return_auth_challenge(self, request_handler): 1477db96d56Sopenharmony_ci request_handler.send_response(407, "Proxy Authentication Required") 1487db96d56Sopenharmony_ci request_handler.send_header("Content-Type", "text/html") 1497db96d56Sopenharmony_ci request_handler.send_header( 1507db96d56Sopenharmony_ci 'Proxy-Authenticate', 'Digest realm="%s", ' 1517db96d56Sopenharmony_ci 'qop="%s",' 1527db96d56Sopenharmony_ci 'nonce="%s", ' % \ 1537db96d56Sopenharmony_ci (self._realm_name, self._qop, self._generate_nonce())) 1547db96d56Sopenharmony_ci # XXX: Not sure if we're supposed to add this next header or 1557db96d56Sopenharmony_ci # not. 1567db96d56Sopenharmony_ci #request_handler.send_header('Connection', 'close') 1577db96d56Sopenharmony_ci request_handler.end_headers() 1587db96d56Sopenharmony_ci request_handler.wfile.write(b"Proxy Authentication Required.") 1597db96d56Sopenharmony_ci return False 1607db96d56Sopenharmony_ci 1617db96d56Sopenharmony_ci def handle_request(self, request_handler): 1627db96d56Sopenharmony_ci """Performs digest authentication on the given HTTP request 1637db96d56Sopenharmony_ci handler. Returns True if authentication was successful, False 1647db96d56Sopenharmony_ci otherwise. 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ci If no users have been set, then digest auth is effectively 1677db96d56Sopenharmony_ci disabled and this method will always return True. 1687db96d56Sopenharmony_ci """ 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_ci if len(self._users) == 0: 1717db96d56Sopenharmony_ci return True 1727db96d56Sopenharmony_ci 1737db96d56Sopenharmony_ci if "Proxy-Authorization" not in request_handler.headers: 1747db96d56Sopenharmony_ci return self._return_auth_challenge(request_handler) 1757db96d56Sopenharmony_ci else: 1767db96d56Sopenharmony_ci auth_dict = self._create_auth_dict( 1777db96d56Sopenharmony_ci request_handler.headers["Proxy-Authorization"] 1787db96d56Sopenharmony_ci ) 1797db96d56Sopenharmony_ci if auth_dict["username"] in self._users: 1807db96d56Sopenharmony_ci password = self._users[ auth_dict["username"] ] 1817db96d56Sopenharmony_ci else: 1827db96d56Sopenharmony_ci return self._return_auth_challenge(request_handler) 1837db96d56Sopenharmony_ci if not auth_dict.get("nonce") in self._nonces: 1847db96d56Sopenharmony_ci return self._return_auth_challenge(request_handler) 1857db96d56Sopenharmony_ci else: 1867db96d56Sopenharmony_ci self._nonces.remove(auth_dict["nonce"]) 1877db96d56Sopenharmony_ci 1887db96d56Sopenharmony_ci auth_validated = False 1897db96d56Sopenharmony_ci 1907db96d56Sopenharmony_ci # MSIE uses short_path in its validation, but Python's 1917db96d56Sopenharmony_ci # urllib.request uses the full path, so we're going to see if 1927db96d56Sopenharmony_ci # either of them works here. 1937db96d56Sopenharmony_ci 1947db96d56Sopenharmony_ci for path in [request_handler.path, request_handler.short_path]: 1957db96d56Sopenharmony_ci if self._validate_auth(auth_dict, 1967db96d56Sopenharmony_ci password, 1977db96d56Sopenharmony_ci request_handler.command, 1987db96d56Sopenharmony_ci path): 1997db96d56Sopenharmony_ci auth_validated = True 2007db96d56Sopenharmony_ci 2017db96d56Sopenharmony_ci if not auth_validated: 2027db96d56Sopenharmony_ci return self._return_auth_challenge(request_handler) 2037db96d56Sopenharmony_ci return True 2047db96d56Sopenharmony_ci 2057db96d56Sopenharmony_ci 2067db96d56Sopenharmony_ciclass BasicAuthHandler(http.server.BaseHTTPRequestHandler): 2077db96d56Sopenharmony_ci """Handler for performing basic authentication.""" 2087db96d56Sopenharmony_ci # Server side values 2097db96d56Sopenharmony_ci USER = 'testUser' 2107db96d56Sopenharmony_ci PASSWD = 'testPass' 2117db96d56Sopenharmony_ci REALM = 'Test' 2127db96d56Sopenharmony_ci USER_PASSWD = "%s:%s" % (USER, PASSWD) 2137db96d56Sopenharmony_ci ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii') 2147db96d56Sopenharmony_ci 2157db96d56Sopenharmony_ci def __init__(self, *args, **kwargs): 2167db96d56Sopenharmony_ci http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) 2177db96d56Sopenharmony_ci 2187db96d56Sopenharmony_ci def log_message(self, format, *args): 2197db96d56Sopenharmony_ci # Suppress console log message 2207db96d56Sopenharmony_ci pass 2217db96d56Sopenharmony_ci 2227db96d56Sopenharmony_ci def do_HEAD(self): 2237db96d56Sopenharmony_ci self.send_response(200) 2247db96d56Sopenharmony_ci self.send_header("Content-type", "text/html") 2257db96d56Sopenharmony_ci self.end_headers() 2267db96d56Sopenharmony_ci 2277db96d56Sopenharmony_ci def do_AUTHHEAD(self): 2287db96d56Sopenharmony_ci self.send_response(401) 2297db96d56Sopenharmony_ci self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM) 2307db96d56Sopenharmony_ci self.send_header("Content-type", "text/html") 2317db96d56Sopenharmony_ci self.end_headers() 2327db96d56Sopenharmony_ci 2337db96d56Sopenharmony_ci def do_GET(self): 2347db96d56Sopenharmony_ci if not self.headers.get("Authorization", ""): 2357db96d56Sopenharmony_ci self.do_AUTHHEAD() 2367db96d56Sopenharmony_ci self.wfile.write(b"No Auth header received") 2377db96d56Sopenharmony_ci elif self.headers.get( 2387db96d56Sopenharmony_ci "Authorization", "") == "Basic " + self.ENCODED_AUTH: 2397db96d56Sopenharmony_ci self.send_response(200) 2407db96d56Sopenharmony_ci self.end_headers() 2417db96d56Sopenharmony_ci self.wfile.write(b"It works") 2427db96d56Sopenharmony_ci else: 2437db96d56Sopenharmony_ci # Request Unauthorized 2447db96d56Sopenharmony_ci self.do_AUTHHEAD() 2457db96d56Sopenharmony_ci 2467db96d56Sopenharmony_ci 2477db96d56Sopenharmony_ci 2487db96d56Sopenharmony_ci# Proxy test infrastructure 2497db96d56Sopenharmony_ci 2507db96d56Sopenharmony_ciclass FakeProxyHandler(http.server.BaseHTTPRequestHandler): 2517db96d56Sopenharmony_ci """This is a 'fake proxy' that makes it look like the entire 2527db96d56Sopenharmony_ci internet has gone down due to a sudden zombie invasion. It main 2537db96d56Sopenharmony_ci utility is in providing us with authentication support for 2547db96d56Sopenharmony_ci testing. 2557db96d56Sopenharmony_ci """ 2567db96d56Sopenharmony_ci 2577db96d56Sopenharmony_ci def __init__(self, digest_auth_handler, *args, **kwargs): 2587db96d56Sopenharmony_ci # This has to be set before calling our parent's __init__(), which will 2597db96d56Sopenharmony_ci # try to call do_GET(). 2607db96d56Sopenharmony_ci self.digest_auth_handler = digest_auth_handler 2617db96d56Sopenharmony_ci http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) 2627db96d56Sopenharmony_ci 2637db96d56Sopenharmony_ci def log_message(self, format, *args): 2647db96d56Sopenharmony_ci # Uncomment the next line for debugging. 2657db96d56Sopenharmony_ci # sys.stderr.write(format % args) 2667db96d56Sopenharmony_ci pass 2677db96d56Sopenharmony_ci 2687db96d56Sopenharmony_ci def do_GET(self): 2697db96d56Sopenharmony_ci (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse( 2707db96d56Sopenharmony_ci self.path, "http") 2717db96d56Sopenharmony_ci self.short_path = path 2727db96d56Sopenharmony_ci if self.digest_auth_handler.handle_request(self): 2737db96d56Sopenharmony_ci self.send_response(200, "OK") 2747db96d56Sopenharmony_ci self.send_header("Content-Type", "text/html") 2757db96d56Sopenharmony_ci self.end_headers() 2767db96d56Sopenharmony_ci self.wfile.write(bytes("You've reached %s!<BR>" % self.path, 2777db96d56Sopenharmony_ci "ascii")) 2787db96d56Sopenharmony_ci self.wfile.write(b"Our apologies, but our server is down due to " 2797db96d56Sopenharmony_ci b"a sudden zombie invasion.") 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci# Test cases 2827db96d56Sopenharmony_ci 2837db96d56Sopenharmony_ciclass BasicAuthTests(unittest.TestCase): 2847db96d56Sopenharmony_ci USER = "testUser" 2857db96d56Sopenharmony_ci PASSWD = "testPass" 2867db96d56Sopenharmony_ci INCORRECT_PASSWD = "Incorrect" 2877db96d56Sopenharmony_ci REALM = "Test" 2887db96d56Sopenharmony_ci 2897db96d56Sopenharmony_ci def setUp(self): 2907db96d56Sopenharmony_ci super(BasicAuthTests, self).setUp() 2917db96d56Sopenharmony_ci # With Basic Authentication 2927db96d56Sopenharmony_ci def http_server_with_basic_auth_handler(*args, **kwargs): 2937db96d56Sopenharmony_ci return BasicAuthHandler(*args, **kwargs) 2947db96d56Sopenharmony_ci self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler) 2957db96d56Sopenharmony_ci self.addCleanup(self.stop_server) 2967db96d56Sopenharmony_ci self.server_url = 'http://127.0.0.1:%s' % self.server.port 2977db96d56Sopenharmony_ci self.server.start() 2987db96d56Sopenharmony_ci self.server.ready.wait() 2997db96d56Sopenharmony_ci 3007db96d56Sopenharmony_ci def stop_server(self): 3017db96d56Sopenharmony_ci self.server.stop() 3027db96d56Sopenharmony_ci self.server = None 3037db96d56Sopenharmony_ci 3047db96d56Sopenharmony_ci def tearDown(self): 3057db96d56Sopenharmony_ci super(BasicAuthTests, self).tearDown() 3067db96d56Sopenharmony_ci 3077db96d56Sopenharmony_ci def test_basic_auth_success(self): 3087db96d56Sopenharmony_ci ah = urllib.request.HTTPBasicAuthHandler() 3097db96d56Sopenharmony_ci ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD) 3107db96d56Sopenharmony_ci urllib.request.install_opener(urllib.request.build_opener(ah)) 3117db96d56Sopenharmony_ci try: 3127db96d56Sopenharmony_ci self.assertTrue(urllib.request.urlopen(self.server_url)) 3137db96d56Sopenharmony_ci except urllib.error.HTTPError: 3147db96d56Sopenharmony_ci self.fail("Basic auth failed for the url: %s" % self.server_url) 3157db96d56Sopenharmony_ci 3167db96d56Sopenharmony_ci def test_basic_auth_httperror(self): 3177db96d56Sopenharmony_ci ah = urllib.request.HTTPBasicAuthHandler() 3187db96d56Sopenharmony_ci ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD) 3197db96d56Sopenharmony_ci urllib.request.install_opener(urllib.request.build_opener(ah)) 3207db96d56Sopenharmony_ci self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url) 3217db96d56Sopenharmony_ci 3227db96d56Sopenharmony_ci 3237db96d56Sopenharmony_ci@hashlib_helper.requires_hashdigest("md5", openssl=True) 3247db96d56Sopenharmony_ciclass ProxyAuthTests(unittest.TestCase): 3257db96d56Sopenharmony_ci URL = "http://localhost" 3267db96d56Sopenharmony_ci 3277db96d56Sopenharmony_ci USER = "tester" 3287db96d56Sopenharmony_ci PASSWD = "test123" 3297db96d56Sopenharmony_ci REALM = "TestRealm" 3307db96d56Sopenharmony_ci 3317db96d56Sopenharmony_ci def setUp(self): 3327db96d56Sopenharmony_ci super(ProxyAuthTests, self).setUp() 3337db96d56Sopenharmony_ci # Ignore proxy bypass settings in the environment. 3347db96d56Sopenharmony_ci def restore_environ(old_environ): 3357db96d56Sopenharmony_ci os.environ.clear() 3367db96d56Sopenharmony_ci os.environ.update(old_environ) 3377db96d56Sopenharmony_ci self.addCleanup(restore_environ, os.environ.copy()) 3387db96d56Sopenharmony_ci os.environ['NO_PROXY'] = '' 3397db96d56Sopenharmony_ci os.environ['no_proxy'] = '' 3407db96d56Sopenharmony_ci 3417db96d56Sopenharmony_ci self.digest_auth_handler = DigestAuthHandler() 3427db96d56Sopenharmony_ci self.digest_auth_handler.set_users({self.USER: self.PASSWD}) 3437db96d56Sopenharmony_ci self.digest_auth_handler.set_realm(self.REALM) 3447db96d56Sopenharmony_ci # With Digest Authentication. 3457db96d56Sopenharmony_ci def create_fake_proxy_handler(*args, **kwargs): 3467db96d56Sopenharmony_ci return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs) 3477db96d56Sopenharmony_ci 3487db96d56Sopenharmony_ci self.server = LoopbackHttpServerThread(create_fake_proxy_handler) 3497db96d56Sopenharmony_ci self.addCleanup(self.stop_server) 3507db96d56Sopenharmony_ci self.server.start() 3517db96d56Sopenharmony_ci self.server.ready.wait() 3527db96d56Sopenharmony_ci proxy_url = "http://127.0.0.1:%d" % self.server.port 3537db96d56Sopenharmony_ci handler = urllib.request.ProxyHandler({"http" : proxy_url}) 3547db96d56Sopenharmony_ci self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler() 3557db96d56Sopenharmony_ci self.opener = urllib.request.build_opener( 3567db96d56Sopenharmony_ci handler, self.proxy_digest_handler) 3577db96d56Sopenharmony_ci 3587db96d56Sopenharmony_ci def stop_server(self): 3597db96d56Sopenharmony_ci self.server.stop() 3607db96d56Sopenharmony_ci self.server = None 3617db96d56Sopenharmony_ci 3627db96d56Sopenharmony_ci def test_proxy_with_bad_password_raises_httperror(self): 3637db96d56Sopenharmony_ci self.proxy_digest_handler.add_password(self.REALM, self.URL, 3647db96d56Sopenharmony_ci self.USER, self.PASSWD+"bad") 3657db96d56Sopenharmony_ci self.digest_auth_handler.set_qop("auth") 3667db96d56Sopenharmony_ci self.assertRaises(urllib.error.HTTPError, 3677db96d56Sopenharmony_ci self.opener.open, 3687db96d56Sopenharmony_ci self.URL) 3697db96d56Sopenharmony_ci 3707db96d56Sopenharmony_ci def test_proxy_with_no_password_raises_httperror(self): 3717db96d56Sopenharmony_ci self.digest_auth_handler.set_qop("auth") 3727db96d56Sopenharmony_ci self.assertRaises(urllib.error.HTTPError, 3737db96d56Sopenharmony_ci self.opener.open, 3747db96d56Sopenharmony_ci self.URL) 3757db96d56Sopenharmony_ci 3767db96d56Sopenharmony_ci def test_proxy_qop_auth_works(self): 3777db96d56Sopenharmony_ci self.proxy_digest_handler.add_password(self.REALM, self.URL, 3787db96d56Sopenharmony_ci self.USER, self.PASSWD) 3797db96d56Sopenharmony_ci self.digest_auth_handler.set_qop("auth") 3807db96d56Sopenharmony_ci with self.opener.open(self.URL) as result: 3817db96d56Sopenharmony_ci while result.read(): 3827db96d56Sopenharmony_ci pass 3837db96d56Sopenharmony_ci 3847db96d56Sopenharmony_ci def test_proxy_qop_auth_int_works_or_throws_urlerror(self): 3857db96d56Sopenharmony_ci self.proxy_digest_handler.add_password(self.REALM, self.URL, 3867db96d56Sopenharmony_ci self.USER, self.PASSWD) 3877db96d56Sopenharmony_ci self.digest_auth_handler.set_qop("auth-int") 3887db96d56Sopenharmony_ci try: 3897db96d56Sopenharmony_ci result = self.opener.open(self.URL) 3907db96d56Sopenharmony_ci except urllib.error.URLError: 3917db96d56Sopenharmony_ci # It's okay if we don't support auth-int, but we certainly 3927db96d56Sopenharmony_ci # shouldn't receive any kind of exception here other than 3937db96d56Sopenharmony_ci # a URLError. 3947db96d56Sopenharmony_ci pass 3957db96d56Sopenharmony_ci else: 3967db96d56Sopenharmony_ci with result: 3977db96d56Sopenharmony_ci while result.read(): 3987db96d56Sopenharmony_ci pass 3997db96d56Sopenharmony_ci 4007db96d56Sopenharmony_ci 4017db96d56Sopenharmony_cidef GetRequestHandler(responses): 4027db96d56Sopenharmony_ci 4037db96d56Sopenharmony_ci class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler): 4047db96d56Sopenharmony_ci 4057db96d56Sopenharmony_ci server_version = "TestHTTP/" 4067db96d56Sopenharmony_ci requests = [] 4077db96d56Sopenharmony_ci headers_received = [] 4087db96d56Sopenharmony_ci port = 80 4097db96d56Sopenharmony_ci 4107db96d56Sopenharmony_ci def do_GET(self): 4117db96d56Sopenharmony_ci body = self.send_head() 4127db96d56Sopenharmony_ci while body: 4137db96d56Sopenharmony_ci done = self.wfile.write(body) 4147db96d56Sopenharmony_ci body = body[done:] 4157db96d56Sopenharmony_ci 4167db96d56Sopenharmony_ci def do_POST(self): 4177db96d56Sopenharmony_ci content_length = self.headers["Content-Length"] 4187db96d56Sopenharmony_ci post_data = self.rfile.read(int(content_length)) 4197db96d56Sopenharmony_ci self.do_GET() 4207db96d56Sopenharmony_ci self.requests.append(post_data) 4217db96d56Sopenharmony_ci 4227db96d56Sopenharmony_ci def send_head(self): 4237db96d56Sopenharmony_ci FakeHTTPRequestHandler.headers_received = self.headers 4247db96d56Sopenharmony_ci self.requests.append(self.path) 4257db96d56Sopenharmony_ci response_code, headers, body = responses.pop(0) 4267db96d56Sopenharmony_ci 4277db96d56Sopenharmony_ci self.send_response(response_code) 4287db96d56Sopenharmony_ci 4297db96d56Sopenharmony_ci for (header, value) in headers: 4307db96d56Sopenharmony_ci self.send_header(header, value % {'port':self.port}) 4317db96d56Sopenharmony_ci if body: 4327db96d56Sopenharmony_ci self.send_header("Content-type", "text/plain") 4337db96d56Sopenharmony_ci self.end_headers() 4347db96d56Sopenharmony_ci return body 4357db96d56Sopenharmony_ci self.end_headers() 4367db96d56Sopenharmony_ci 4377db96d56Sopenharmony_ci def log_message(self, *args): 4387db96d56Sopenharmony_ci pass 4397db96d56Sopenharmony_ci 4407db96d56Sopenharmony_ci 4417db96d56Sopenharmony_ci return FakeHTTPRequestHandler 4427db96d56Sopenharmony_ci 4437db96d56Sopenharmony_ci 4447db96d56Sopenharmony_ciclass TestUrlopen(unittest.TestCase): 4457db96d56Sopenharmony_ci """Tests urllib.request.urlopen using the network. 4467db96d56Sopenharmony_ci 4477db96d56Sopenharmony_ci These tests are not exhaustive. Assuming that testing using files does a 4487db96d56Sopenharmony_ci good job overall of some of the basic interface features. There are no 4497db96d56Sopenharmony_ci tests exercising the optional 'data' and 'proxies' arguments. No tests 4507db96d56Sopenharmony_ci for transparent redirection have been written. 4517db96d56Sopenharmony_ci """ 4527db96d56Sopenharmony_ci 4537db96d56Sopenharmony_ci def setUp(self): 4547db96d56Sopenharmony_ci super(TestUrlopen, self).setUp() 4557db96d56Sopenharmony_ci 4567db96d56Sopenharmony_ci # clear _opener global variable 4577db96d56Sopenharmony_ci self.addCleanup(urllib.request.urlcleanup) 4587db96d56Sopenharmony_ci 4597db96d56Sopenharmony_ci # Ignore proxies for localhost tests. 4607db96d56Sopenharmony_ci def restore_environ(old_environ): 4617db96d56Sopenharmony_ci os.environ.clear() 4627db96d56Sopenharmony_ci os.environ.update(old_environ) 4637db96d56Sopenharmony_ci self.addCleanup(restore_environ, os.environ.copy()) 4647db96d56Sopenharmony_ci os.environ['NO_PROXY'] = '*' 4657db96d56Sopenharmony_ci os.environ['no_proxy'] = '*' 4667db96d56Sopenharmony_ci 4677db96d56Sopenharmony_ci def urlopen(self, url, data=None, **kwargs): 4687db96d56Sopenharmony_ci l = [] 4697db96d56Sopenharmony_ci f = urllib.request.urlopen(url, data, **kwargs) 4707db96d56Sopenharmony_ci try: 4717db96d56Sopenharmony_ci # Exercise various methods 4727db96d56Sopenharmony_ci l.extend(f.readlines(200)) 4737db96d56Sopenharmony_ci l.append(f.readline()) 4747db96d56Sopenharmony_ci l.append(f.read(1024)) 4757db96d56Sopenharmony_ci l.append(f.read()) 4767db96d56Sopenharmony_ci finally: 4777db96d56Sopenharmony_ci f.close() 4787db96d56Sopenharmony_ci return b"".join(l) 4797db96d56Sopenharmony_ci 4807db96d56Sopenharmony_ci def stop_server(self): 4817db96d56Sopenharmony_ci self.server.stop() 4827db96d56Sopenharmony_ci self.server = None 4837db96d56Sopenharmony_ci 4847db96d56Sopenharmony_ci def start_server(self, responses=None): 4857db96d56Sopenharmony_ci if responses is None: 4867db96d56Sopenharmony_ci responses = [(200, [], b"we don't care")] 4877db96d56Sopenharmony_ci handler = GetRequestHandler(responses) 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci self.server = LoopbackHttpServerThread(handler) 4907db96d56Sopenharmony_ci self.addCleanup(self.stop_server) 4917db96d56Sopenharmony_ci self.server.start() 4927db96d56Sopenharmony_ci self.server.ready.wait() 4937db96d56Sopenharmony_ci port = self.server.port 4947db96d56Sopenharmony_ci handler.port = port 4957db96d56Sopenharmony_ci return handler 4967db96d56Sopenharmony_ci 4977db96d56Sopenharmony_ci def start_https_server(self, responses=None, **kwargs): 4987db96d56Sopenharmony_ci if not hasattr(urllib.request, 'HTTPSHandler'): 4997db96d56Sopenharmony_ci self.skipTest('ssl support required') 5007db96d56Sopenharmony_ci from test.ssl_servers import make_https_server 5017db96d56Sopenharmony_ci if responses is None: 5027db96d56Sopenharmony_ci responses = [(200, [], b"we care a bit")] 5037db96d56Sopenharmony_ci handler = GetRequestHandler(responses) 5047db96d56Sopenharmony_ci server = make_https_server(self, handler_class=handler, **kwargs) 5057db96d56Sopenharmony_ci handler.port = server.port 5067db96d56Sopenharmony_ci return handler 5077db96d56Sopenharmony_ci 5087db96d56Sopenharmony_ci def test_redirection(self): 5097db96d56Sopenharmony_ci expected_response = b"We got here..." 5107db96d56Sopenharmony_ci responses = [ 5117db96d56Sopenharmony_ci (302, [("Location", "http://localhost:%(port)s/somewhere_else")], 5127db96d56Sopenharmony_ci ""), 5137db96d56Sopenharmony_ci (200, [], expected_response) 5147db96d56Sopenharmony_ci ] 5157db96d56Sopenharmony_ci 5167db96d56Sopenharmony_ci handler = self.start_server(responses) 5177db96d56Sopenharmony_ci data = self.urlopen("http://localhost:%s/" % handler.port) 5187db96d56Sopenharmony_ci self.assertEqual(data, expected_response) 5197db96d56Sopenharmony_ci self.assertEqual(handler.requests, ["/", "/somewhere_else"]) 5207db96d56Sopenharmony_ci 5217db96d56Sopenharmony_ci def test_chunked(self): 5227db96d56Sopenharmony_ci expected_response = b"hello world" 5237db96d56Sopenharmony_ci chunked_start = ( 5247db96d56Sopenharmony_ci b'a\r\n' 5257db96d56Sopenharmony_ci b'hello worl\r\n' 5267db96d56Sopenharmony_ci b'1\r\n' 5277db96d56Sopenharmony_ci b'd\r\n' 5287db96d56Sopenharmony_ci b'0\r\n' 5297db96d56Sopenharmony_ci ) 5307db96d56Sopenharmony_ci response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)] 5317db96d56Sopenharmony_ci handler = self.start_server(response) 5327db96d56Sopenharmony_ci data = self.urlopen("http://localhost:%s/" % handler.port) 5337db96d56Sopenharmony_ci self.assertEqual(data, expected_response) 5347db96d56Sopenharmony_ci 5357db96d56Sopenharmony_ci def test_404(self): 5367db96d56Sopenharmony_ci expected_response = b"Bad bad bad..." 5377db96d56Sopenharmony_ci handler = self.start_server([(404, [], expected_response)]) 5387db96d56Sopenharmony_ci 5397db96d56Sopenharmony_ci try: 5407db96d56Sopenharmony_ci self.urlopen("http://localhost:%s/weeble" % handler.port) 5417db96d56Sopenharmony_ci except urllib.error.URLError as f: 5427db96d56Sopenharmony_ci data = f.read() 5437db96d56Sopenharmony_ci f.close() 5447db96d56Sopenharmony_ci else: 5457db96d56Sopenharmony_ci self.fail("404 should raise URLError") 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci self.assertEqual(data, expected_response) 5487db96d56Sopenharmony_ci self.assertEqual(handler.requests, ["/weeble"]) 5497db96d56Sopenharmony_ci 5507db96d56Sopenharmony_ci def test_200(self): 5517db96d56Sopenharmony_ci expected_response = b"pycon 2008..." 5527db96d56Sopenharmony_ci handler = self.start_server([(200, [], expected_response)]) 5537db96d56Sopenharmony_ci data = self.urlopen("http://localhost:%s/bizarre" % handler.port) 5547db96d56Sopenharmony_ci self.assertEqual(data, expected_response) 5557db96d56Sopenharmony_ci self.assertEqual(handler.requests, ["/bizarre"]) 5567db96d56Sopenharmony_ci 5577db96d56Sopenharmony_ci def test_200_with_parameters(self): 5587db96d56Sopenharmony_ci expected_response = b"pycon 2008..." 5597db96d56Sopenharmony_ci handler = self.start_server([(200, [], expected_response)]) 5607db96d56Sopenharmony_ci data = self.urlopen("http://localhost:%s/bizarre" % handler.port, 5617db96d56Sopenharmony_ci b"get=with_feeling") 5627db96d56Sopenharmony_ci self.assertEqual(data, expected_response) 5637db96d56Sopenharmony_ci self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"]) 5647db96d56Sopenharmony_ci 5657db96d56Sopenharmony_ci def test_https(self): 5667db96d56Sopenharmony_ci handler = self.start_https_server() 5677db96d56Sopenharmony_ci context = ssl.create_default_context(cafile=CERT_localhost) 5687db96d56Sopenharmony_ci data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context) 5697db96d56Sopenharmony_ci self.assertEqual(data, b"we care a bit") 5707db96d56Sopenharmony_ci 5717db96d56Sopenharmony_ci def test_https_with_cafile(self): 5727db96d56Sopenharmony_ci handler = self.start_https_server(certfile=CERT_localhost) 5737db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 5747db96d56Sopenharmony_ci # Good cert 5757db96d56Sopenharmony_ci data = self.urlopen("https://localhost:%s/bizarre" % handler.port, 5767db96d56Sopenharmony_ci cafile=CERT_localhost) 5777db96d56Sopenharmony_ci self.assertEqual(data, b"we care a bit") 5787db96d56Sopenharmony_ci # Bad cert 5797db96d56Sopenharmony_ci with self.assertRaises(urllib.error.URLError) as cm: 5807db96d56Sopenharmony_ci self.urlopen("https://localhost:%s/bizarre" % handler.port, 5817db96d56Sopenharmony_ci cafile=CERT_fakehostname) 5827db96d56Sopenharmony_ci # Good cert, but mismatching hostname 5837db96d56Sopenharmony_ci handler = self.start_https_server(certfile=CERT_fakehostname) 5847db96d56Sopenharmony_ci with self.assertRaises(urllib.error.URLError) as cm: 5857db96d56Sopenharmony_ci self.urlopen("https://localhost:%s/bizarre" % handler.port, 5867db96d56Sopenharmony_ci cafile=CERT_fakehostname) 5877db96d56Sopenharmony_ci 5887db96d56Sopenharmony_ci def test_https_with_cadefault(self): 5897db96d56Sopenharmony_ci handler = self.start_https_server(certfile=CERT_localhost) 5907db96d56Sopenharmony_ci # Self-signed cert should fail verification with system certificate store 5917db96d56Sopenharmony_ci with warnings_helper.check_warnings(('', DeprecationWarning)): 5927db96d56Sopenharmony_ci with self.assertRaises(urllib.error.URLError) as cm: 5937db96d56Sopenharmony_ci self.urlopen("https://localhost:%s/bizarre" % handler.port, 5947db96d56Sopenharmony_ci cadefault=True) 5957db96d56Sopenharmony_ci 5967db96d56Sopenharmony_ci def test_https_sni(self): 5977db96d56Sopenharmony_ci if ssl is None: 5987db96d56Sopenharmony_ci self.skipTest("ssl module required") 5997db96d56Sopenharmony_ci if not ssl.HAS_SNI: 6007db96d56Sopenharmony_ci self.skipTest("SNI support required in OpenSSL") 6017db96d56Sopenharmony_ci sni_name = None 6027db96d56Sopenharmony_ci def cb_sni(ssl_sock, server_name, initial_context): 6037db96d56Sopenharmony_ci nonlocal sni_name 6047db96d56Sopenharmony_ci sni_name = server_name 6057db96d56Sopenharmony_ci context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 6067db96d56Sopenharmony_ci context.set_servername_callback(cb_sni) 6077db96d56Sopenharmony_ci handler = self.start_https_server(context=context, certfile=CERT_localhost) 6087db96d56Sopenharmony_ci context = ssl.create_default_context(cafile=CERT_localhost) 6097db96d56Sopenharmony_ci self.urlopen("https://localhost:%s" % handler.port, context=context) 6107db96d56Sopenharmony_ci self.assertEqual(sni_name, "localhost") 6117db96d56Sopenharmony_ci 6127db96d56Sopenharmony_ci def test_sending_headers(self): 6137db96d56Sopenharmony_ci handler = self.start_server() 6147db96d56Sopenharmony_ci req = urllib.request.Request("http://localhost:%s/" % handler.port, 6157db96d56Sopenharmony_ci headers={"Range": "bytes=20-39"}) 6167db96d56Sopenharmony_ci with urllib.request.urlopen(req): 6177db96d56Sopenharmony_ci pass 6187db96d56Sopenharmony_ci self.assertEqual(handler.headers_received["Range"], "bytes=20-39") 6197db96d56Sopenharmony_ci 6207db96d56Sopenharmony_ci def test_sending_headers_camel(self): 6217db96d56Sopenharmony_ci handler = self.start_server() 6227db96d56Sopenharmony_ci req = urllib.request.Request("http://localhost:%s/" % handler.port, 6237db96d56Sopenharmony_ci headers={"X-SoMe-hEader": "foobar"}) 6247db96d56Sopenharmony_ci with urllib.request.urlopen(req): 6257db96d56Sopenharmony_ci pass 6267db96d56Sopenharmony_ci self.assertIn("X-Some-Header", handler.headers_received.keys()) 6277db96d56Sopenharmony_ci self.assertNotIn("X-SoMe-hEader", handler.headers_received.keys()) 6287db96d56Sopenharmony_ci 6297db96d56Sopenharmony_ci def test_basic(self): 6307db96d56Sopenharmony_ci handler = self.start_server() 6317db96d56Sopenharmony_ci with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url: 6327db96d56Sopenharmony_ci for attr in ("read", "close", "info", "geturl"): 6337db96d56Sopenharmony_ci self.assertTrue(hasattr(open_url, attr), "object returned from " 6347db96d56Sopenharmony_ci "urlopen lacks the %s attribute" % attr) 6357db96d56Sopenharmony_ci self.assertTrue(open_url.read(), "calling 'read' failed") 6367db96d56Sopenharmony_ci 6377db96d56Sopenharmony_ci def test_info(self): 6387db96d56Sopenharmony_ci handler = self.start_server() 6397db96d56Sopenharmony_ci open_url = urllib.request.urlopen( 6407db96d56Sopenharmony_ci "http://localhost:%s" % handler.port) 6417db96d56Sopenharmony_ci with open_url: 6427db96d56Sopenharmony_ci info_obj = open_url.info() 6437db96d56Sopenharmony_ci self.assertIsInstance(info_obj, email.message.Message, 6447db96d56Sopenharmony_ci "object returned by 'info' is not an " 6457db96d56Sopenharmony_ci "instance of email.message.Message") 6467db96d56Sopenharmony_ci self.assertEqual(info_obj.get_content_subtype(), "plain") 6477db96d56Sopenharmony_ci 6487db96d56Sopenharmony_ci def test_geturl(self): 6497db96d56Sopenharmony_ci # Make sure same URL as opened is returned by geturl. 6507db96d56Sopenharmony_ci handler = self.start_server() 6517db96d56Sopenharmony_ci open_url = urllib.request.urlopen("http://localhost:%s" % handler.port) 6527db96d56Sopenharmony_ci with open_url: 6537db96d56Sopenharmony_ci url = open_url.geturl() 6547db96d56Sopenharmony_ci self.assertEqual(url, "http://localhost:%s" % handler.port) 6557db96d56Sopenharmony_ci 6567db96d56Sopenharmony_ci def test_iteration(self): 6577db96d56Sopenharmony_ci expected_response = b"pycon 2008..." 6587db96d56Sopenharmony_ci handler = self.start_server([(200, [], expected_response)]) 6597db96d56Sopenharmony_ci data = urllib.request.urlopen("http://localhost:%s" % handler.port) 6607db96d56Sopenharmony_ci for line in data: 6617db96d56Sopenharmony_ci self.assertEqual(line, expected_response) 6627db96d56Sopenharmony_ci 6637db96d56Sopenharmony_ci def test_line_iteration(self): 6647db96d56Sopenharmony_ci lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"] 6657db96d56Sopenharmony_ci expected_response = b"".join(lines) 6667db96d56Sopenharmony_ci handler = self.start_server([(200, [], expected_response)]) 6677db96d56Sopenharmony_ci data = urllib.request.urlopen("http://localhost:%s" % handler.port) 6687db96d56Sopenharmony_ci for index, line in enumerate(data): 6697db96d56Sopenharmony_ci self.assertEqual(line, lines[index], 6707db96d56Sopenharmony_ci "Fetched line number %s doesn't match expected:\n" 6717db96d56Sopenharmony_ci " Expected length was %s, got %s" % 6727db96d56Sopenharmony_ci (index, len(lines[index]), len(line))) 6737db96d56Sopenharmony_ci self.assertEqual(index + 1, len(lines)) 6747db96d56Sopenharmony_ci 6757db96d56Sopenharmony_ci def test_issue16464(self): 6767db96d56Sopenharmony_ci # See https://bugs.python.org/issue16464 6777db96d56Sopenharmony_ci # and https://bugs.python.org/issue46648 6787db96d56Sopenharmony_ci handler = self.start_server([ 6797db96d56Sopenharmony_ci (200, [], b'any'), 6807db96d56Sopenharmony_ci (200, [], b'any'), 6817db96d56Sopenharmony_ci ]) 6827db96d56Sopenharmony_ci opener = urllib.request.build_opener() 6837db96d56Sopenharmony_ci request = urllib.request.Request("http://localhost:%s" % handler.port) 6847db96d56Sopenharmony_ci self.assertEqual(None, request.data) 6857db96d56Sopenharmony_ci 6867db96d56Sopenharmony_ci opener.open(request, "1".encode("us-ascii")) 6877db96d56Sopenharmony_ci self.assertEqual(b"1", request.data) 6887db96d56Sopenharmony_ci self.assertEqual("1", request.get_header("Content-length")) 6897db96d56Sopenharmony_ci 6907db96d56Sopenharmony_ci opener.open(request, "1234567890".encode("us-ascii")) 6917db96d56Sopenharmony_ci self.assertEqual(b"1234567890", request.data) 6927db96d56Sopenharmony_ci self.assertEqual("10", request.get_header("Content-length")) 6937db96d56Sopenharmony_ci 6947db96d56Sopenharmony_cidef setUpModule(): 6957db96d56Sopenharmony_ci thread_info = threading_helper.threading_setup() 6967db96d56Sopenharmony_ci unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 6977db96d56Sopenharmony_ci 6987db96d56Sopenharmony_ci 6997db96d56Sopenharmony_ciif __name__ == "__main__": 7007db96d56Sopenharmony_ci unittest.main() 701