1import asyncio 2import unittest 3import time 4 5 6def tearDownModule(): 7 asyncio.set_event_loop_policy(None) 8 9 10# The following value can be used as a very small timeout: 11# it passes check "timeout > 0", but has almost 12# no effect on the test performance 13_EPSILON = 0.0001 14 15 16class SlowTask: 17 """ Task will run for this defined time, ignoring cancel requests """ 18 TASK_TIMEOUT = 0.2 19 20 def __init__(self): 21 self.exited = False 22 23 async def run(self): 24 exitat = time.monotonic() + self.TASK_TIMEOUT 25 26 while True: 27 tosleep = exitat - time.monotonic() 28 if tosleep <= 0: 29 break 30 31 try: 32 await asyncio.sleep(tosleep) 33 except asyncio.CancelledError: 34 pass 35 36 self.exited = True 37 38 39class AsyncioWaitForTest(unittest.IsolatedAsyncioTestCase): 40 41 async def test_asyncio_wait_for_cancelled(self): 42 t = SlowTask() 43 44 waitfortask = asyncio.create_task( 45 asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2)) 46 await asyncio.sleep(0) 47 waitfortask.cancel() 48 await asyncio.wait({waitfortask}) 49 50 self.assertTrue(t.exited) 51 52 async def test_asyncio_wait_for_timeout(self): 53 t = SlowTask() 54 55 try: 56 await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2) 57 except asyncio.TimeoutError: 58 pass 59 60 self.assertTrue(t.exited) 61 62 async def test_wait_for_timeout_less_then_0_or_0_future_done(self): 63 loop = asyncio.get_running_loop() 64 65 fut = loop.create_future() 66 fut.set_result('done') 67 68 t0 = loop.time() 69 ret = await asyncio.wait_for(fut, 0) 70 t1 = loop.time() 71 72 self.assertEqual(ret, 'done') 73 self.assertTrue(fut.done()) 74 self.assertLess(t1 - t0, 0.1) 75 76 async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self): 77 loop = asyncio.get_running_loop() 78 79 foo_started = False 80 81 async def foo(): 82 nonlocal foo_started 83 foo_started = True 84 85 with self.assertRaises(asyncio.TimeoutError): 86 t0 = loop.time() 87 await asyncio.wait_for(foo(), 0) 88 t1 = loop.time() 89 90 self.assertEqual(foo_started, False) 91 self.assertLess(t1 - t0, 0.1) 92 93 async def test_wait_for_timeout_less_then_0_or_0(self): 94 loop = asyncio.get_running_loop() 95 96 for timeout in [0, -1]: 97 with self.subTest(timeout=timeout): 98 foo_running = None 99 started = loop.create_future() 100 101 async def foo(): 102 nonlocal foo_running 103 foo_running = True 104 started.set_result(None) 105 try: 106 await asyncio.sleep(10) 107 finally: 108 foo_running = False 109 return 'done' 110 111 fut = asyncio.create_task(foo()) 112 await started 113 114 with self.assertRaises(asyncio.TimeoutError): 115 t0 = loop.time() 116 await asyncio.wait_for(fut, timeout) 117 t1 = loop.time() 118 119 self.assertTrue(fut.done()) 120 # it should have been cancelled due to the timeout 121 self.assertTrue(fut.cancelled()) 122 self.assertEqual(foo_running, False) 123 self.assertLess(t1 - t0, 0.1) 124 125 async def test_wait_for(self): 126 loop = asyncio.get_running_loop() 127 foo_running = None 128 129 async def foo(): 130 nonlocal foo_running 131 foo_running = True 132 try: 133 await asyncio.sleep(10) 134 finally: 135 foo_running = False 136 return 'done' 137 138 fut = asyncio.create_task(foo()) 139 140 with self.assertRaises(asyncio.TimeoutError): 141 t0 = loop.time() 142 await asyncio.wait_for(fut, 0.1) 143 t1 = loop.time() 144 self.assertTrue(fut.done()) 145 # it should have been cancelled due to the timeout 146 self.assertTrue(fut.cancelled()) 147 self.assertLess(t1 - t0, 0.5) 148 self.assertEqual(foo_running, False) 149 150 async def test_wait_for_blocking(self): 151 async def coro(): 152 return 'done' 153 154 res = await asyncio.wait_for(coro(), timeout=None) 155 self.assertEqual(res, 'done') 156 157 async def test_wait_for_race_condition(self): 158 loop = asyncio.get_running_loop() 159 160 fut = loop.create_future() 161 task = asyncio.wait_for(fut, timeout=0.2) 162 loop.call_later(0.1, fut.set_result, "ok") 163 res = await task 164 self.assertEqual(res, "ok") 165 166 async def test_wait_for_cancellation_race_condition(self): 167 async def inner(): 168 with self.assertRaises(asyncio.CancelledError): 169 await asyncio.sleep(1) 170 return 1 171 172 result = await asyncio.wait_for(inner(), timeout=.01) 173 self.assertEqual(result, 1) 174 175 async def test_wait_for_waits_for_task_cancellation(self): 176 task_done = False 177 178 async def inner(): 179 nonlocal task_done 180 try: 181 await asyncio.sleep(10) 182 except asyncio.CancelledError: 183 await asyncio.sleep(_EPSILON) 184 raise 185 finally: 186 task_done = True 187 188 inner_task = asyncio.create_task(inner()) 189 190 with self.assertRaises(asyncio.TimeoutError) as cm: 191 await asyncio.wait_for(inner_task, timeout=_EPSILON) 192 193 self.assertTrue(task_done) 194 chained = cm.exception.__context__ 195 self.assertEqual(type(chained), asyncio.CancelledError) 196 197 async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self): 198 task_done = False 199 200 async def foo(): 201 async def inner(): 202 nonlocal task_done 203 try: 204 await asyncio.sleep(10) 205 except asyncio.CancelledError: 206 await asyncio.sleep(_EPSILON) 207 raise 208 finally: 209 task_done = True 210 211 inner_task = asyncio.create_task(inner()) 212 await asyncio.sleep(_EPSILON) 213 await asyncio.wait_for(inner_task, timeout=0) 214 215 with self.assertRaises(asyncio.TimeoutError) as cm: 216 await foo() 217 218 self.assertTrue(task_done) 219 chained = cm.exception.__context__ 220 self.assertEqual(type(chained), asyncio.CancelledError) 221 222 async def test_wait_for_reraises_exception_during_cancellation(self): 223 class FooException(Exception): 224 pass 225 226 async def foo(): 227 async def inner(): 228 try: 229 await asyncio.sleep(0.2) 230 finally: 231 raise FooException 232 233 inner_task = asyncio.create_task(inner()) 234 235 await asyncio.wait_for(inner_task, timeout=_EPSILON) 236 237 with self.assertRaises(FooException): 238 await foo() 239 240 async def test_wait_for_self_cancellation(self): 241 async def inner(): 242 try: 243 await asyncio.sleep(0.3) 244 except asyncio.CancelledError: 245 try: 246 await asyncio.sleep(0.3) 247 except asyncio.CancelledError: 248 await asyncio.sleep(0.3) 249 250 return 42 251 252 inner_task = asyncio.create_task(inner()) 253 254 wait = asyncio.wait_for(inner_task, timeout=0.1) 255 256 # Test that wait_for itself is properly cancellable 257 # even when the initial task holds up the initial cancellation. 258 task = asyncio.create_task(wait) 259 await asyncio.sleep(0.2) 260 task.cancel() 261 262 with self.assertRaises(asyncio.CancelledError): 263 await task 264 265 self.assertEqual(await inner_task, 42) 266 267 async def _test_cancel_wait_for(self, timeout): 268 loop = asyncio.get_running_loop() 269 270 async def blocking_coroutine(): 271 fut = loop.create_future() 272 # Block: fut result is never set 273 await fut 274 275 task = asyncio.create_task(blocking_coroutine()) 276 277 wait = asyncio.create_task(asyncio.wait_for(task, timeout)) 278 loop.call_soon(wait.cancel) 279 280 with self.assertRaises(asyncio.CancelledError): 281 await wait 282 283 # Python issue #23219: cancelling the wait must also cancel the task 284 self.assertTrue(task.cancelled()) 285 286 async def test_cancel_blocking_wait_for(self): 287 await self._test_cancel_wait_for(None) 288 289 async def test_cancel_wait_for(self): 290 await self._test_cancel_wait_for(60.0) 291 292 293if __name__ == '__main__': 294 unittest.main() 295