Lines Matching refs:self

92     def my_method(self):
97 def __init__(self, mgr):
98 self.event = mgr.Event()
100 def __del__(self):
101 self.event.set()
109 def setUp(self):
110 self._thread_key = threading_helper.threading_setup()
112 def tearDown(self):
114 threading_helper.threading_cleanup(*self._thread_key)
121 def setUp(self):
124 self.t1 = time.monotonic()
125 if hasattr(self, "ctx"):
126 self.executor = self.executor_type(
127 max_workers=self.worker_count,
128 mp_context=self.get_context(),
129 **self.executor_kwargs)
131 self.executor = self.executor_type(
132 max_workers=self.worker_count,
133 **self.executor_kwargs)
135 def tearDown(self):
136 self.executor.shutdown(wait=True)
137 self.executor = None
139 dt = time.monotonic() - self.t1
142 self.assertLess(dt, 300, "synchronization issue: test lasted too long")
146 def get_context(self):
147 return mp.get_context(self.ctx)
158 def get_context(self):
162 self.skipTest("ProcessPoolExecutor unavailable on this system")
164 self.skipTest("require unix system")
172 def get_context(self):
176 self.skipTest("ProcessPoolExecutor unavailable on this system")
184 def get_context(self):
188 self.skipTest("ProcessPoolExecutor unavailable on this system")
190 self.skipTest("require unix system")
217 def setUp(self):
220 self.executor_kwargs = dict(initializer=init,
224 def test_initializer(self):
225 futures = [self.executor.submit(get_init_status)
226 for _ in range(self.worker_count)]
229 self.assertEqual(f.result(), 'initialized')
235 def setUp(self):
236 if hasattr(self, "ctx"):
238 self.mp_context = self.get_context()
239 self.log_queue = self.mp_context.Queue()
240 self.executor_kwargs = dict(initializer=init_fail,
241 initargs=(self.log_queue,))
245 self.mp_context = None
246 self.log_queue = None
247 self.executor_kwargs = dict(initializer=init_fail)
250 def test_initializer(self):
251 with self._assert_logged('ValueError: error in initializer'):
253 future = self.executor.submit(get_init_status)
258 with self.assertRaises(BrokenExecutor):
262 while not self.executor._broken:
264 self.fail("executor not broken after 5 s.")
267 with self.assertRaises(BrokenExecutor):
268 self.executor.submit(get_init_status)
271 def _assert_logged(self, msg):
272 if self.log_queue is not None:
277 output.append(self.log_queue.get_nowait().getMessage())
281 with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
284 self.assertTrue(any(msg in line for line in output),
293 def test_run_after_shutdown(self):
294 self.executor.shutdown()
295 self.assertRaises(RuntimeError,
296 self.executor.submit,
299 def test_interpreter_shutdown(self):
314 """.format(executor_type=self.executor_type.__name__,
315 context=getattr(self, "ctx", "")))
318 self.assertFalse(err)
319 self.assertEqual(out.strip(), b"apple")
321 def test_submit_after_interpreter_shutdown(self):
342 """.format(executor_type=self.executor_type.__name__,
343 context=getattr(self, "ctx", "")))
346 self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
347 self.assertEqual(out.strip(), b"runtime-error")
349 def test_hang_issue12364(self):
350 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
351 self.executor.shutdown()
355 def test_cancel_futures(self):
356 assert self.worker_count <= 5, "test needs few workers"
357 fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
358 self.executor.shutdown(cancel_futures=True)
363 self.assertGreater(len(cancelled), 20)
370 self.assertTrue(fut.done(), msg=f"{fut._state=}")
371 self.assertIsNone(fut.exception())
376 self.assertGreater(len(others), 0)
378 def test_hang_gh83386(self):
383 if self.executor_type == futures.ProcessPoolExecutor:
395 """.format(executor_type=self.executor_type.__name__,
396 context=getattr(self, 'ctx', None)))
397 self.assertFalse(err)
398 self.assertEqual(out.strip(), b"apple")
400 def test_hang_gh94440(self):
414 if getattr(self, 'ctx', None):
415 kwargs['mp_context'] = self.get_context()
416 executor = self.executor_type(max_workers=1, **kwargs)
429 def test_threads_terminate(self):
435 self.executor.submit(acquire_lock, sem)
436 self.assertEqual(len(self.executor._threads), 3)
439 self.executor.shutdown()
440 for t in self.executor._threads:
443 def test_context_manager_shutdown(self):
446 self.assertEqual(list(e.map(abs, range(-5, 5))),
452 def test_del_shutdown(self):
465 def test_shutdown_no_wait(self):
480 def test_thread_names_assigned(self):
489 self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
492 def test_thread_names_default(self):
502 self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
505 def test_cancel_futures_wait_false(self):
518 self.assertFalse(err)
519 self.assertEqual(out.strip(), b"apple")
523 def test_processes_terminate(self):
527 mp_context = self.get_context()
530 expected_num_processes = self.worker_count
536 self.executor.submit(acquire_lock, sem)
537 self.assertEqual(len(self.executor._processes), expected_num_processes)
540 processes = self.executor._processes
541 self.executor.shutdown()
546 def test_context_manager_shutdown(self):
548 max_workers=5, mp_context=self.get_context()) as e:
550 self.assertEqual(list(e.map(abs, range(-5, 5))),
556 def test_del_shutdown(self):
558 max_workers=5, mp_context=self.get_context())
578 def test_shutdown_no_wait(self):
582 max_workers=5, mp_context=self.get_context())
608 def test_20369(self):
610 future = self.executor.submit(time.sleep, 1.5)
613 self.assertEqual({future}, done)
614 self.assertEqual(set(), not_done)
617 def test_first_completed(self):
618 future1 = self.executor.submit(mul, 21, 2)
619 future2 = self.executor.submit(time.sleep, 1.5)
625 self.assertEqual(set([future1]), done)
626 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
628 def test_first_completed_some_already_completed(self):
629 future1 = self.executor.submit(time.sleep, 1.5)
635 self.assertEqual(
638 self.assertEqual(set([future1]), pending)
640 def test_first_exception(self):
641 future1 = self.executor.submit(mul, 2, 21)
642 future2 = self.executor.submit(sleep_and_raise, 1.5)
643 future3 = self.executor.submit(time.sleep, 3)
649 self.assertEqual(set([future1, future2]), finished)
650 self.assertEqual(set([future3]), pending)
652 def test_first_exception_some_already_complete(self):
653 future1 = self.executor.submit(divmod, 21, 0)
654 future2 = self.executor.submit(time.sleep, 1.5)
663 self.assertEqual(set([SUCCESSFUL_FUTURE,
666 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
668 def test_first_exception_one_already_failed(self):
669 future1 = self.executor.submit(time.sleep, 2)
675 self.assertEqual(set([EXCEPTION_FUTURE]), finished)
676 self.assertEqual(set([future1]), pending)
678 def test_all_completed(self):
679 future1 = self.executor.submit(divmod, 2, 0)
680 future2 = self.executor.submit(mul, 2, 21)
690 self.assertEqual(set([SUCCESSFUL_FUTURE,
695 self.assertEqual(set(), pending)
697 def test_timeout(self):
698 future1 = self.executor.submit(mul, 6, 7)
699 future2 = self.executor.submit(time.sleep, 6)
709 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
713 self.assertEqual(set([future2]), pending)
718 def test_pending_calls_race(self):
727 fs = {self.executor.submit(future_func) for i in range(100)}
742 def test_no_timeout(self):
743 future1 = self.executor.submit(mul, 2, 21)
744 future2 = self.executor.submit(mul, 7, 6)
751 self.assertEqual(set(
758 def test_zero_timeout(self):
759 future1 = self.executor.submit(time.sleep, 2)
772 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
777 def test_duplicate_futures(self):
781 future1 = self.executor.submit(time.sleep, 2)
785 self.assertEqual(len(completed), 1)
787 def test_free_reference_yielded_future(self):
794 with self.assertRaises(futures.TimeoutError):
800 self.assertIsNone(wr())
808 self.assertIsNone(wr())
812 def test_correct_timeout_exception_msg(self):
816 with self.assertRaises(futures.TimeoutError) as cm:
819 self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
828 def test_submit(self):
829 future = self.executor.submit(pow, 2, 8)
830 self.assertEqual(256, future.result())
832 def test_submit_keyword(self):
833 future = self.executor.submit(mul, 2, y=8)
834 self.assertEqual(16, future.result())
835 future = self.executor.submit(capture, 1, self=2, fn=3)
836 self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
837 with self.assertRaises(TypeError):
838 self.executor.submit(fn=capture, arg=1)
839 with self.assertRaises(TypeError):
840 self.executor.submit(arg=1)
842 def test_map(self):
843 self.assertEqual(
844 list(self.executor.map(pow, range(10), range(10))),
847 self.assertEqual(
848 list(self.executor.map(pow, range(10), range(10), chunksize=3)),
851 def test_map_exception(self):
852 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
853 self.assertEqual(i.__next__(), (0, 1))
854 self.assertEqual(i.__next__(), (0, 1))
855 self.assertRaises(ZeroDivisionError, i.__next__)
857 def test_map_timeout(self):
860 for i in self.executor.map(time.sleep,
867 self.fail('expected TimeoutError')
869 self.assertEqual([None, None], results)
871 def test_shutdown_race_issue12456(self):
875 self.executor.map(str, [2] * (self.worker_count + 1))
876 self.executor.shutdown()
879 def test_no_stale_references(self):
887 self.executor.submit(my_object.my_method)
891 self.assertTrue(collected,
894 def test_max_workers_negative(self):
896 with self.assertRaisesRegex(ValueError,
899 self.executor_type(max_workers=number)
901 def test_free_reference(self):
904 for obj in self.executor.map(make_dummy_object, range(10)):
908 self.assertIsNone(wr())
912 def test_map_submits_without_iteration(self):
918 self.executor.map(record_finished, range(10))
919 self.executor.shutdown(wait=True)
920 self.assertCountEqual(finished, range(10))
922 def test_default_workers(self):
923 executor = self.executor_type()
925 self.assertEqual(executor._max_workers, expected)
927 def test_saturation(self):
928 executor = self.executor_type(4)
935 self.assertEqual(len(executor._threads), executor._max_workers)
940 def test_idle_thread_reuse(self):
941 executor = self.executor_type()
945 self.assertEqual(len(executor._threads), 1)
949 def test_hang_global_shutdown_lock(self):
962 def test_executor_map_current_future_cancel(self):
973 with self.executor_type(max_workers=1) as pool:
980 with self.assertRaises(TimeoutError):
987 self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
993 def test_max_workers_too_large(self):
994 with self.assertRaisesRegex(ValueError,
998 def test_killed_child(self):
1001 futures = [self.executor.submit(time.sleep, 3)]
1003 p = next(iter(self.executor._processes.values()))
1006 self.assertRaises(BrokenProcessPool, fut.result)
1008 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
1010 def test_map_chunksize(self):
1012 list(self.executor.map(pow, range(40), range(40), chunksize=-1))
1015 self.assertEqual(
1016 list(self.executor.map(pow, range(40), range(40), chunksize=6)),
1018 self.assertEqual(
1019 list(self.executor.map(pow, range(40), range(40), chunksize=50)),
1021 self.assertEqual(
1022 list(self.executor.map(pow, range(40), range(40), chunksize=40)),
1024 self.assertRaises(ValueError, bad_map)
1030 def test_traceback(self):
1033 future = self.executor.submit(self._test_traceback)
1034 with self.assertRaises(Exception) as cm:
1038 self.assertIs(type(exc), RuntimeError)
1039 self.assertEqual(exc.args, (123,))
1041 self.assertIs(type(cause), futures.process._RemoteTraceback)
1042 self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
1049 self.assertIn('raise RuntimeError(123) # some comment',
1053 def test_ressources_gced_in_workers(self):
1056 mgr = self.get_context().Manager()
1058 future = self.executor.submit(id, obj)
1061 self.assertTrue(obj.event.wait(timeout=1))
1071 def test_saturation(self):
1072 executor = self.executor
1073 mp_context = self.get_context()
1078 self.assertEqual(len(executor._processes), executor._max_workers)
1082 def test_idle_process_reuse_one(self):
1083 executor = self.executor
1085 if self.get_context().get_start_method(allow_none=False) == "fork":
1090 self.assertEqual(len(executor._processes), 1)
1092 def test_idle_process_reuse_multiple(self):
1093 executor = self.executor
1095 if self.get_context().get_start_method(allow_none=False) == "fork":
1103 self.assertLessEqual(len(executor._processes), 3)
1106 def test_max_tasks_per_child(self):
1107 context = self.get_context()
1109 with self.assertRaises(ValueError):
1110 self.executor_type(1, mp_context=context, max_tasks_per_child=3)
1112 # not using self.executor as we need to control construction.
1114 executor = self.executor_type(
1120 self.assertEqual(f2.result(), original_pid)
1121 self.assertEqual(len(executor._processes), 1)
1123 self.assertEqual(f3.result(), original_pid)
1129 self.assertNotEqual(original_pid, new_pid)
1130 self.assertEqual(len(executor._processes), 1)
1134 def test_max_tasks_per_child_defaults_to_spawn_context(self):
1135 # not using self.executor as we need to control construction.
1137 executor = self.executor_type(1, max_tasks_per_child=3)
1138 self.assertEqual(executor._mp_context.get_start_method(), "spawn")
1140 def test_max_tasks_early_shutdown(self):
1141 context = self.get_context()
1144 # not using self.executor as we need to control construction.
1146 executor = self.executor_type(
1153 self.assertEqual(future.result(), mul(i, i))
1194 def __reduce__(self):
1200 def __reduce__(self):
1206 def __reduce__(self):
1212 def __reduce__(self):
1218 def __reduce__(self):
1225 def __reduce__(self):
1233 def _fail_on_deadlock(self, executor):
1249 self.fail(f"Executor deadlock:\n\n{tb}")
1252 def _check_crash(self, error, func, *args, ignore_stderr=False):
1254 self.executor.shutdown(wait=True)
1256 executor = self.executor_type(
1257 max_workers=2, mp_context=self.get_context())
1266 with self.assertRaises(error):
1268 res.result(timeout=self.TIMEOUT)
1272 self._fail_on_deadlock(executor)
1275 def test_error_at_task_pickle(self):
1278 self._check_crash(PicklingError, id, ErrorAtPickle())
1280 def test_exit_at_task_unpickle(self):
1282 self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
1284 def test_error_at_task_unpickle(self):
1286 self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
1288 def test_crash_at_task_unpickle(self):
1290 self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
1292 def test_crash_during_func_exec_on_worker(self):
1294 self._check_crash(BrokenProcessPool, _crash)
1296 def test_exit_during_func_exec_on_worker(self):
1298 self._check_crash(SystemExit, _exit)
1300 def test_error_during_func_exec_on_worker(self):
1302 self._check_crash(RuntimeError, _raise_error, RuntimeError)
1304 def test_crash_during_result_pickle_on_worker(self):
1307 self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
1309 def test_exit_during_result_pickle_on_worker(self):
1312 self._check_crash(SystemExit, _return_instance, ExitAtPickle)
1314 def test_error_during_result_pickle_on_worker(self):
1317 self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
1319 def test_error_during_result_unpickle_in_result_handler(self):
1322 self._check_crash(BrokenProcessPool,
1326 def test_exit_during_result_unpickle_in_result_handler(self):
1329 self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
1331 def test_shutdown_deadlock(self):
1334 self.executor.shutdown(wait=True)
1335 with self.executor_type(max_workers=2,
1336 mp_context=self.get_context()) as executor:
1337 self.executor = executor # Allow clean up in fail_on_deadlock
1340 with self.assertRaises(BrokenProcessPool):
1343 def test_shutdown_deadlock_pickle(self):
1347 self.executor.shutdown(wait=True)
1348 with self.executor_type(max_workers=2,
1349 mp_context=self.get_context()) as executor:
1350 self.executor = executor # Allow clean up in fail_on_deadlock
1362 with self.assertRaises(PicklingError):
1377 def test_done_callback_with_result(self):
1386 self.assertEqual(5, callback_result)
1388 def test_done_callback_with_exception(self):
1397 self.assertEqual(('test',), callback_exception.args)
1399 def test_done_callback_with_cancel(self):
1407 self.assertTrue(f.cancel())
1408 self.assertTrue(was_cancelled)
1410 def test_done_callback_raises(self):
1428 self.assertTrue(raising_was_called)
1429 self.assertTrue(fn_was_called)
1430 self.assertIn('Exception: doh!', stderr.getvalue())
1432 def test_done_callback_already_successful(self):
1441 self.assertEqual(5, callback_result)
1443 def test_done_callback_already_failed(self):
1452 self.assertEqual(('test',), callback_exception.args)
1454 def test_done_callback_already_cancelled(self):
1461 self.assertTrue(f.cancel())
1463 self.assertTrue(was_cancelled)
1465 def test_done_callback_raises_already_succeeded(self):
1477 self.assertIn('exception calling callback for', stderr.getvalue())
1478 self.assertIn('doh!', stderr.getvalue())
1481 def test_repr(self):
1482 self.assertRegex(repr(PENDING_FUTURE),
1484 self.assertRegex(repr(RUNNING_FUTURE),
1486 self.assertRegex(repr(CANCELLED_FUTURE),
1488 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
1490 self.assertRegex(
1493 self.assertRegex(
1498 def test_cancel(self):
1506 self.assertTrue(f1.cancel())
1507 self.assertEqual(f1._state, CANCELLED)
1509 self.assertFalse(f2.cancel())
1510 self.assertEqual(f2._state, RUNNING)
1512 self.assertTrue(f3.cancel())
1513 self.assertEqual(f3._state, CANCELLED)
1515 self.assertTrue(f4.cancel())
1516 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
1518 self.assertFalse(f5.cancel())
1519 self.assertEqual(f5._state, FINISHED)
1521 self.assertFalse(f6.cancel())
1522 self.assertEqual(f6._state, FINISHED)
1524 def test_cancelled(self):
1525 self.assertFalse(PENDING_FUTURE.cancelled())
1526 self.assertFalse(RUNNING_FUTURE.cancelled())
1527 self.assertTrue(CANCELLED_FUTURE.cancelled())
1528 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
1529 self.assertFalse(EXCEPTION_FUTURE.cancelled())
1530 self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
1532 def test_done(self):
1533 self.assertFalse(PENDING_FUTURE.done())
1534 self.assertFalse(RUNNING_FUTURE.done())
1535 self.assertTrue(CANCELLED_FUTURE.done())
1536 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
1537 self.assertTrue(EXCEPTION_FUTURE.done())
1538 self.assertTrue(SUCCESSFUL_FUTURE.done())
1540 def test_running(self):
1541 self.assertFalse(PENDING_FUTURE.running())
1542 self.assertTrue(RUNNING_FUTURE.running())
1543 self.assertFalse(CANCELLED_FUTURE.running())
1544 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
1545 self.assertFalse(EXCEPTION_FUTURE.running())
1546 self.assertFalse(SUCCESSFUL_FUTURE.running())
1548 def test_result_with_timeout(self):
1549 self.assertRaises(futures.TimeoutError,
1551 self.assertRaises(futures.TimeoutError,
1553 self.assertRaises(futures.CancelledError,
1555 self.assertRaises(futures.CancelledError,
1557 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
1558 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
1560 def test_result_with_success(self):
1571 self.assertEqual(f1.result(timeout=5), 42)
1574 def test_result_with_cancel(self):
1585 self.assertRaises(futures.CancelledError,
1589 def test_exception_with_timeout(self):
1590 self.assertRaises(futures.TimeoutError,
1592 self.assertRaises(futures.TimeoutError,
1594 self.assertRaises(futures.CancelledError,
1596 self.assertRaises(futures.CancelledError,
1598 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
1600 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
1602 def test_exception_with_success(self):
1615 self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
1618 def test_multiple_set_result(self):
1622 with self.assertRaisesRegex(
1629 self.assertTrue(f.done())
1630 self.assertEqual(f.result(), 1)
1632 def test_multiple_set_exception(self):
1637 with self.assertRaisesRegex(
1644 self.assertEqual(f.exception(), e)