17db96d56Sopenharmony_ci'''A multi-producer, multi-consumer queue.'''
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport threading
47db96d56Sopenharmony_ciimport types
57db96d56Sopenharmony_cifrom collections import deque
67db96d56Sopenharmony_cifrom heapq import heappush, heappop
77db96d56Sopenharmony_cifrom time import monotonic as time
87db96d56Sopenharmony_citry:
97db96d56Sopenharmony_ci    from _queue import SimpleQueue
107db96d56Sopenharmony_ciexcept ImportError:
117db96d56Sopenharmony_ci    SimpleQueue = None
127db96d56Sopenharmony_ci
137db96d56Sopenharmony_ci__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
147db96d56Sopenharmony_ci
157db96d56Sopenharmony_ci
167db96d56Sopenharmony_citry:
177db96d56Sopenharmony_ci    from _queue import Empty
187db96d56Sopenharmony_ciexcept ImportError:
197db96d56Sopenharmony_ci    class Empty(Exception):
207db96d56Sopenharmony_ci        'Exception raised by Queue.get(block=0)/get_nowait().'
217db96d56Sopenharmony_ci        pass
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_ciclass Full(Exception):
247db96d56Sopenharmony_ci    'Exception raised by Queue.put(block=0)/put_nowait().'
257db96d56Sopenharmony_ci    pass
267db96d56Sopenharmony_ci
277db96d56Sopenharmony_ci
287db96d56Sopenharmony_ciclass Queue:
297db96d56Sopenharmony_ci    '''Create a queue object with a given maximum size.
307db96d56Sopenharmony_ci
317db96d56Sopenharmony_ci    If maxsize is <= 0, the queue size is infinite.
327db96d56Sopenharmony_ci    '''
337db96d56Sopenharmony_ci
347db96d56Sopenharmony_ci    def __init__(self, maxsize=0):
357db96d56Sopenharmony_ci        self.maxsize = maxsize
367db96d56Sopenharmony_ci        self._init(maxsize)
377db96d56Sopenharmony_ci
387db96d56Sopenharmony_ci        # mutex must be held whenever the queue is mutating.  All methods
397db96d56Sopenharmony_ci        # that acquire mutex must release it before returning.  mutex
407db96d56Sopenharmony_ci        # is shared between the three conditions, so acquiring and
417db96d56Sopenharmony_ci        # releasing the conditions also acquires and releases mutex.
427db96d56Sopenharmony_ci        self.mutex = threading.Lock()
437db96d56Sopenharmony_ci
447db96d56Sopenharmony_ci        # Notify not_empty whenever an item is added to the queue; a
457db96d56Sopenharmony_ci        # thread waiting to get is notified then.
467db96d56Sopenharmony_ci        self.not_empty = threading.Condition(self.mutex)
477db96d56Sopenharmony_ci
487db96d56Sopenharmony_ci        # Notify not_full whenever an item is removed from the queue;
497db96d56Sopenharmony_ci        # a thread waiting to put is notified then.
507db96d56Sopenharmony_ci        self.not_full = threading.Condition(self.mutex)
517db96d56Sopenharmony_ci
527db96d56Sopenharmony_ci        # Notify all_tasks_done whenever the number of unfinished tasks
537db96d56Sopenharmony_ci        # drops to zero; thread waiting to join() is notified to resume
547db96d56Sopenharmony_ci        self.all_tasks_done = threading.Condition(self.mutex)
557db96d56Sopenharmony_ci        self.unfinished_tasks = 0
567db96d56Sopenharmony_ci
577db96d56Sopenharmony_ci    def task_done(self):
587db96d56Sopenharmony_ci        '''Indicate that a formerly enqueued task is complete.
597db96d56Sopenharmony_ci
607db96d56Sopenharmony_ci        Used by Queue consumer threads.  For each get() used to fetch a task,
617db96d56Sopenharmony_ci        a subsequent call to task_done() tells the queue that the processing
627db96d56Sopenharmony_ci        on the task is complete.
637db96d56Sopenharmony_ci
647db96d56Sopenharmony_ci        If a join() is currently blocking, it will resume when all items
657db96d56Sopenharmony_ci        have been processed (meaning that a task_done() call was received
667db96d56Sopenharmony_ci        for every item that had been put() into the queue).
677db96d56Sopenharmony_ci
687db96d56Sopenharmony_ci        Raises a ValueError if called more times than there were items
697db96d56Sopenharmony_ci        placed in the queue.
707db96d56Sopenharmony_ci        '''
717db96d56Sopenharmony_ci        with self.all_tasks_done:
727db96d56Sopenharmony_ci            unfinished = self.unfinished_tasks - 1
737db96d56Sopenharmony_ci            if unfinished <= 0:
747db96d56Sopenharmony_ci                if unfinished < 0:
757db96d56Sopenharmony_ci                    raise ValueError('task_done() called too many times')
767db96d56Sopenharmony_ci                self.all_tasks_done.notify_all()
777db96d56Sopenharmony_ci            self.unfinished_tasks = unfinished
787db96d56Sopenharmony_ci
797db96d56Sopenharmony_ci    def join(self):
807db96d56Sopenharmony_ci        '''Blocks until all items in the Queue have been gotten and processed.
817db96d56Sopenharmony_ci
827db96d56Sopenharmony_ci        The count of unfinished tasks goes up whenever an item is added to the
837db96d56Sopenharmony_ci        queue. The count goes down whenever a consumer thread calls task_done()
847db96d56Sopenharmony_ci        to indicate the item was retrieved and all work on it is complete.
857db96d56Sopenharmony_ci
867db96d56Sopenharmony_ci        When the count of unfinished tasks drops to zero, join() unblocks.
877db96d56Sopenharmony_ci        '''
887db96d56Sopenharmony_ci        with self.all_tasks_done:
897db96d56Sopenharmony_ci            while self.unfinished_tasks:
907db96d56Sopenharmony_ci                self.all_tasks_done.wait()
917db96d56Sopenharmony_ci
927db96d56Sopenharmony_ci    def qsize(self):
937db96d56Sopenharmony_ci        '''Return the approximate size of the queue (not reliable!).'''
947db96d56Sopenharmony_ci        with self.mutex:
957db96d56Sopenharmony_ci            return self._qsize()
967db96d56Sopenharmony_ci
977db96d56Sopenharmony_ci    def empty(self):
987db96d56Sopenharmony_ci        '''Return True if the queue is empty, False otherwise (not reliable!).
997db96d56Sopenharmony_ci
1007db96d56Sopenharmony_ci        This method is likely to be removed at some point.  Use qsize() == 0
1017db96d56Sopenharmony_ci        as a direct substitute, but be aware that either approach risks a race
1027db96d56Sopenharmony_ci        condition where a queue can grow before the result of empty() or
1037db96d56Sopenharmony_ci        qsize() can be used.
1047db96d56Sopenharmony_ci
1057db96d56Sopenharmony_ci        To create code that needs to wait for all queued tasks to be
1067db96d56Sopenharmony_ci        completed, the preferred technique is to use the join() method.
1077db96d56Sopenharmony_ci        '''
1087db96d56Sopenharmony_ci        with self.mutex:
1097db96d56Sopenharmony_ci            return not self._qsize()
1107db96d56Sopenharmony_ci
1117db96d56Sopenharmony_ci    def full(self):
1127db96d56Sopenharmony_ci        '''Return True if the queue is full, False otherwise (not reliable!).
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_ci        This method is likely to be removed at some point.  Use qsize() >= n
1157db96d56Sopenharmony_ci        as a direct substitute, but be aware that either approach risks a race
1167db96d56Sopenharmony_ci        condition where a queue can shrink before the result of full() or
1177db96d56Sopenharmony_ci        qsize() can be used.
1187db96d56Sopenharmony_ci        '''
1197db96d56Sopenharmony_ci        with self.mutex:
1207db96d56Sopenharmony_ci            return 0 < self.maxsize <= self._qsize()
1217db96d56Sopenharmony_ci
1227db96d56Sopenharmony_ci    def put(self, item, block=True, timeout=None):
1237db96d56Sopenharmony_ci        '''Put an item into the queue.
1247db96d56Sopenharmony_ci
1257db96d56Sopenharmony_ci        If optional args 'block' is true and 'timeout' is None (the default),
1267db96d56Sopenharmony_ci        block if necessary until a free slot is available. If 'timeout' is
1277db96d56Sopenharmony_ci        a non-negative number, it blocks at most 'timeout' seconds and raises
1287db96d56Sopenharmony_ci        the Full exception if no free slot was available within that time.
1297db96d56Sopenharmony_ci        Otherwise ('block' is false), put an item on the queue if a free slot
1307db96d56Sopenharmony_ci        is immediately available, else raise the Full exception ('timeout'
1317db96d56Sopenharmony_ci        is ignored in that case).
1327db96d56Sopenharmony_ci        '''
1337db96d56Sopenharmony_ci        with self.not_full:
1347db96d56Sopenharmony_ci            if self.maxsize > 0:
1357db96d56Sopenharmony_ci                if not block:
1367db96d56Sopenharmony_ci                    if self._qsize() >= self.maxsize:
1377db96d56Sopenharmony_ci                        raise Full
1387db96d56Sopenharmony_ci                elif timeout is None:
1397db96d56Sopenharmony_ci                    while self._qsize() >= self.maxsize:
1407db96d56Sopenharmony_ci                        self.not_full.wait()
1417db96d56Sopenharmony_ci                elif timeout < 0:
1427db96d56Sopenharmony_ci                    raise ValueError("'timeout' must be a non-negative number")
1437db96d56Sopenharmony_ci                else:
1447db96d56Sopenharmony_ci                    endtime = time() + timeout
1457db96d56Sopenharmony_ci                    while self._qsize() >= self.maxsize:
1467db96d56Sopenharmony_ci                        remaining = endtime - time()
1477db96d56Sopenharmony_ci                        if remaining <= 0.0:
1487db96d56Sopenharmony_ci                            raise Full
1497db96d56Sopenharmony_ci                        self.not_full.wait(remaining)
1507db96d56Sopenharmony_ci            self._put(item)
1517db96d56Sopenharmony_ci            self.unfinished_tasks += 1
1527db96d56Sopenharmony_ci            self.not_empty.notify()
1537db96d56Sopenharmony_ci
1547db96d56Sopenharmony_ci    def get(self, block=True, timeout=None):
1557db96d56Sopenharmony_ci        '''Remove and return an item from the queue.
1567db96d56Sopenharmony_ci
1577db96d56Sopenharmony_ci        If optional args 'block' is true and 'timeout' is None (the default),
1587db96d56Sopenharmony_ci        block if necessary until an item is available. If 'timeout' is
1597db96d56Sopenharmony_ci        a non-negative number, it blocks at most 'timeout' seconds and raises
1607db96d56Sopenharmony_ci        the Empty exception if no item was available within that time.
1617db96d56Sopenharmony_ci        Otherwise ('block' is false), return an item if one is immediately
1627db96d56Sopenharmony_ci        available, else raise the Empty exception ('timeout' is ignored
1637db96d56Sopenharmony_ci        in that case).
1647db96d56Sopenharmony_ci        '''
1657db96d56Sopenharmony_ci        with self.not_empty:
1667db96d56Sopenharmony_ci            if not block:
1677db96d56Sopenharmony_ci                if not self._qsize():
1687db96d56Sopenharmony_ci                    raise Empty
1697db96d56Sopenharmony_ci            elif timeout is None:
1707db96d56Sopenharmony_ci                while not self._qsize():
1717db96d56Sopenharmony_ci                    self.not_empty.wait()
1727db96d56Sopenharmony_ci            elif timeout < 0:
1737db96d56Sopenharmony_ci                raise ValueError("'timeout' must be a non-negative number")
1747db96d56Sopenharmony_ci            else:
1757db96d56Sopenharmony_ci                endtime = time() + timeout
1767db96d56Sopenharmony_ci                while not self._qsize():
1777db96d56Sopenharmony_ci                    remaining = endtime - time()
1787db96d56Sopenharmony_ci                    if remaining <= 0.0:
1797db96d56Sopenharmony_ci                        raise Empty
1807db96d56Sopenharmony_ci                    self.not_empty.wait(remaining)
1817db96d56Sopenharmony_ci            item = self._get()
1827db96d56Sopenharmony_ci            self.not_full.notify()
1837db96d56Sopenharmony_ci            return item
1847db96d56Sopenharmony_ci
1857db96d56Sopenharmony_ci    def put_nowait(self, item):
1867db96d56Sopenharmony_ci        '''Put an item into the queue without blocking.
1877db96d56Sopenharmony_ci
1887db96d56Sopenharmony_ci        Only enqueue the item if a free slot is immediately available.
1897db96d56Sopenharmony_ci        Otherwise raise the Full exception.
1907db96d56Sopenharmony_ci        '''
1917db96d56Sopenharmony_ci        return self.put(item, block=False)
1927db96d56Sopenharmony_ci
1937db96d56Sopenharmony_ci    def get_nowait(self):
1947db96d56Sopenharmony_ci        '''Remove and return an item from the queue without blocking.
1957db96d56Sopenharmony_ci
1967db96d56Sopenharmony_ci        Only get an item if one is immediately available. Otherwise
1977db96d56Sopenharmony_ci        raise the Empty exception.
1987db96d56Sopenharmony_ci        '''
1997db96d56Sopenharmony_ci        return self.get(block=False)
2007db96d56Sopenharmony_ci
2017db96d56Sopenharmony_ci    # Override these methods to implement other queue organizations
2027db96d56Sopenharmony_ci    # (e.g. stack or priority queue).
2037db96d56Sopenharmony_ci    # These will only be called with appropriate locks held
2047db96d56Sopenharmony_ci
2057db96d56Sopenharmony_ci    # Initialize the queue representation
2067db96d56Sopenharmony_ci    def _init(self, maxsize):
2077db96d56Sopenharmony_ci        self.queue = deque()
2087db96d56Sopenharmony_ci
2097db96d56Sopenharmony_ci    def _qsize(self):
2107db96d56Sopenharmony_ci        return len(self.queue)
2117db96d56Sopenharmony_ci
2127db96d56Sopenharmony_ci    # Put a new item in the queue
2137db96d56Sopenharmony_ci    def _put(self, item):
2147db96d56Sopenharmony_ci        self.queue.append(item)
2157db96d56Sopenharmony_ci
2167db96d56Sopenharmony_ci    # Get an item from the queue
2177db96d56Sopenharmony_ci    def _get(self):
2187db96d56Sopenharmony_ci        return self.queue.popleft()
2197db96d56Sopenharmony_ci
2207db96d56Sopenharmony_ci    __class_getitem__ = classmethod(types.GenericAlias)
2217db96d56Sopenharmony_ci
2227db96d56Sopenharmony_ci
2237db96d56Sopenharmony_ciclass PriorityQueue(Queue):
2247db96d56Sopenharmony_ci    '''Variant of Queue that retrieves open entries in priority order (lowest first).
2257db96d56Sopenharmony_ci
2267db96d56Sopenharmony_ci    Entries are typically tuples of the form:  (priority number, data).
2277db96d56Sopenharmony_ci    '''
2287db96d56Sopenharmony_ci
2297db96d56Sopenharmony_ci    def _init(self, maxsize):
2307db96d56Sopenharmony_ci        self.queue = []
2317db96d56Sopenharmony_ci
2327db96d56Sopenharmony_ci    def _qsize(self):
2337db96d56Sopenharmony_ci        return len(self.queue)
2347db96d56Sopenharmony_ci
2357db96d56Sopenharmony_ci    def _put(self, item):
2367db96d56Sopenharmony_ci        heappush(self.queue, item)
2377db96d56Sopenharmony_ci
2387db96d56Sopenharmony_ci    def _get(self):
2397db96d56Sopenharmony_ci        return heappop(self.queue)
2407db96d56Sopenharmony_ci
2417db96d56Sopenharmony_ci
2427db96d56Sopenharmony_ciclass LifoQueue(Queue):
2437db96d56Sopenharmony_ci    '''Variant of Queue that retrieves most recently added entries first.'''
2447db96d56Sopenharmony_ci
2457db96d56Sopenharmony_ci    def _init(self, maxsize):
2467db96d56Sopenharmony_ci        self.queue = []
2477db96d56Sopenharmony_ci
2487db96d56Sopenharmony_ci    def _qsize(self):
2497db96d56Sopenharmony_ci        return len(self.queue)
2507db96d56Sopenharmony_ci
2517db96d56Sopenharmony_ci    def _put(self, item):
2527db96d56Sopenharmony_ci        self.queue.append(item)
2537db96d56Sopenharmony_ci
2547db96d56Sopenharmony_ci    def _get(self):
2557db96d56Sopenharmony_ci        return self.queue.pop()
2567db96d56Sopenharmony_ci
2577db96d56Sopenharmony_ci
2587db96d56Sopenharmony_ciclass _PySimpleQueue:
2597db96d56Sopenharmony_ci    '''Simple, unbounded FIFO queue.
2607db96d56Sopenharmony_ci
2617db96d56Sopenharmony_ci    This pure Python implementation is not reentrant.
2627db96d56Sopenharmony_ci    '''
2637db96d56Sopenharmony_ci    # Note: while this pure Python version provides fairness
2647db96d56Sopenharmony_ci    # (by using a threading.Semaphore which is itself fair, being based
2657db96d56Sopenharmony_ci    #  on threading.Condition), fairness is not part of the API contract.
2667db96d56Sopenharmony_ci    # This allows the C version to use a different implementation.
2677db96d56Sopenharmony_ci
2687db96d56Sopenharmony_ci    def __init__(self):
2697db96d56Sopenharmony_ci        self._queue = deque()
2707db96d56Sopenharmony_ci        self._count = threading.Semaphore(0)
2717db96d56Sopenharmony_ci
2727db96d56Sopenharmony_ci    def put(self, item, block=True, timeout=None):
2737db96d56Sopenharmony_ci        '''Put the item on the queue.
2747db96d56Sopenharmony_ci
2757db96d56Sopenharmony_ci        The optional 'block' and 'timeout' arguments are ignored, as this method
2767db96d56Sopenharmony_ci        never blocks.  They are provided for compatibility with the Queue class.
2777db96d56Sopenharmony_ci        '''
2787db96d56Sopenharmony_ci        self._queue.append(item)
2797db96d56Sopenharmony_ci        self._count.release()
2807db96d56Sopenharmony_ci
2817db96d56Sopenharmony_ci    def get(self, block=True, timeout=None):
2827db96d56Sopenharmony_ci        '''Remove and return an item from the queue.
2837db96d56Sopenharmony_ci
2847db96d56Sopenharmony_ci        If optional args 'block' is true and 'timeout' is None (the default),
2857db96d56Sopenharmony_ci        block if necessary until an item is available. If 'timeout' is
2867db96d56Sopenharmony_ci        a non-negative number, it blocks at most 'timeout' seconds and raises
2877db96d56Sopenharmony_ci        the Empty exception if no item was available within that time.
2887db96d56Sopenharmony_ci        Otherwise ('block' is false), return an item if one is immediately
2897db96d56Sopenharmony_ci        available, else raise the Empty exception ('timeout' is ignored
2907db96d56Sopenharmony_ci        in that case).
2917db96d56Sopenharmony_ci        '''
2927db96d56Sopenharmony_ci        if timeout is not None and timeout < 0:
2937db96d56Sopenharmony_ci            raise ValueError("'timeout' must be a non-negative number")
2947db96d56Sopenharmony_ci        if not self._count.acquire(block, timeout):
2957db96d56Sopenharmony_ci            raise Empty
2967db96d56Sopenharmony_ci        return self._queue.popleft()
2977db96d56Sopenharmony_ci
2987db96d56Sopenharmony_ci    def put_nowait(self, item):
2997db96d56Sopenharmony_ci        '''Put an item into the queue without blocking.
3007db96d56Sopenharmony_ci
3017db96d56Sopenharmony_ci        This is exactly equivalent to `put(item, block=False)` and is only provided
3027db96d56Sopenharmony_ci        for compatibility with the Queue class.
3037db96d56Sopenharmony_ci        '''
3047db96d56Sopenharmony_ci        return self.put(item, block=False)
3057db96d56Sopenharmony_ci
3067db96d56Sopenharmony_ci    def get_nowait(self):
3077db96d56Sopenharmony_ci        '''Remove and return an item from the queue without blocking.
3087db96d56Sopenharmony_ci
3097db96d56Sopenharmony_ci        Only get an item if one is immediately available. Otherwise
3107db96d56Sopenharmony_ci        raise the Empty exception.
3117db96d56Sopenharmony_ci        '''
3127db96d56Sopenharmony_ci        return self.get(block=False)
3137db96d56Sopenharmony_ci
3147db96d56Sopenharmony_ci    def empty(self):
3157db96d56Sopenharmony_ci        '''Return True if the queue is empty, False otherwise (not reliable!).'''
3167db96d56Sopenharmony_ci        return len(self._queue) == 0
3177db96d56Sopenharmony_ci
3187db96d56Sopenharmony_ci    def qsize(self):
3197db96d56Sopenharmony_ci        '''Return the approximate size of the queue (not reliable!).'''
3207db96d56Sopenharmony_ci        return len(self._queue)
3217db96d56Sopenharmony_ci
3227db96d56Sopenharmony_ci    __class_getitem__ = classmethod(types.GenericAlias)
3237db96d56Sopenharmony_ci
3247db96d56Sopenharmony_ci
3257db96d56Sopenharmony_ciif SimpleQueue is None:
3267db96d56Sopenharmony_ci    SimpleQueue = _PySimpleQueue
327