Lines Matching refs:self
30 def __init__(self, loop=None, close_after=0):
31 self.transport = None
32 self.state = 'INITIAL'
33 self.nbytes = 0
35 self.connected = loop.create_future()
36 self.done = loop.create_future()
37 self.data = bytearray()
38 self.close_after = close_after
40 def _assert_state(self, *expected):
41 if self.state not in expected:
42 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
44 def connection_made(self, transport):
45 self.transport = transport
46 self._assert_state('INITIAL')
47 self.state = 'CONNECTED'
48 if self.connected:
49 self.connected.set_result(None)
51 def eof_received(self):
52 self._assert_state('CONNECTED')
53 self.state = 'EOF'
55 def connection_lost(self, exc):
56 self._assert_state('CONNECTED', 'EOF')
57 self.state = 'CLOSED'
58 if self.done:
59 self.done.set_result(None)
61 def data_received(self, data):
62 self._assert_state('CONNECTED')
63 self.nbytes += len(data)
64 self.data.extend(data)
66 if self.close_after and self.nbytes >= self.close_after:
67 self.transport.close()
72 def __init__(self, loop):
73 self.started = False
74 self.closed = False
75 self.data = bytearray()
76 self.fut = loop.create_future()
77 self.transport = None
79 def connection_made(self, transport):
80 self.started = True
81 self.transport = transport
83 def data_received(self, data):
84 self.data.extend(data)
86 def connection_lost(self, exc):
87 self.closed = True
88 self.fut.set_result(None)
90 async def wait_closed(self):
91 await self.fut
106 def create_event_loop(self):
120 def setUp(self):
121 self.file = open(os_helper.TESTFN, 'rb')
122 self.addCleanup(self.file.close)
123 self.loop = self.create_event_loop()
124 self.set_event_loop(self.loop)
127 def tearDown(self):
129 if not self.loop.is_closed():
130 test_utils.run_briefly(self.loop)
132 self.doCleanups()
136 def run_loop(self, coro):
137 return self.loop.run_until_complete(coro)
153 def make_socket(self, cleanup=True):
157 self.addCleanup(sock.close)
160 def reduce_receive_buffer_size(self, sock):
163 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.BUF_SIZE)
165 def reduce_send_buffer_size(self, sock, transport=None):
170 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.BUF_SIZE)
173 transport.set_write_buffer_limits(high=self.BUF_SIZE)
175 def prepare_socksendfile(self):
176 proto = MyProto(self.loop)
178 srv_sock = self.make_socket(cleanup=False)
180 server = self.run_loop(self.loop.create_server(
182 self.reduce_receive_buffer_size(srv_sock)
184 sock = self.make_socket()
185 self.run_loop(self.loop.sock_connect(sock, ('127.0.0.1', port)))
186 self.reduce_send_buffer_size(sock)
193 self.run_loop(proto.wait_closed())
196 self.run_loop(server.wait_closed())
198 self.addCleanup(cleanup)
202 def test_sock_sendfile_success(self):
203 sock, proto = self.prepare_socksendfile()
204 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
206 self.run_loop(proto.wait_closed())
208 self.assertEqual(ret, len(self.DATA))
209 self.assertEqual(proto.data, self.DATA)
210 self.assertEqual(self.file.tell(), len(self.DATA))
212 def test_sock_sendfile_with_offset_and_count(self):
213 sock, proto = self.prepare_socksendfile()
214 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
217 self.run_loop(proto.wait_closed())
219 self.assertEqual(proto.data, self.DATA[1000:3000])
220 self.assertEqual(self.file.tell(), 3000)
221 self.assertEqual(ret, 2000)
223 def test_sock_sendfile_zero_size(self):
224 sock, proto = self.prepare_socksendfile()
226 ret = self.run_loop(self.loop.sock_sendfile(sock, f,
229 self.run_loop(proto.wait_closed())
231 self.assertEqual(ret, 0)
232 self.assertEqual(self.file.tell(), 0)
234 def test_sock_sendfile_mix_with_regular_send(self):
236 sock, proto = self.prepare_socksendfile()
237 self.run_loop(self.loop.sock_sendall(sock, buf))
238 ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
239 self.run_loop(self.loop.sock_sendall(sock, buf))
241 self.run_loop(proto.wait_closed())
243 self.assertEqual(ret, len(self.DATA))
244 expected = buf + self.DATA + buf
245 self.assertEqual(proto.data, expected)
246 self.assertEqual(self.file.tell(), len(self.DATA))
253 def prepare_sendfile(self, *, is_ssl=False, close_after=0):
255 srv_proto = MySendfileProto(loop=self.loop,
259 self.skipTest("No ssl module")
267 server = self.run_loop(self.loop.create_server(
269 self.reduce_receive_buffer_size(srv_sock)
278 cli_proto = MySendfileProto(loop=self.loop)
279 tr, pr = self.run_loop(self.loop.create_connection(
282 self.reduce_send_buffer_size(cli_sock, transport=tr)
287 self.run_loop(srv_proto.done)
288 self.run_loop(cli_proto.done)
291 self.run_loop(server.wait_closed())
293 self.addCleanup(cleanup)
297 def test_sendfile_not_supported(self):
298 tr, pr = self.run_loop(
299 self.loop.create_datagram_endpoint(
303 with self.assertRaisesRegex(RuntimeError, "not supported"):
304 self.run_loop(
305 self.loop.sendfile(tr, self.file))
306 self.assertEqual(0, self.file.tell())
308 # don't use self.addCleanup because it produces resource warning
311 def test_sendfile(self):
312 srv_proto, cli_proto = self.prepare_sendfile()
313 ret = self.run_loop(
314 self.loop.sendfile(cli_proto.transport, self.file))
316 self.run_loop(srv_proto.done)
317 self.assertEqual(ret, len(self.DATA))
318 self.assertEqual(srv_proto.nbytes, len(self.DATA))
319 self.assertEqual(srv_proto.data, self.DATA)
320 self.assertEqual(self.file.tell(), len(self.DATA))
322 def test_sendfile_force_fallback(self):
323 srv_proto, cli_proto = self.prepare_sendfile()
328 self.loop, transp, file, offset, count)
330 self.loop._sendfile_native = sendfile_native
332 ret = self.run_loop(
333 self.loop.sendfile(cli_proto.transport, self.file))
335 self.run_loop(srv_proto.done)
336 self.assertEqual(ret, len(self.DATA))
337 self.assertEqual(srv_proto.nbytes, len(self.DATA))
338 self.assertEqual(srv_proto.data, self.DATA)
339 self.assertEqual(self.file.tell(), len(self.DATA))
341 def test_sendfile_force_unsupported_native(self):
343 if isinstance(self.loop, asyncio.ProactorEventLoop):
344 self.skipTest("Fails on proactor event loop")
345 srv_proto, cli_proto = self.prepare_sendfile()
350 self.loop, transp, file, offset, count)
352 self.loop._sendfile_native = sendfile_native
354 with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
356 self.run_loop(
357 self.loop.sendfile(cli_proto.transport, self.file,
361 self.run_loop(srv_proto.done)
362 self.assertEqual(srv_proto.nbytes, 0)
363 self.assertEqual(self.file.tell(), 0)
365 def test_sendfile_ssl(self):
366 srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True)
367 ret = self.run_loop(
368 self.loop.sendfile(cli_proto.transport, self.file))
370 self.run_loop(srv_proto.done)
371 self.assertEqual(ret, len(self.DATA))
372 self.assertEqual(srv_proto.nbytes, len(self.DATA))
373 self.assertEqual(srv_proto.data, self.DATA)
374 self.assertEqual(self.file.tell(), len(self.DATA))
376 def test_sendfile_for_closing_transp(self):
377 srv_proto, cli_proto = self.prepare_sendfile()
379 with self.assertRaisesRegex(RuntimeError, "is closing"):
380 self.run_loop(self.loop.sendfile(cli_proto.transport, self.file))
381 self.run_loop(srv_proto.done)
382 self.assertEqual(srv_proto.nbytes, 0)
383 self.assertEqual(self.file.tell(), 0)
385 def test_sendfile_pre_and_post_data(self):
386 srv_proto, cli_proto = self.prepare_sendfile()
390 ret = self.run_loop(
391 self.loop.sendfile(cli_proto.transport, self.file))
394 self.run_loop(srv_proto.done)
395 self.assertEqual(ret, len(self.DATA))
396 self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX)
397 self.assertEqual(self.file.tell(), len(self.DATA))
399 def test_sendfile_ssl_pre_and_post_data(self):
400 srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True)
404 ret = self.run_loop(
405 self.loop.sendfile(cli_proto.transport, self.file))
408 self.run_loop(srv_proto.done)
409 self.assertEqual(ret, len(self.DATA))
410 self.assertEqual(srv_proto.data, PREFIX + self.DATA + SUFFIX)
411 self.assertEqual(self.file.tell(), len(self.DATA))
413 def test_sendfile_partial(self):
414 srv_proto, cli_proto = self.prepare_sendfile()
415 ret = self.run_loop(
416 self.loop.sendfile(cli_proto.transport, self.file, 1000, 100))
418 self.run_loop(srv_proto.done)
419 self.assertEqual(ret, 100)
420 self.assertEqual(srv_proto.nbytes, 100)
421 self.assertEqual(srv_proto.data, self.DATA[1000:1100])
422 self.assertEqual(self.file.tell(), 1100)
424 def test_sendfile_ssl_partial(self):
425 srv_proto, cli_proto = self.prepare_sendfile(is_ssl=True)
426 ret = self.run_loop(
427 self.loop.sendfile(cli_proto.transport, self.file, 1000, 100))
429 self.run_loop(srv_proto.done)
430 self.assertEqual(ret, 100)
431 self.assertEqual(srv_proto.nbytes, 100)
432 self.assertEqual(srv_proto.data, self.DATA[1000:1100])
433 self.assertEqual(self.file.tell(), 1100)
435 def test_sendfile_close_peer_after_receiving(self):
436 srv_proto, cli_proto = self.prepare_sendfile(
437 close_after=len(self.DATA))
438 ret = self.run_loop(
439 self.loop.sendfile(cli_proto.transport, self.file))
441 self.run_loop(srv_proto.done)
442 self.assertEqual(ret, len(self.DATA))
443 self.assertEqual(srv_proto.nbytes, len(self.DATA))
444 self.assertEqual(srv_proto.data, self.DATA)
445 self.assertEqual(self.file.tell(), len(self.DATA))
447 def test_sendfile_ssl_close_peer_after_receiving(self):
448 srv_proto, cli_proto = self.prepare_sendfile(
449 is_ssl=True, close_after=len(self.DATA))
450 ret = self.run_loop(
451 self.loop.sendfile(cli_proto.transport, self.file))
452 self.run_loop(srv_proto.done)
453 self.assertEqual(ret, len(self.DATA))
454 self.assertEqual(srv_proto.nbytes, len(self.DATA))
455 self.assertEqual(srv_proto.data, self.DATA)
456 self.assertEqual(self.file.tell(), len(self.DATA))
464 def test_sendfile_close_peer_in_the_middle_of_receiving(self):
465 srv_proto, cli_proto = self.prepare_sendfile(close_after=1024)
466 with self.assertRaises(ConnectionError):
467 self.run_loop(
468 self.loop.sendfile(cli_proto.transport, self.file))
469 self.run_loop(srv_proto.done)
471 self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA),
473 self.assertTrue(1024 <= self.file.tell() < len(self.DATA),
474 self.file.tell())
475 self.assertTrue(cli_proto.transport.is_closing())
477 def test_sendfile_fallback_close_peer_in_the_middle_of_receiving(self):
482 self.loop, transp, file, offset, count)
484 self.loop._sendfile_native = sendfile_native
486 srv_proto, cli_proto = self.prepare_sendfile(close_after=1024)
487 with self.assertRaises(ConnectionError):
489 self.run_loop(
490 self.loop.sendfile(cli_proto.transport, self.file))
499 self.run_loop(srv_proto.done)
501 self.assertTrue(1024 <= srv_proto.nbytes < len(self.DATA),
503 self.assertTrue(1024 <= self.file.tell() < len(self.DATA),
504 self.file.tell())
508 def test_sendfile_prevents_bare_write(self):
509 srv_proto, cli_proto = self.prepare_sendfile()
510 fut = self.loop.create_future()
514 return await self.loop.sendfile(cli_proto.transport, self.file)
516 t = self.loop.create_task(coro())
517 self.run_loop(fut)
518 with self.assertRaisesRegex(RuntimeError,
521 ret = self.run_loop(t)
522 self.assertEqual(ret, len(self.DATA))
524 def test_sendfile_no_fallback_for_fallback_transport(self):
528 with self.assertRaisesRegex(RuntimeError, 'fallback is disabled'):
529 self.loop.run_until_complete(
530 self.loop.sendfile(transport, None, fallback=False))
542 def create_event_loop(self):
548 def create_event_loop(self):
558 def create_event_loop(self):
566 def create_event_loop(self):
573 def create_event_loop(self):
580 def create_event_loop(self):