Lines Matching refs:self
37 Queue.cancel_join_thread = lambda self: None
40 Process.pid = property(lambda self: None)
44 def __init__(self, result):
45 self.result = result
46 self.exception = None
49 def __init__(self, exception):
50 self.exception = exception
54 def __init__(self, heartbeat, value):
55 self.heartbeat = heartbeat
56 self.value = value
116 def __init__(self, num_workers, heartbeat_timeout=1, notify_fun=None):
125 self.num_workers = num_workers
126 self.processes = []
127 self.terminated = False
128 self.abort_now = False
137 self.processing_count = 0
138 self.heartbeat_timeout = heartbeat_timeout
139 self.notify = notify_fun or (lambda x: x)
144 self.work_queue = Queue()
145 self.done_queue = Queue()
147 def imap_unordered(self, fn, gen,
164 if self.terminated:
169 self.advance = self._advance_more
174 for w in range(self.num_workers):
176 self.work_queue,
177 self.done_queue,
181 self.processes.append(p)
183 self.advance(gen)
184 while self.processing_count > 0:
190 result = self._get_result_from_queue()
197 if self.abort_now:
204 self.advance(gen)
211 self._terminate()
216 def _advance_more(self, gen):
217 while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
219 self.work_queue.put(next(gen))
220 self.processing_count += 1
222 self.advance = self._advance_empty
225 def _advance_empty(self, gen):
228 def add(self, args):
231 assert not self.terminated
233 self.work_queue.put(args)
234 self.processing_count += 1
236 def abort(self):
242 self.abort_now = True
244 def _terminate_processes(self):
245 for p in self.processes:
251 def _terminate(self):
257 if self.terminated:
259 self.terminated = True
264 self.work_queue.get(True, 0.1)
269 for _ in self.processes:
272 self.work_queue.put("STOP")
274 if self.abort_now:
275 self._terminate_processes()
277 self.notify("Joining workers")
278 for p in self.processes:
283 self.notify("Draining queues")
285 while True: self.work_queue.get(False)
289 while True: self.done_queue.get(False)
293 def _get_result_from_queue(self):
304 result = self.done_queue.get(timeout=self.heartbeat_timeout)
305 self.processing_count -= 1