'''A multi-producer, multi-consumer queue.'''

import threading
import types
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
try:
    from _queue import SimpleQueue
except ImportError:
    SimpleQueue = None

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']


try:
    from _queue import Empty
except ImportError:
    class Empty(Exception):
        'Exception raised by Queue.get(block=0)/get_nowait().'
        pass

class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass


class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before the counter is written back so the
                    # stored count never goes negative.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            # A maxsize <= 0 means unbounded, so such a queue is never full.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Track an absolute deadline so repeated wake-ups do not
                    # extend the total time spent waiting.
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Track an absolute deadline so repeated wake-ups do not
                # extend the total time spent waiting.
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()

    __class_getitem__ = classmethod(types.GenericAlias)


class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)


class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()


class _PySimpleQueue:
    '''Simple, unbounded FIFO queue.

    This pure Python implementation is not reentrant.
    '''
    # Note: while this pure Python version provides fairness
    # (by using a threading.Semaphore which is itself fair, being based
    #  on threading.Condition), fairness is not part of the API contract.
    # This allows the C version to use a different implementation.

    def __init__(self):
        self._queue = deque()
        self._count = threading.Semaphore(0)

    def put(self, item, block=True, timeout=None):
        '''Put the item on the queue.

        The optional 'block' and 'timeout' arguments are ignored, as this method
        never blocks.  They are provided for compatibility with the Queue class.
        '''
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        '''
        if timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        This is exactly equivalent to `put(item, block=False)` and is only provided
        for compatibility with the Queue class.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).'''
        return len(self._queue) == 0

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        return len(self._queue)

    __class_getitem__ = classmethod(types.GenericAlias)


# Fall back to the pure Python implementation when the C accelerator
# module could not be imported above.
if SimpleQueue is None:
    SimpleQueue = _PySimpleQueue