"""Queues for coordinating producer and consumer coroutines."""

__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')

import collections
import heapq
from types import GenericAlias

from . import locks
from . import mixins


class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
    pass


class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
    pass


class Queue(mixins._LoopBoundMixin):
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "await put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0):
        """Create a queue holding at most *maxsize* items (<= 0: unbounded)."""
        self._maxsize = maxsize

        # Futures of coroutines blocked in get(), oldest first.
        self._getters = collections.deque()
        # Futures of coroutines blocked in put(), oldest first.
        self._putters = collections.deque()
        # Number of items put() so far that have not been task_done()'d.
        self._unfinished_tasks = 0
        # Set whenever _unfinished_tasks is zero; join() waits on it.
        self._finished = locks.Event()
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying container, stored as ``self._queue``."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item (FIFO: the oldest one)."""
        return self._queue.popleft()

    def _put(self, item):
        """Store *item* in the underlying container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Wake up the next waiter (if any) that isn't already done.

        Waiters that are already done (e.g. cancelled) are discarded from
        the front of *waiters* until a pending one is found and resolved.
        """
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        return f'<{type(self).__name__} {self._format()}>'

    # Allow subscripting the class, e.g. ``Queue[int]``.
    __class_getitem__ = classmethod(GenericAlias)

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = f'maxsize={self._maxsize!r}'
        # getattr(): tolerate a missing _queue attribute; an empty queue is
        # likewise omitted from the summary.
        if getattr(self, '_queue', None):
            result += f' _queue={list(self._queue)!r}'
        if self._getters:
            result += f' _getters[{len(self._getters)}]'
        if self._putters:
            result += f' _putters[{len(self._putters)}]'
        if self._unfinished_tasks:
            result += f' tasks={self._unfinished_tasks}'
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.
        """
        # Loop rather than wait once: the queue is re-checked after every
        # wake-up and the coroutine waits again if it is still full.
        while self.full():
            # _get_loop() is not defined in this class; presumably provided
            # by mixins._LoopBoundMixin.
            putter = self._get_loop().create_future()
            self._putters.append(putter)
            try:
                await putter
            except BaseException:
                # Catch everything so the waiter is always cleaned up; the
                # exception is re-raised below.
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)
                except ValueError:
                    # The putter could be removed from self._putters by a
                    # previous get_nowait call.
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        # Loop rather than wait once: the queue is re-checked after every
        # wake-up and the coroutine waits again if it is still empty.
        while self.empty():
            # _get_loop() is not defined in this class; presumably provided
            # by mixins._LoopBoundMixin.
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
                await getter
            except BaseException:
                # Catch everything so the waiter is always cleaned up; the
                # exception is re-raised below.
                getter.cancel()  # Just in case getter is not done yet.
                try:
                    # Clean self._getters from canceled getters.
                    self._getters.remove(getter)
                except ValueError:
                    # The getter could be removed from self._getters by a
                    # previous put_nowait call.
                    pass
                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()


class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the list used as a heap by _put() and _get()."""
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)


class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the list used as a stack by _put() and _get()."""
        self._queue = []

    def _put(self, item):
        """Push *item* onto the end of the list."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()