17db96d56Sopenharmony_ci__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport collections
47db96d56Sopenharmony_ciimport heapq
57db96d56Sopenharmony_cifrom types import GenericAlias
67db96d56Sopenharmony_ci
77db96d56Sopenharmony_cifrom . import locks
87db96d56Sopenharmony_cifrom . import mixins
97db96d56Sopenharmony_ci
107db96d56Sopenharmony_ci
class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
147db96d56Sopenharmony_ci
157db96d56Sopenharmony_ci
class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
197db96d56Sopenharmony_ci
207db96d56Sopenharmony_ci
217db96d56Sopenharmony_ciclass Queue(mixins._LoopBoundMixin):
227db96d56Sopenharmony_ci    """A queue, useful for coordinating producer and consumer coroutines.
237db96d56Sopenharmony_ci
247db96d56Sopenharmony_ci    If maxsize is less than or equal to zero, the queue size is infinite. If it
257db96d56Sopenharmony_ci    is an integer greater than 0, then "await put()" will block when the
267db96d56Sopenharmony_ci    queue reaches maxsize, until an item is removed by get().
277db96d56Sopenharmony_ci
287db96d56Sopenharmony_ci    Unlike the standard library Queue, you can reliably know this Queue's size
297db96d56Sopenharmony_ci    with qsize(), since your single-threaded asyncio application won't be
307db96d56Sopenharmony_ci    interrupted between calling qsize() and doing an operation on the Queue.
317db96d56Sopenharmony_ci    """
327db96d56Sopenharmony_ci
337db96d56Sopenharmony_ci    def __init__(self, maxsize=0):
347db96d56Sopenharmony_ci        self._maxsize = maxsize
357db96d56Sopenharmony_ci
367db96d56Sopenharmony_ci        # Futures.
377db96d56Sopenharmony_ci        self._getters = collections.deque()
387db96d56Sopenharmony_ci        # Futures.
397db96d56Sopenharmony_ci        self._putters = collections.deque()
407db96d56Sopenharmony_ci        self._unfinished_tasks = 0
417db96d56Sopenharmony_ci        self._finished = locks.Event()
427db96d56Sopenharmony_ci        self._finished.set()
437db96d56Sopenharmony_ci        self._init(maxsize)
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci    # These three are overridable in subclasses.
467db96d56Sopenharmony_ci
477db96d56Sopenharmony_ci    def _init(self, maxsize):
487db96d56Sopenharmony_ci        self._queue = collections.deque()
497db96d56Sopenharmony_ci
507db96d56Sopenharmony_ci    def _get(self):
517db96d56Sopenharmony_ci        return self._queue.popleft()
527db96d56Sopenharmony_ci
537db96d56Sopenharmony_ci    def _put(self, item):
547db96d56Sopenharmony_ci        self._queue.append(item)
557db96d56Sopenharmony_ci
567db96d56Sopenharmony_ci    # End of the overridable methods.
577db96d56Sopenharmony_ci
587db96d56Sopenharmony_ci    def _wakeup_next(self, waiters):
597db96d56Sopenharmony_ci        # Wake up the next waiter (if any) that isn't cancelled.
607db96d56Sopenharmony_ci        while waiters:
617db96d56Sopenharmony_ci            waiter = waiters.popleft()
627db96d56Sopenharmony_ci            if not waiter.done():
637db96d56Sopenharmony_ci                waiter.set_result(None)
647db96d56Sopenharmony_ci                break
657db96d56Sopenharmony_ci
667db96d56Sopenharmony_ci    def __repr__(self):
677db96d56Sopenharmony_ci        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
687db96d56Sopenharmony_ci
697db96d56Sopenharmony_ci    def __str__(self):
707db96d56Sopenharmony_ci        return f'<{type(self).__name__} {self._format()}>'
717db96d56Sopenharmony_ci
727db96d56Sopenharmony_ci    __class_getitem__ = classmethod(GenericAlias)
737db96d56Sopenharmony_ci
747db96d56Sopenharmony_ci    def _format(self):
757db96d56Sopenharmony_ci        result = f'maxsize={self._maxsize!r}'
767db96d56Sopenharmony_ci        if getattr(self, '_queue', None):
777db96d56Sopenharmony_ci            result += f' _queue={list(self._queue)!r}'
787db96d56Sopenharmony_ci        if self._getters:
797db96d56Sopenharmony_ci            result += f' _getters[{len(self._getters)}]'
807db96d56Sopenharmony_ci        if self._putters:
817db96d56Sopenharmony_ci            result += f' _putters[{len(self._putters)}]'
827db96d56Sopenharmony_ci        if self._unfinished_tasks:
837db96d56Sopenharmony_ci            result += f' tasks={self._unfinished_tasks}'
847db96d56Sopenharmony_ci        return result
857db96d56Sopenharmony_ci
867db96d56Sopenharmony_ci    def qsize(self):
877db96d56Sopenharmony_ci        """Number of items in the queue."""
887db96d56Sopenharmony_ci        return len(self._queue)
897db96d56Sopenharmony_ci
907db96d56Sopenharmony_ci    @property
917db96d56Sopenharmony_ci    def maxsize(self):
927db96d56Sopenharmony_ci        """Number of items allowed in the queue."""
937db96d56Sopenharmony_ci        return self._maxsize
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_ci    def empty(self):
967db96d56Sopenharmony_ci        """Return True if the queue is empty, False otherwise."""
977db96d56Sopenharmony_ci        return not self._queue
987db96d56Sopenharmony_ci
997db96d56Sopenharmony_ci    def full(self):
1007db96d56Sopenharmony_ci        """Return True if there are maxsize items in the queue.
1017db96d56Sopenharmony_ci
1027db96d56Sopenharmony_ci        Note: if the Queue was initialized with maxsize=0 (the default),
1037db96d56Sopenharmony_ci        then full() is never True.
1047db96d56Sopenharmony_ci        """
1057db96d56Sopenharmony_ci        if self._maxsize <= 0:
1067db96d56Sopenharmony_ci            return False
1077db96d56Sopenharmony_ci        else:
1087db96d56Sopenharmony_ci            return self.qsize() >= self._maxsize
1097db96d56Sopenharmony_ci
1107db96d56Sopenharmony_ci    async def put(self, item):
1117db96d56Sopenharmony_ci        """Put an item into the queue.
1127db96d56Sopenharmony_ci
1137db96d56Sopenharmony_ci        Put an item into the queue. If the queue is full, wait until a free
1147db96d56Sopenharmony_ci        slot is available before adding item.
1157db96d56Sopenharmony_ci        """
1167db96d56Sopenharmony_ci        while self.full():
1177db96d56Sopenharmony_ci            putter = self._get_loop().create_future()
1187db96d56Sopenharmony_ci            self._putters.append(putter)
1197db96d56Sopenharmony_ci            try:
1207db96d56Sopenharmony_ci                await putter
1217db96d56Sopenharmony_ci            except:
1227db96d56Sopenharmony_ci                putter.cancel()  # Just in case putter is not done yet.
1237db96d56Sopenharmony_ci                try:
1247db96d56Sopenharmony_ci                    # Clean self._putters from canceled putters.
1257db96d56Sopenharmony_ci                    self._putters.remove(putter)
1267db96d56Sopenharmony_ci                except ValueError:
1277db96d56Sopenharmony_ci                    # The putter could be removed from self._putters by a
1287db96d56Sopenharmony_ci                    # previous get_nowait call.
1297db96d56Sopenharmony_ci                    pass
1307db96d56Sopenharmony_ci                if not self.full() and not putter.cancelled():
1317db96d56Sopenharmony_ci                    # We were woken up by get_nowait(), but can't take
1327db96d56Sopenharmony_ci                    # the call.  Wake up the next in line.
1337db96d56Sopenharmony_ci                    self._wakeup_next(self._putters)
1347db96d56Sopenharmony_ci                raise
1357db96d56Sopenharmony_ci        return self.put_nowait(item)
1367db96d56Sopenharmony_ci
1377db96d56Sopenharmony_ci    def put_nowait(self, item):
1387db96d56Sopenharmony_ci        """Put an item into the queue without blocking.
1397db96d56Sopenharmony_ci
1407db96d56Sopenharmony_ci        If no free slot is immediately available, raise QueueFull.
1417db96d56Sopenharmony_ci        """
1427db96d56Sopenharmony_ci        if self.full():
1437db96d56Sopenharmony_ci            raise QueueFull
1447db96d56Sopenharmony_ci        self._put(item)
1457db96d56Sopenharmony_ci        self._unfinished_tasks += 1
1467db96d56Sopenharmony_ci        self._finished.clear()
1477db96d56Sopenharmony_ci        self._wakeup_next(self._getters)
1487db96d56Sopenharmony_ci
1497db96d56Sopenharmony_ci    async def get(self):
1507db96d56Sopenharmony_ci        """Remove and return an item from the queue.
1517db96d56Sopenharmony_ci
1527db96d56Sopenharmony_ci        If queue is empty, wait until an item is available.
1537db96d56Sopenharmony_ci        """
1547db96d56Sopenharmony_ci        while self.empty():
1557db96d56Sopenharmony_ci            getter = self._get_loop().create_future()
1567db96d56Sopenharmony_ci            self._getters.append(getter)
1577db96d56Sopenharmony_ci            try:
1587db96d56Sopenharmony_ci                await getter
1597db96d56Sopenharmony_ci            except:
1607db96d56Sopenharmony_ci                getter.cancel()  # Just in case getter is not done yet.
1617db96d56Sopenharmony_ci                try:
1627db96d56Sopenharmony_ci                    # Clean self._getters from canceled getters.
1637db96d56Sopenharmony_ci                    self._getters.remove(getter)
1647db96d56Sopenharmony_ci                except ValueError:
1657db96d56Sopenharmony_ci                    # The getter could be removed from self._getters by a
1667db96d56Sopenharmony_ci                    # previous put_nowait call.
1677db96d56Sopenharmony_ci                    pass
1687db96d56Sopenharmony_ci                if not self.empty() and not getter.cancelled():
1697db96d56Sopenharmony_ci                    # We were woken up by put_nowait(), but can't take
1707db96d56Sopenharmony_ci                    # the call.  Wake up the next in line.
1717db96d56Sopenharmony_ci                    self._wakeup_next(self._getters)
1727db96d56Sopenharmony_ci                raise
1737db96d56Sopenharmony_ci        return self.get_nowait()
1747db96d56Sopenharmony_ci
1757db96d56Sopenharmony_ci    def get_nowait(self):
1767db96d56Sopenharmony_ci        """Remove and return an item from the queue.
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci        Return an item if one is immediately available, else raise QueueEmpty.
1797db96d56Sopenharmony_ci        """
1807db96d56Sopenharmony_ci        if self.empty():
1817db96d56Sopenharmony_ci            raise QueueEmpty
1827db96d56Sopenharmony_ci        item = self._get()
1837db96d56Sopenharmony_ci        self._wakeup_next(self._putters)
1847db96d56Sopenharmony_ci        return item
1857db96d56Sopenharmony_ci
1867db96d56Sopenharmony_ci    def task_done(self):
1877db96d56Sopenharmony_ci        """Indicate that a formerly enqueued task is complete.
1887db96d56Sopenharmony_ci
1897db96d56Sopenharmony_ci        Used by queue consumers. For each get() used to fetch a task,
1907db96d56Sopenharmony_ci        a subsequent call to task_done() tells the queue that the processing
1917db96d56Sopenharmony_ci        on the task is complete.
1927db96d56Sopenharmony_ci
1937db96d56Sopenharmony_ci        If a join() is currently blocking, it will resume when all items have
1947db96d56Sopenharmony_ci        been processed (meaning that a task_done() call was received for every
1957db96d56Sopenharmony_ci        item that had been put() into the queue).
1967db96d56Sopenharmony_ci
1977db96d56Sopenharmony_ci        Raises ValueError if called more times than there were items placed in
1987db96d56Sopenharmony_ci        the queue.
1997db96d56Sopenharmony_ci        """
2007db96d56Sopenharmony_ci        if self._unfinished_tasks <= 0:
2017db96d56Sopenharmony_ci            raise ValueError('task_done() called too many times')
2027db96d56Sopenharmony_ci        self._unfinished_tasks -= 1
2037db96d56Sopenharmony_ci        if self._unfinished_tasks == 0:
2047db96d56Sopenharmony_ci            self._finished.set()
2057db96d56Sopenharmony_ci
2067db96d56Sopenharmony_ci    async def join(self):
2077db96d56Sopenharmony_ci        """Block until all items in the queue have been gotten and processed.
2087db96d56Sopenharmony_ci
2097db96d56Sopenharmony_ci        The count of unfinished tasks goes up whenever an item is added to the
2107db96d56Sopenharmony_ci        queue. The count goes down whenever a consumer calls task_done() to
2117db96d56Sopenharmony_ci        indicate that the item was retrieved and all work on it is complete.
2127db96d56Sopenharmony_ci        When the count of unfinished tasks drops to zero, join() unblocks.
2137db96d56Sopenharmony_ci        """
2147db96d56Sopenharmony_ci        if self._unfinished_tasks > 0:
2157db96d56Sopenharmony_ci            await self._finished.wait()
2167db96d56Sopenharmony_ci
2177db96d56Sopenharmony_ci
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the underlying container: a plain list used as a heap."""
        self._queue = []

    # heappush / heappop are bound as default arguments; callers are not
    # expected to pass them.

    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)
2327db96d56Sopenharmony_ci
2337db96d56Sopenharmony_ci
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the underlying container: a plain list used as a stack."""
        self._queue = []

    def _put(self, item):
        """Push *item* onto the end of the list."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()
245