Lines Matching refs:self
37 def __init__(self, server_address, RequestHandlerClass):
38 http.server.HTTPServer.__init__(self,
44 self.socket.settimeout(0.1)
46 def get_request(self):
49 request, client_address = self.socket.accept()
61 def __init__(self, request_handler):
62 threading.Thread.__init__(self)
63 self._stop_server = False
64 self.ready = threading.Event()
66 self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
68 self.port = self.httpd.server_port
70 def stop(self):
73 self._stop_server = True
75 self.join()
76 self.httpd.server_close()
78 def run(self):
79 self.ready.set()
80 while not self._stop_server:
81 self.httpd.handle_request()
88 def __init__(self):
89 self._request_num = 0
90 self._nonces = []
91 self._users = {}
92 self._realm_name = "Test Realm"
93 self._qop = "auth"
95 def set_qop(self, qop):
96 self._qop = qop
98 def set_users(self, users):
100 self._users = users
102 def set_realm(self, realm):
103 self._realm_name = realm
105 def _generate_nonce(self):
106 self._request_num += 1
107 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
108 self._nonces.append(nonce)
111 def _create_auth_dict(self, auth_str):
128 def _validate_auth(self, auth_dict, password, method, uri):
146 def _return_auth_challenge(self, request_handler):
153 (self._realm_name, self._qop, self._generate_nonce()))
161 def handle_request(self, request_handler):
170 if len(self._users) == 0:
174 return self._return_auth_challenge(request_handler)
176 auth_dict = self._create_auth_dict(
179 if auth_dict["username"] in self._users:
180 password = self._users[ auth_dict["username"] ]
182 return self._return_auth_challenge(request_handler)
183 if not auth_dict.get("nonce") in self._nonces:
184 return self._return_auth_challenge(request_handler)
186 self._nonces.remove(auth_dict["nonce"])
195 if self._validate_auth(auth_dict,
202 return self._return_auth_challenge(request_handler)
215 def __init__(self, *args, **kwargs):
216 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
218 def log_message(self, format, *args):
222 def do_HEAD(self):
223 self.send_response(200)
224 self.send_header("Content-type", "text/html")
225 self.end_headers()
227 def do_AUTHHEAD(self):
228 self.send_response(401)
229 self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
230 self.send_header("Content-type", "text/html")
231 self.end_headers()
233 def do_GET(self):
234 if not self.headers.get("Authorization", ""):
235 self.do_AUTHHEAD()
236 self.wfile.write(b"No Auth header received")
237 elif self.headers.get(
238 "Authorization", "") == "Basic " + self.ENCODED_AUTH:
239 self.send_response(200)
240 self.end_headers()
241 self.wfile.write(b"It works")
244 self.do_AUTHHEAD()
257 def __init__(self, digest_auth_handler, *args, **kwargs):
260 self.digest_auth_handler = digest_auth_handler
261 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
263 def log_message(self, format, *args):
268 def do_GET(self):
270 self.path, "http")
271 self.short_path = path
272 if self.digest_auth_handler.handle_request(self):
273 self.send_response(200, "OK")
274 self.send_header("Content-Type", "text/html")
275 self.end_headers()
276 self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
278 self.wfile.write(b"Our apologies, but our server is down due to "
289 def setUp(self):
290 super(BasicAuthTests, self).setUp()
294 self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
295 self.addCleanup(self.stop_server)
296 self.server_url = 'http://127.0.0.1:%s' % self.server.port
297 self.server.start()
298 self.server.ready.wait()
300 def stop_server(self):
301 self.server.stop()
302 self.server = None
304 def tearDown(self):
305 super(BasicAuthTests, self).tearDown()
307 def test_basic_auth_success(self):
309 ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
312 self.assertTrue(urllib.request.urlopen(self.server_url))
314 self.fail("Basic auth failed for the url: %s" % self.server_url)
316 def test_basic_auth_httperror(self):
318 ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
320 self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
331 def setUp(self):
332 super(ProxyAuthTests, self).setUp()
337 self.addCleanup(restore_environ, os.environ.copy())
341 self.digest_auth_handler = DigestAuthHandler()
342 self.digest_auth_handler.set_users({self.USER: self.PASSWD})
343 self.digest_auth_handler.set_realm(self.REALM)
346 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
348 self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
349 self.addCleanup(self.stop_server)
350 self.server.start()
351 self.server.ready.wait()
352 proxy_url = "http://127.0.0.1:%d" % self.server.port
354 self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
355 self.opener = urllib.request.build_opener(
356 handler, self.proxy_digest_handler)
358 def stop_server(self):
359 self.server.stop()
360 self.server = None
362 def test_proxy_with_bad_password_raises_httperror(self):
363 self.proxy_digest_handler.add_password(self.REALM, self.URL,
364 self.USER, self.PASSWD+"bad")
365 self.digest_auth_handler.set_qop("auth")
366 self.assertRaises(urllib.error.HTTPError,
367 self.opener.open,
368 self.URL)
370 def test_proxy_with_no_password_raises_httperror(self):
371 self.digest_auth_handler.set_qop("auth")
372 self.assertRaises(urllib.error.HTTPError,
373 self.opener.open,
374 self.URL)
376 def test_proxy_qop_auth_works(self):
377 self.proxy_digest_handler.add_password(self.REALM, self.URL,
378 self.USER, self.PASSWD)
379 self.digest_auth_handler.set_qop("auth")
380 with self.opener.open(self.URL) as result:
384 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
385 self.proxy_digest_handler.add_password(self.REALM, self.URL,
386 self.USER, self.PASSWD)
387 self.digest_auth_handler.set_qop("auth-int")
389 result = self.opener.open(self.URL)
410 def do_GET(self):
411 body = self.send_head()
413 done = self.wfile.write(body)
416 def do_POST(self):
417 content_length = self.headers["Content-Length"]
418 post_data = self.rfile.read(int(content_length))
419 self.do_GET()
420 self.requests.append(post_data)
422 def send_head(self):
423 FakeHTTPRequestHandler.headers_received = self.headers
424 self.requests.append(self.path)
427 self.send_response(response_code)
430 self.send_header(header, value % {'port':self.port})
432 self.send_header("Content-type", "text/plain")
433 self.end_headers()
435 self.end_headers()
437 def log_message(self, *args):
453 def setUp(self):
454 super(TestUrlopen, self).setUp()
457 self.addCleanup(urllib.request.urlcleanup)
463 self.addCleanup(restore_environ, os.environ.copy())
467 def urlopen(self, url, data=None, **kwargs):
480 def stop_server(self):
481 self.server.stop()
482 self.server = None
484 def start_server(self, responses=None):
489 self.server = LoopbackHttpServerThread(handler)
490 self.addCleanup(self.stop_server)
491 self.server.start()
492 self.server.ready.wait()
493 port = self.server.port
497 def start_https_server(self, responses=None, **kwargs):
499 self.skipTest('ssl support required')
504 server = make_https_server(self, handler_class=handler, **kwargs)
508 def test_redirection(self):
516 handler = self.start_server(responses)
517 data = self.urlopen("http://localhost:%s/" % handler.port)
518 self.assertEqual(data, expected_response)
519 self.assertEqual(handler.requests, ["/", "/somewhere_else"])
521 def test_chunked(self):
531 handler = self.start_server(response)
532 data = self.urlopen("http://localhost:%s/" % handler.port)
533 self.assertEqual(data, expected_response)
535 def test_404(self):
537 handler = self.start_server([(404, [], expected_response)])
540 self.urlopen("http://localhost:%s/weeble" % handler.port)
545 self.fail("404 should raise URLError")
547 self.assertEqual(data, expected_response)
548 self.assertEqual(handler.requests, ["/weeble"])
550 def test_200(self):
552 handler = self.start_server([(200, [], expected_response)])
553 data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
554 self.assertEqual(data, expected_response)
555 self.assertEqual(handler.requests, ["/bizarre"])
557 def test_200_with_parameters(self):
559 handler = self.start_server([(200, [], expected_response)])
560 data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
562 self.assertEqual(data, expected_response)
563 self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
565 def test_https(self):
566 handler = self.start_https_server()
568 data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
569 self.assertEqual(data, b"we care a bit")
571 def test_https_with_cafile(self):
572 handler = self.start_https_server(certfile=CERT_localhost)
575 data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
577 self.assertEqual(data, b"we care a bit")
579 with self.assertRaises(urllib.error.URLError) as cm:
580 self.urlopen("https://localhost:%s/bizarre" % handler.port,
583 handler = self.start_https_server(certfile=CERT_fakehostname)
584 with self.assertRaises(urllib.error.URLError) as cm:
585 self.urlopen("https://localhost:%s/bizarre" % handler.port,
588 def test_https_with_cadefault(self):
589 handler = self.start_https_server(certfile=CERT_localhost)
592 with self.assertRaises(urllib.error.URLError) as cm:
593 self.urlopen("https://localhost:%s/bizarre" % handler.port,
596 def test_https_sni(self):
598 self.skipTest("ssl module required")
600 self.skipTest("SNI support required in OpenSSL")
607 handler = self.start_https_server(context=context, certfile=CERT_localhost)
609 self.urlopen("https://localhost:%s" % handler.port, context=context)
610 self.assertEqual(sni_name, "localhost")
612 def test_sending_headers(self):
613 handler = self.start_server()
618 self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
620 def test_sending_headers_camel(self):
621 handler = self.start_server()
626 self.assertIn("X-Some-Header", handler.headers_received.keys())
627 self.assertNotIn("X-SoMe-hEader", handler.headers_received.keys())
629 def test_basic(self):
630 handler = self.start_server()
633 self.assertTrue(hasattr(open_url, attr), "object returned from "
635 self.assertTrue(open_url.read(), "calling 'read' failed")
637 def test_info(self):
638 handler = self.start_server()
643 self.assertIsInstance(info_obj, email.message.Message,
646 self.assertEqual(info_obj.get_content_subtype(), "plain")
648 def test_geturl(self):
650 handler = self.start_server()
654 self.assertEqual(url, "http://localhost:%s" % handler.port)
656 def test_iteration(self):
658 handler = self.start_server([(200, [], expected_response)])
661 self.assertEqual(line, expected_response)
663 def test_line_iteration(self):
666 handler = self.start_server([(200, [], expected_response)])
669 self.assertEqual(line, lines[index],
673 self.assertEqual(index + 1, len(lines))
675 def test_issue16464(self):
678 handler = self.start_server([
684 self.assertEqual(None, request.data)
687 self.assertEqual(b"1", request.data)
688 self.assertEqual("1", request.get_header("Content-length"))
691 self.assertEqual(b"1234567890", request.data)
692 self.assertEqual("10", request.get_header("Content-length"))