Lines Matching refs:self
58 def __init__(self, tb):
59 self.tb = tb
60 def __str__(self):
61 return self.tb
64 def __init__(self, exc, tb):
67 self.exc = exc
68 self.tb = '\n"""\n%s"""' % tb
69 def __reduce__(self):
70 return rebuild_exc, (self.exc, self.tb)
84 def __init__(self, exc, value):
85 self.exc = repr(exc)
86 self.value = repr(value)
87 super(MaybeEncodingError, self).__init__(self.exc, self.value)
89 def __str__(self):
90 return "Error sending result: '%s'. Reason: '%s'" % (self.value,
91 self.exc)
93 def __repr__(self):
94 return "<%s: %s>" % (self.__class__.__name__, self)
157 def __init__(self, /, *args, notifier=None, **kwds):
158 self.notifier = notifier
161 def __delitem__(self, item):
170 if not self:
171 self.notifier.put(None)
183 def __init__(self, processes=None, initializer=None, initargs=(),
187 self._pool = []
188 self._state = INIT
190 self._ctx = context or get_context()
191 self._setup_queues()
192 self._taskqueue = queue.SimpleQueue()
193 # The _change_notifier queue exist to wake up self._handle_workers()
194 # when the cache (self._cache) is empty or when there is a change in
196 self._change_notifier = self._ctx.SimpleQueue()
197 self._cache = _PoolCache(notifier=self._change_notifier)
198 self._maxtasksperchild = maxtasksperchild
199 self._initializer = initializer
200 self._initargs = initargs
213 self._processes = processes
215 self._repopulate_pool()
217 for p in self._pool:
220 for p in self._pool:
224 sentinels = self._get_sentinels()
226 self._worker_handler = threading.Thread(
228 args=(self._cache, self._taskqueue, self._ctx, self.Process,
229 self._processes, self._pool, self._inqueue, self._outqueue,
230 self._initializer, self._initargs, self._maxtasksperchild,
231 self._wrap_exception, sentinels, self._change_notifier)
233 self._worker_handler.daemon = True
234 self._worker_handler._state = RUN
235 self._worker_handler.start()
238 self._task_handler = threading.Thread(
240 args=(self._taskqueue, self._quick_put, self._outqueue,
241 self._pool, self._cache)
243 self._task_handler.daemon = True
244 self._task_handler._state = RUN
245 self._task_handler.start()
247 self._result_handler = threading.Thread(
249 args=(self._outqueue, self._quick_get, self._cache)
251 self._result_handler.daemon = True
252 self._result_handler._state = RUN
253 self._result_handler.start()
255 self._terminate = util.Finalize(
256 self, self._terminate_pool,
257 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
258 self._change_notifier, self._worker_handler, self._task_handler,
259 self._result_handler, self._cache),
262 self._state = RUN
266 def __del__(self, _warn=warnings.warn, RUN=RUN):
267 if self._state == RUN:
268 _warn(f"unclosed running multiprocessing pool {self!r}",
269 ResourceWarning, source=self)
270 if getattr(self, '_change_notifier', None) is not None:
271 self._change_notifier.put(None)
273 def __repr__(self):
274 cls = self.__class__
276 f'state={self._state} '
277 f'pool_size={len(self._pool)}>')
279 def _get_sentinels(self):
280 task_queue_sentinels = [self._outqueue._reader]
281 self_notifier_sentinels = [self._change_notifier._reader]
305 def _repopulate_pool(self):
306 return self._repopulate_pool_static(self._ctx, self.Process,
307 self._processes,
308 self._pool, self._inqueue,
309 self._outqueue, self._initializer,
310 self._initargs,
311 self._maxtasksperchild,
312 self._wrap_exception)
345 def _setup_queues(self):
346 self._inqueue = self._ctx.SimpleQueue()
347 self._outqueue = self._ctx.SimpleQueue()
348 self._quick_put = self._inqueue._writer.send
349 self._quick_get = self._outqueue._reader.recv
351 def _check_running(self):
352 if self._state != RUN:
355 def apply(self, func, args=(), kwds={}):
360 return self.apply_async(func, args, kwds).get()
362 def map(self, func, iterable, chunksize=None):
367 return self._map_async(func, iterable, mapstar, chunksize).get()
369 def starmap(self, func, iterable, chunksize=None):
375 return self._map_async(func, iterable, starmapstar, chunksize).get()
377 def starmap_async(self, func, iterable, chunksize=None, callback=None,
382 return self._map_async(func, iterable, starmapstar, chunksize,
385 def _guarded_task_generation(self, result_job, func, iterable):
396 def imap(self, func, iterable, chunksize=1):
400 self._check_running()
402 result = IMapIterator(self)
403 self._taskqueue.put(
405 self._guarded_task_generation(result._job, func, iterable),
415 result = IMapIterator(self)
416 self._taskqueue.put(
418 self._guarded_task_generation(result._job,
425 def imap_unordered(self, func, iterable, chunksize=1):
429 self._check_running()
431 result = IMapUnorderedIterator(self)
432 self._taskqueue.put(
434 self._guarded_task_generation(result._job, func, iterable),
443 result = IMapUnorderedIterator(self)
444 self._taskqueue.put(
446 self._guarded_task_generation(result._job,
453 def apply_async(self, func, args=(), kwds={}, callback=None,
458 self._check_running()
459 result = ApplyResult(self, callback, error_callback)
460 self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
463 def map_async(self, func, iterable, chunksize=None, callback=None,
468 return self._map_async(func, iterable, mapstar, chunksize, callback,
471 def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
476 self._check_running()
481 chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
488 result = MapResult(self, chunksize, len(iterable), callback,
490 self._taskqueue.put(
492 self._guarded_task_generation(result._job,
642 def __reduce__(self):
647 def close(self):
649 if self._state == RUN:
650 self._state = CLOSE
651 self._worker_handler._state = CLOSE
652 self._change_notifier.put(None)
654 def terminate(self):
656 self._state = TERMINATE
657 self._terminate()
659 def join(self):
661 if self._state == RUN:
663 elif self._state not in (CLOSE, TERMINATE):
665 self._worker_handler.join()
666 self._task_handler.join()
667 self._result_handler.join()
668 for p in self._pool:
734 def __enter__(self):
735 self._check_running()
736 return self
738 def __exit__(self, exc_type, exc_val, exc_tb):
739 self.terminate()
747 def __init__(self, pool, callback, error_callback):
748 self._pool = pool
749 self._event = threading.Event()
750 self._job = next(job_counter)
751 self._cache = pool._cache
752 self._callback = callback
753 self._error_callback = error_callback
754 self._cache[self._job] = self
756 def ready(self):
757 return self._event.is_set()
759 def successful(self):
760 if not self.ready():
761 raise ValueError("{0!r} not ready".format(self))
762 return self._success
764 def wait(self, timeout=None):
765 self._event.wait(timeout)
767 def get(self, timeout=None):
768 self.wait(timeout)
769 if not self.ready():
771 if self._success:
772 return self._value
774 raise self._value
776 def _set(self, i, obj):
777 self._success, self._value = obj
778 if self._callback and self._success:
779 self._callback(self._value)
780 if self._error_callback and not self._success:
781 self._error_callback(self._value)
782 self._event.set()
783 del self._cache[self._job]
784 self._pool = None
796 def __init__(self, pool, chunksize, length, callback, error_callback):
797 ApplyResult.__init__(self, pool, callback,
799 self._success = True
800 self._value = [None] * length
801 self._chunksize = chunksize
803 self._number_left = 0
804 self._event.set()
805 del self._cache[self._job]
807 self._number_left = length//chunksize + bool(length % chunksize)
809 def _set(self, i, success_result):
810 self._number_left -= 1
812 if success and self._success:
813 self._value[i*self._chunksize:(i+1)*self._chunksize] = result
814 if self._number_left == 0:
815 if self._callback:
816 self._callback(self._value)
817 del self._cache[self._job]
818 self._event.set()
819 self._pool = None
821 if not success and self._success:
823 self._success = False
824 self._value = result
825 if self._number_left == 0:
827 if self._error_callback:
828 self._error_callback(self._value)
829 del self._cache[self._job]
830 self._event.set()
831 self._pool = None
839 def __init__(self, pool):
840 self._pool = pool
841 self._cond = threading.Condition(threading.Lock())
842 self._job = next(job_counter)
843 self._cache = pool._cache
844 self._items = collections.deque()
845 self._index = 0
846 self._length = None
847 self._unsorted = {}
848 self._cache[self._job] = self
850 def __iter__(self):
851 return self
853 def next(self, timeout=None):
854 with self._cond:
856 item = self._items.popleft()
858 if self._index == self._length:
859 self._pool = None
861 self._cond.wait(timeout)
863 item = self._items.popleft()
865 if self._index == self._length:
866 self._pool = None
877 def _set(self, i, obj):
878 with self._cond:
879 if self._index == i:
880 self._items.append(obj)
881 self._index += 1
882 while self._index in self._unsorted:
883 obj = self._unsorted.pop(self._index)
884 self._items.append(obj)
885 self._index += 1
886 self._cond.notify()
888 self._unsorted[i] = obj
890 if self._index == self._length:
891 del self._cache[self._job]
892 self._pool = None
894 def _set_length(self, length):
895 with self._cond:
896 self._length = length
897 if self._index == self._length:
898 self._cond.notify()
899 del self._cache[self._job]
900 self._pool = None
908 def _set(self, i, obj):
909 with self._cond:
910 self._items.append(obj)
911 self._index += 1
912 self._cond.notify()
913 if self._index == self._length:
914 del self._cache[self._job]
915 self._pool = None
929 def __init__(self, processes=None, initializer=None, initargs=()):
930 Pool.__init__(self, processes, initializer, initargs)
932 def _setup_queues(self):
933 self._inqueue = queue.SimpleQueue()
934 self._outqueue = queue.SimpleQueue()
935 self._quick_put = self._inqueue.put
936 self._quick_get = self._outqueue.get
938 def _get_sentinels(self):
939 return [self._change_notifier._reader]
956 def _wait_for_updates(self, sentinels, change_notifier, timeout):