1import asyncio
2import unittest
3import time
4
5
6def tearDownModule():
7    asyncio.set_event_loop_policy(None)
8
9
10# The following value can be used as a very small timeout:
11# it passes check "timeout > 0", but has almost
12# no effect on the test performance
13_EPSILON = 0.0001
14
15
16class SlowTask:
17    """ Task will run for this defined time, ignoring cancel requests """
18    TASK_TIMEOUT = 0.2
19
20    def __init__(self):
21        self.exited = False
22
23    async def run(self):
24        exitat = time.monotonic() + self.TASK_TIMEOUT
25
26        while True:
27            tosleep = exitat - time.monotonic()
28            if tosleep <= 0:
29                break
30
31            try:
32                await asyncio.sleep(tosleep)
33            except asyncio.CancelledError:
34                pass
35
36        self.exited = True
37
38
39class AsyncioWaitForTest(unittest.IsolatedAsyncioTestCase):
40
41    async def test_asyncio_wait_for_cancelled(self):
42        t = SlowTask()
43
44        waitfortask = asyncio.create_task(
45            asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
46        await asyncio.sleep(0)
47        waitfortask.cancel()
48        await asyncio.wait({waitfortask})
49
50        self.assertTrue(t.exited)
51
52    async def test_asyncio_wait_for_timeout(self):
53        t = SlowTask()
54
55        try:
56            await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
57        except asyncio.TimeoutError:
58            pass
59
60        self.assertTrue(t.exited)
61
62    async def test_wait_for_timeout_less_then_0_or_0_future_done(self):
63        loop = asyncio.get_running_loop()
64
65        fut = loop.create_future()
66        fut.set_result('done')
67
68        t0 = loop.time()
69        ret = await asyncio.wait_for(fut, 0)
70        t1 = loop.time()
71
72        self.assertEqual(ret, 'done')
73        self.assertTrue(fut.done())
74        self.assertLess(t1 - t0, 0.1)
75
76    async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
77        loop = asyncio.get_running_loop()
78
79        foo_started = False
80
81        async def foo():
82            nonlocal foo_started
83            foo_started = True
84
85        with self.assertRaises(asyncio.TimeoutError):
86            t0 = loop.time()
87            await asyncio.wait_for(foo(), 0)
88        t1 = loop.time()
89
90        self.assertEqual(foo_started, False)
91        self.assertLess(t1 - t0, 0.1)
92
93    async def test_wait_for_timeout_less_then_0_or_0(self):
94        loop = asyncio.get_running_loop()
95
96        for timeout in [0, -1]:
97            with self.subTest(timeout=timeout):
98                foo_running = None
99                started = loop.create_future()
100
101                async def foo():
102                    nonlocal foo_running
103                    foo_running = True
104                    started.set_result(None)
105                    try:
106                        await asyncio.sleep(10)
107                    finally:
108                        foo_running = False
109                    return 'done'
110
111                fut = asyncio.create_task(foo())
112                await started
113
114                with self.assertRaises(asyncio.TimeoutError):
115                    t0 = loop.time()
116                    await asyncio.wait_for(fut, timeout)
117                t1 = loop.time()
118
119                self.assertTrue(fut.done())
120                # it should have been cancelled due to the timeout
121                self.assertTrue(fut.cancelled())
122                self.assertEqual(foo_running, False)
123                self.assertLess(t1 - t0, 0.1)
124
125    async def test_wait_for(self):
126        loop = asyncio.get_running_loop()
127        foo_running = None
128
129        async def foo():
130            nonlocal foo_running
131            foo_running = True
132            try:
133                await asyncio.sleep(10)
134            finally:
135                foo_running = False
136            return 'done'
137
138        fut = asyncio.create_task(foo())
139
140        with self.assertRaises(asyncio.TimeoutError):
141            t0 = loop.time()
142            await asyncio.wait_for(fut, 0.1)
143        t1 = loop.time()
144        self.assertTrue(fut.done())
145        # it should have been cancelled due to the timeout
146        self.assertTrue(fut.cancelled())
147        self.assertLess(t1 - t0, 0.5)
148        self.assertEqual(foo_running, False)
149
150    async def test_wait_for_blocking(self):
151        async def coro():
152            return 'done'
153
154        res = await asyncio.wait_for(coro(), timeout=None)
155        self.assertEqual(res, 'done')
156
157    async def test_wait_for_race_condition(self):
158        loop = asyncio.get_running_loop()
159
160        fut = loop.create_future()
161        task = asyncio.wait_for(fut, timeout=0.2)
162        loop.call_later(0.1, fut.set_result, "ok")
163        res = await task
164        self.assertEqual(res, "ok")
165
166    async def test_wait_for_cancellation_race_condition(self):
167        async def inner():
168            with self.assertRaises(asyncio.CancelledError):
169                await asyncio.sleep(1)
170            return 1
171
172        result = await asyncio.wait_for(inner(), timeout=.01)
173        self.assertEqual(result, 1)
174
175    async def test_wait_for_waits_for_task_cancellation(self):
176        task_done = False
177
178        async def inner():
179            nonlocal task_done
180            try:
181                await asyncio.sleep(10)
182            except asyncio.CancelledError:
183                await asyncio.sleep(_EPSILON)
184                raise
185            finally:
186                task_done = True
187
188        inner_task = asyncio.create_task(inner())
189
190        with self.assertRaises(asyncio.TimeoutError) as cm:
191            await asyncio.wait_for(inner_task, timeout=_EPSILON)
192
193        self.assertTrue(task_done)
194        chained = cm.exception.__context__
195        self.assertEqual(type(chained), asyncio.CancelledError)
196
197    async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
198        task_done = False
199
200        async def foo():
201            async def inner():
202                nonlocal task_done
203                try:
204                    await asyncio.sleep(10)
205                except asyncio.CancelledError:
206                    await asyncio.sleep(_EPSILON)
207                    raise
208                finally:
209                    task_done = True
210
211            inner_task = asyncio.create_task(inner())
212            await asyncio.sleep(_EPSILON)
213            await asyncio.wait_for(inner_task, timeout=0)
214
215        with self.assertRaises(asyncio.TimeoutError) as cm:
216            await foo()
217
218        self.assertTrue(task_done)
219        chained = cm.exception.__context__
220        self.assertEqual(type(chained), asyncio.CancelledError)
221
222    async def test_wait_for_reraises_exception_during_cancellation(self):
223        class FooException(Exception):
224            pass
225
226        async def foo():
227            async def inner():
228                try:
229                    await asyncio.sleep(0.2)
230                finally:
231                    raise FooException
232
233            inner_task = asyncio.create_task(inner())
234
235            await asyncio.wait_for(inner_task, timeout=_EPSILON)
236
237        with self.assertRaises(FooException):
238            await foo()
239
240    async def test_wait_for_self_cancellation(self):
241        async def inner():
242            try:
243                await asyncio.sleep(0.3)
244            except asyncio.CancelledError:
245                try:
246                    await asyncio.sleep(0.3)
247                except asyncio.CancelledError:
248                    await asyncio.sleep(0.3)
249
250            return 42
251
252        inner_task = asyncio.create_task(inner())
253
254        wait = asyncio.wait_for(inner_task, timeout=0.1)
255
256        # Test that wait_for itself is properly cancellable
257        # even when the initial task holds up the initial cancellation.
258        task = asyncio.create_task(wait)
259        await asyncio.sleep(0.2)
260        task.cancel()
261
262        with self.assertRaises(asyncio.CancelledError):
263            await task
264
265        self.assertEqual(await inner_task, 42)
266
267    async def _test_cancel_wait_for(self, timeout):
268        loop = asyncio.get_running_loop()
269
270        async def blocking_coroutine():
271            fut = loop.create_future()
272            # Block: fut result is never set
273            await fut
274
275        task = asyncio.create_task(blocking_coroutine())
276
277        wait = asyncio.create_task(asyncio.wait_for(task, timeout))
278        loop.call_soon(wait.cancel)
279
280        with self.assertRaises(asyncio.CancelledError):
281            await wait
282
283        # Python issue #23219: cancelling the wait must also cancel the task
284        self.assertTrue(task.cancelled())
285
286    async def test_cancel_blocking_wait_for(self):
287        await self._test_cancel_wait_for(None)
288
289    async def test_cancel_wait_for(self):
290        await self._test_cancel_wait_for(60.0)
291
292
293if __name__ == '__main__':
294    unittest.main()
295