Lines Matching refs:self
67 def __init__(self):
68 self._closed = False
69 self._reader, self._writer = mp.Pipe(duplex=False)
71 def close(self):
72 if not self._closed:
73 self._closed = True
74 self._writer.close()
75 self._reader.close()
77 def wakeup(self):
78 if not self._closed:
79 self._writer.send_bytes(b"")
81 def clear(self):
82 if not self._closed:
83 while self._reader.poll():
84 self._reader.recv_bytes()
119 def __init__(self, tb):
120 self.tb = tb
121 def __str__(self):
122 return self.tb
125 def __init__(self, exc, tb):
127 self.exc = exc
130 self.exc.__traceback__ = None
131 self.tb = '\n"""\n%s"""' % tb
132 def __reduce__(self):
133 return _rebuild_exc, (self.exc, self.tb)
140 def __init__(self, future, fn, args, kwargs):
141 self.future = future
142 self.fn = fn
143 self.args = args
144 self.kwargs = kwargs
147 def __init__(self, work_id, exception=None, result=None, exit_pid=None):
148 self.work_id = work_id
149 self.exception = exception
150 self.result = result
151 self.exit_pid = exit_pid
154 def __init__(self, work_id, fn, args, kwargs):
155 self.work_id = work_id
156 self.fn = fn
157 self.args = args
158 self.kwargs = kwargs
163 def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
165 self.pending_work_items = pending_work_items
166 self.shutdown_lock = shutdown_lock
167 self.thread_wakeup = thread_wakeup
170 def _on_queue_feeder_error(self, e, obj):
174 work_item = self.pending_work_items.pop(obj.work_id, None)
175 with self.shutdown_lock:
176 self.thread_wakeup.wakeup()
286 def __init__(self, executor):
291 self.thread_wakeup = executor._executor_manager_thread_wakeup
292 self.shutdown_lock = executor._shutdown_lock
301 thread_wakeup=self.thread_wakeup,
302 shutdown_lock=self.shutdown_lock):
308 self.executor_reference = weakref.ref(executor, weakref_cb)
311 self.processes = executor._processes
315 self.call_queue = executor._call_queue
318 self.result_queue = executor._result_queue
321 self.work_ids_queue = executor._work_ids
325 self.max_tasks_per_child = executor._max_tasks_per_child
329 self.pending_work_items = executor._pending_work_items
333 def run(self):
337 self.add_call_item_to_queue()
339 result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
342 self.terminate_broken(cause)
345 self.process_result_item(result_item)
349 p = self.processes.pop(result_item.exit_pid)
356 if executor := self.executor_reference():
358 with self.shutdown_lock:
364 if self.is_shutting_down():
365 self.flag_executor_shutting_down()
370 self.add_call_item_to_queue()
374 if not self.pending_work_items:
375 self.join_executor_internals()
378 def add_call_item_to_queue(self):
382 if self.call_queue.full():
385 work_id = self.work_ids_queue.get(block=False)
389 work_item = self.pending_work_items[work_id]
392 self.call_queue.put(_CallItem(work_id,
398 del self.pending_work_items[work_id]
401 def wait_result_broken_or_wakeup(self):
407 result_reader = self.result_queue._reader
408 assert not self.thread_wakeup._closed
409 wakeup_reader = self.thread_wakeup._reader
411 worker_sentinels = [p.sentinel for p in list(self.processes.values())]
427 with self.shutdown_lock:
428 self.thread_wakeup.clear()
432 def process_result_item(self, result_item):
439 assert self.is_shutting_down()
440 p = self.processes.pop(result_item)
442 if not self.processes:
443 self.join_executor_internals()
447 work_item = self.pending_work_items.pop(result_item.work_id, None)
455 def is_shutting_down(self):
457 executor = self.executor_reference()
465 def terminate_broken(self, cause):
471 executor = self.executor_reference()
489 for work_id, work_item in self.pending_work_items.items():
493 self.pending_work_items.clear()
497 for p in self.processes.values():
501 self.join_executor_internals()
503 def flag_executor_shutting_down(self):
506 executor = self.executor_reference()
514 for work_id, work_item in self.pending_work_items.items():
517 self.pending_work_items = new_pending_work_items
522 self.work_ids_queue.get_nowait()
529 def shutdown_workers(self):
530 n_children_to_stop = self.get_n_children_alive()
535 and self.get_n_children_alive() > 0):
538 self.call_queue.put_nowait(None)
543 def join_executor_internals(self):
544 self.shutdown_workers()
546 self.call_queue.close()
547 self.call_queue.join_thread()
548 with self.shutdown_lock:
549 self.thread_wakeup.close()
552 for p in self.processes.values():
555 def get_n_children_alive(self):
557 return sum(p.is_alive() for p in self.processes.values())
616 def __init__(self, max_workers=None, mp_context=None,
639 self._max_workers = os.cpu_count() or 1
641 self._max_workers = min(_MAX_WINDOWS_WORKERS,
642 self._max_workers)
651 self._max_workers = max_workers
658 self._mp_context = mp_context
661 self._safe_to_dynamically_spawn_children = (
662 self._mp_context.get_start_method(allow_none=False) != "fork")
666 self._initializer = initializer
667 self._initargs = initargs
674 if self._mp_context.get_start_method(allow_none=False) == "fork":
679 self._max_tasks_per_child = max_tasks_per_child
682 self._executor_manager_thread = None
685 self._processes = {}
688 self._shutdown_thread = False
689 self._shutdown_lock = threading.Lock()
690 self._idle_worker_semaphore = threading.Semaphore(0)
691 self._broken = False
692 self._queue_count = 0
693 self._pending_work_items = {}
694 self._cancel_pending_futures = False
704 self._executor_manager_thread_wakeup = _ThreadWakeup()
710 queue_size = self._max_workers + EXTRA_QUEUED_CALLS
711 self._call_queue = _SafeQueue(
712 max_size=queue_size, ctx=self._mp_context,
713 pending_work_items=self._pending_work_items,
714 shutdown_lock=self._shutdown_lock,
715 thread_wakeup=self._executor_manager_thread_wakeup)
719 self._call_queue._ignore_epipe = True
720 self._result_queue = mp_context.SimpleQueue()
721 self._work_ids = queue.Queue()
723 def _start_executor_manager_thread(self):
724 if self._executor_manager_thread is None:
726 if not self._safe_to_dynamically_spawn_children: # ie, using fork.
727 self._launch_processes()
728 self._executor_manager_thread = _ExecutorManagerThread(self)
729 self._executor_manager_thread.start()
730 _threads_wakeups[self._executor_manager_thread] = \
731 self._executor_manager_thread_wakeup
733 def _adjust_process_count(self):
735 if self._idle_worker_semaphore.acquire(blocking=False):
738 process_count = len(self._processes)
739 if process_count < self._max_workers:
744 # we know a thread is running (self._executor_manager_thread).
745 #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
746 self._spawn_process()
748 def _launch_processes(self):
750 assert not self._executor_manager_thread, (
753 for _ in range(len(self._processes), self._max_workers):
754 self._spawn_process()
756 def _spawn_process(self):
757 p = self._mp_context.Process(
759 args=(self._call_queue,
760 self._result_queue,
761 self._initializer,
762 self._initargs,
763 self._max_tasks_per_child))
765 self._processes[p.pid] = p
767 def submit(self, fn, /, *args, **kwargs):
768 with self._shutdown_lock:
769 if self._broken:
770 raise BrokenProcessPool(self._broken)
771 if self._shutdown_thread:
780 self._pending_work_items[self._queue_count] = w
781 self._work_ids.put(self._queue_count)
782 self._queue_count += 1
784 self._executor_manager_thread_wakeup.wakeup()
786 if self._safe_to_dynamically_spawn_children:
787 self._adjust_process_count()
788 self._start_executor_manager_thread()
792 def map(self, fn, *iterables, timeout=None, chunksize=1):
821 def shutdown(self, wait=True, *, cancel_futures=False):
822 with self._shutdown_lock:
823 self._cancel_pending_futures = cancel_futures
824 self._shutdown_thread = True
825 if self._executor_manager_thread_wakeup is not None:
827 self._executor_manager_thread_wakeup.wakeup()
829 if self._executor_manager_thread is not None and wait:
830 self._executor_manager_thread.join()
833 self._executor_manager_thread = None
834 self._call_queue = None
835 if self._result_queue is not None and wait:
836 self._result_queue.close()
837 self._result_queue = None
838 self._processes = None
839 self._executor_manager_thread_wakeup = None