Lines Matching refs:self
37 def __init__(self, maxsize=0, *, ctx):
41 self._maxsize = maxsize
42 self._reader, self._writer = connection.Pipe(duplex=False)
43 self._rlock = ctx.Lock()
44 self._opid = os.getpid()
46 self._wlock = None
48 self._wlock = ctx.Lock()
49 self._sem = ctx.BoundedSemaphore(maxsize)
51 self._ignore_epipe = False
52 self._reset()
55 register_after_fork(self, Queue._after_fork)
57 def __getstate__(self):
58 context.assert_spawning(self)
59 return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
60 self._rlock, self._wlock, self._sem, self._opid)
62 def __setstate__(self, state):
63 (self._ignore_epipe, self._maxsize, self._reader, self._writer,
64 self._rlock, self._wlock, self._sem, self._opid) = state
65 self._reset()
67 def _after_fork(self):
69 self._reset(after_fork=True)
71 def _reset(self, after_fork=False):
73 self._notempty._at_fork_reinit()
75 self._notempty = threading.Condition(threading.Lock())
76 self._buffer = collections.deque()
77 self._thread = None
78 self._jointhread = None
79 self._joincancelled = False
80 self._closed = False
81 self._close = None
82 self._send_bytes = self._writer.send_bytes
83 self._recv_bytes = self._reader.recv_bytes
84 self._poll = self._reader.poll
86 def put(self, obj, block=True, timeout=None):
87 if self._closed:
88 raise ValueError(f"Queue {self!r} is closed")
89 if not self._sem.acquire(block, timeout):
92 with self._notempty:
93 if self._thread is None:
94 self._start_thread()
95 self._buffer.append(obj)
96 self._notempty.notify()
98 def get(self, block=True, timeout=None):
99 if self._closed:
100 raise ValueError(f"Queue {self!r} is closed")
102 with self._rlock:
103 res = self._recv_bytes()
104 self._sem.release()
108 if not self._rlock.acquire(block, timeout):
113 if not self._poll(timeout):
115 elif not self._poll():
117 res = self._recv_bytes()
118 self._sem.release()
120 self._rlock.release()
124 def qsize(self):
126 return self._maxsize - self._sem._semlock._get_value()
128 def empty(self):
129 return not self._poll()
131 def full(self):
132 return self._sem._semlock._is_zero()
134 def get_nowait(self):
135 return self.get(False)
137 def put_nowait(self, obj):
138 return self.put(obj, False)
140 def close(self):
141 self._closed = True
142 close = self._close
144 self._close = None
147 def join_thread(self):
149 assert self._closed, "Queue {0!r} not closed".format(self)
150 if self._jointhread:
151 self._jointhread()
153 def cancel_join_thread(self):
155 self._joincancelled = True
157 self._jointhread.cancel()
161 def _start_thread(self):
165 self._buffer.clear()
166 self._thread = threading.Thread(
168 args=(self._buffer, self._notempty, self._send_bytes,
169 self._wlock, self._reader.close, self._writer.close,
170 self._ignore_epipe, self._on_queue_feeder_error,
171 self._sem),
174 self._thread.daemon = True
176 debug('doing self._thread.start()')
177 self._thread.start()
178 debug('... done self._thread.start()')
180 if not self._joincancelled:
181 self._jointhread = Finalize(
182 self._thread, Queue._finalize_join,
183 [weakref.ref(self._thread)],
188 self._close = Finalize(
189 self, Queue._finalize_close,
190 [self._buffer, self._notempty],
296 def __init__(self, maxsize=0, *, ctx):
297 Queue.__init__(self, maxsize, ctx=ctx)
298 self._unfinished_tasks = ctx.Semaphore(0)
299 self._cond = ctx.Condition()
301 def __getstate__(self):
302 return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
304 def __setstate__(self, state):
305 Queue.__setstate__(self, state[:-2])
306 self._cond, self._unfinished_tasks = state[-2:]
308 def put(self, obj, block=True, timeout=None):
309 if self._closed:
310 raise ValueError(f"Queue {self!r} is closed")
311 if not self._sem.acquire(block, timeout):
314 with self._notempty, self._cond:
315 if self._thread is None:
316 self._start_thread()
317 self._buffer.append(obj)
318 self._unfinished_tasks.release()
319 self._notempty.notify()
321 def task_done(self):
322 with self._cond:
323 if not self._unfinished_tasks.acquire(False):
325 if self._unfinished_tasks._semlock._is_zero():
326 self._cond.notify_all()
328 def join(self):
329 with self._cond:
330 if not self._unfinished_tasks._semlock._is_zero():
331 self._cond.wait()
339 def __init__(self, *, ctx):
340 self._reader, self._writer = connection.Pipe(duplex=False)
341 self._rlock = ctx.Lock()
342 self._poll = self._reader.poll
344 self._wlock = None
346 self._wlock = ctx.Lock()
348 def close(self):
349 self._reader.close()
350 self._writer.close()
352 def empty(self):
353 return not self._poll()
355 def __getstate__(self):
356 context.assert_spawning(self)
357 return (self._reader, self._writer, self._rlock, self._wlock)
359 def __setstate__(self, state):
360 (self._reader, self._writer, self._rlock, self._wlock) = state
361 self._poll = self._reader.poll
363 def get(self):
364 with self._rlock:
365 res = self._reader.recv_bytes()
369 def put(self, obj):
372 if self._wlock is None:
374 self._writer.send_bytes(obj)
376 with self._wlock:
377 self._writer.send_bytes(obj)