Lines Matching refs:self

79     def setUp(self):
81 self.port_seed = 0
82 self.test_files = []
84 def tearDown(self):
88 for fn in self.test_files:
93 self.test_files[:] = []
95 def pickaddr(self, proto):
103 self.test_files.append(fn)
106 def make_server(self, addr, svrcls, hdlrbase):
108 def handle_error(self, request, client_address):
109 self.close_request(request)
113 def handle(self):
114 line = self.rfile.readline()
115 self.wfile.write(line)
122 self.skipTest('Cannot create server (%s, %s): %s' %
124 self.assertEqual(server.server_address, server.socket.getsockname())
128 def run_server(self, svrcls, hdlrbase, testfunc):
129 server = self.make_server(self.pickaddr(svrcls.address_family),
155 self.assertEqual(-1, server.socket.fileno())
159 self.assertFalse(server.active_children)
162 def stream_examine(self, proto, addr):
170 self.assertEqual(buf, TEST_STR)
172 def dgram_examine(self, proto, addr):
175 s.bind(self.pickaddr(proto))
181 self.assertEqual(buf, TEST_STR)
183 def test_TCPServer(self):
184 self.run_server(socketserver.TCPServer,
186 self.stream_examine)
188 def test_ThreadingTCPServer(self):
189 self.run_server(socketserver.ThreadingTCPServer,
191 self.stream_examine)
194 def test_ForkingTCPServer(self):
195 with simple_subprocess(self):
196 self.run_server(socketserver.ForkingTCPServer,
198 self.stream_examine)
201 def test_UnixStreamServer(self):
202 self.run_server(socketserver.UnixStreamServer,
204 self.stream_examine)
207 def test_ThreadingUnixStreamServer(self):
208 self.run_server(socketserver.ThreadingUnixStreamServer,
210 self.stream_examine)
214 def test_ForkingUnixStreamServer(self):
215 with simple_subprocess(self):
216 self.run_server(ForkingUnixStreamServer,
218 self.stream_examine)
220 def test_UDPServer(self):
221 self.run_server(socketserver.UDPServer,
223 self.dgram_examine)
225 def test_ThreadingUDPServer(self):
226 self.run_server(socketserver.ThreadingUDPServer,
228 self.dgram_examine)
231 def test_ForkingUDPServer(self):
232 with simple_subprocess(self):
233 self.run_server(socketserver.ForkingUDPServer,
235 self.dgram_examine)
238 def test_UnixDatagramServer(self):
239 self.run_server(socketserver.UnixDatagramServer,
241 self.dgram_examine)
244 def test_ThreadingUnixDatagramServer(self):
245 self.run_server(socketserver.ThreadingUnixDatagramServer,
247 self.dgram_examine)
251 def test_ForkingUnixDatagramServer(self):
252 self.run_server(ForkingUnixDatagramServer,
254 self.dgram_examine)
257 def test_shutdown(self):
282 def test_close_immediately(self):
289 def test_tcpserver_bind_leak(self):
295 with self.assertRaises(OverflowError):
299 def test_context_manager(self):
303 self.assertEqual(-1, server.socket.fileno())
311 def tearDown(self):
314 def test_sync_handled(self):
316 self.check_result(handled=True)
318 def test_sync_not_handled(self):
319 with self.assertRaises(SystemExit):
321 self.check_result(handled=False)
323 def test_threading_handled(self):
325 self.check_result(handled=True)
327 def test_threading_not_handled(self):
330 self.check_result(handled=False)
332 self.assertIs(cm.exc_type, SystemExit)
335 def test_forking_handled(self):
337 self.check_result(handled=True)
340 def test_forking_not_handled(self):
342 self.check_result(handled=False)
344 def check_result(self, handled):
347 self.assertEqual(log.read(), expected)
351 def __init__(self, exception):
352 self.exception = exception
354 with socket.create_connection(self.server_address):
357 self.handle_request()
359 self.server_close()
360 self.wait_done()
362 def handle_error(self, request, client_address):
366 def wait_done(self):
371 def handle(self):
374 raise self.server.exception('Test error')
379 def __init__(self, *pos, **kw):
380 self.done = threading.Event()
383 def shutdown_request(self, *pos, **kw):
385 self.done.set()
387 def wait_done(self):
388 self.done.wait()
397 def test_basics(self):
399 def handle(self):
400 self.server.wfile = self.wfile
401 self.server.wfile_fileno = self.wfile.fileno()
402 self.server.request_fileno = self.request.fileno()
405 self.addCleanup(server.server_close)
411 self.assertIsInstance(server.wfile, io.BufferedIOBase)
412 self.assertEqual(server.wfile_fileno, server.request_fileno)
414 def test_write(self):
420 def handle(self):
421 self.server.sent1 = self.wfile.write(b'write data\n')
423 self.server.received = self.rfile.readline()
425 self.server.sent2 = self.wfile.write(big_chunk)
428 self.addCleanup(server.server_close)
435 self.addCleanup(signal.signal, signal.SIGUSR1, original)
466 self.assertEqual(server.sent1, len(response1))
467 self.assertEqual(response1, b'write data\n')
468 self.assertEqual(server.received, b'client response\n')
469 self.assertEqual(server.sent2, test.support.SOCK_MAX_SIZE)
470 self.assertEqual(received2, test.support.SOCK_MAX_SIZE - 100)
475 def test_all(self):
483 self.assertCountEqual(socketserver.__all__, expected)
485 def test_shutdown_request_called_if_verify_request_false(self):
490 def verify_request(self, request, client_address):
494 def shutdown_request(self, request):
495 self.shutdown_called += 1
496 socketserver.TCPServer.shutdown_request(self, request)
503 self.assertEqual(server.shutdown_called, 1)
506 def test_threads_reaped(self):
519 self.assertLess(len(server._threads), 10)