/*
 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @file safe_block_queue.h
 *
 * Provides interfaces for thread-safe blocking queues in c_utils.
 * The file includes the <b>SafeBlockQueue</b> class and
 * the <b>SafeBlockQueueTracking</b> class for trackable tasks.
 */

#ifndef UTILS_BASE_BLOCK_QUEUE_H
#define UTILS_BASE_BLOCK_QUEUE_H

#include <atomic>
#include <climits>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

namespace OHOS {

/**
 * @brief Provides interfaces for thread-safe blocking queues.
 *
 * The interfaces can be used to perform blocking and non-blocking push and
 * pop operations on queues. Every operation takes the internal mutex for its
 * whole duration.
 */
template <typename T>
class SafeBlockQueue {
public:
    /**
     * @brief Creates a queue that holds at most <b>capacity</b> elements.
     *
     * @param capacity Indicates the maximum number of queued elements.
     * The value is converted to <b>unsigned long</b>, so a negative value
     * wraps around to a very large limit.
     */
    explicit SafeBlockQueue(int capacity) : maxSize_(static_cast<unsigned long>(capacity))
    {
    }

    virtual ~SafeBlockQueue() = default;

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * If the queue is full, the thread of the push operation will be blocked
     * until the queue has space.
     * If the queue is not full, the push operation can be performed and one of the
     * pop threads (blocked when the queue is empty) is woken up.
     *
     * @param elem Indicates the element to insert.
     */
    virtual void Push(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // The predicate overload of wait() re-checks the condition after every
        // wakeup, so no surrounding loop is needed.
        cvNotFull_.wait(lock, [this] { return queueT_.size() < maxSize_; });

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Removes the first element from this queue in blocking mode.
     *
     * If the queue is empty, the thread of the pop operation will be blocked
     * until the queue has elements.
     * If the queue is not empty, the pop operation can be performed, the first
     * element of the queue is returned, and one of the push threads (blocked
     * when the queue is full) is woken up.
     *
     * @return Returns the element that was at the front of the queue.
     */
    T Pop()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvNotEmpty_.wait(lock, [this] { return !queueT_.empty(); });

        // The front slot is discarded right after, so move out of it.
        T elem = std::move(queueT_.front());
        queueT_.pop();
        cvNotFull_.notify_one();
        return elem;
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * If the queue is full, <b>false</b> is returned directly.
     * If the queue is not full, the push operation can be performed, one of the
     * pop threads (blocked when the queue is empty) is woken up, and <b>true</b>
     * is returned.
     *
     * @param elem Indicates the element to insert.
     * @return Returns <b>true</b> if the element was inserted;
     * returns <b>false</b> if the queue is full.
     */
    virtual bool PushNoWait(T const& elem)
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Removes the first element from this queue in non-blocking mode.
     *
     * If the queue is empty, <b>false</b> is returned directly.
     * If the queue is not empty, the pop operation can be performed, one of the
     * push threads (blocked when the queue is full) is woken up, and <b>true</b>
     * is returned.
     *
     * @param outtask Indicates the data of the pop operation. It is left
     * untouched when the queue is empty.
     * @return Returns <b>true</b> if an element was removed;
     * returns <b>false</b> if the queue is empty.
     */
    bool PopNotWait(T& outtask)
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        if (queueT_.empty()) {
            return false;
        }

        outtask = std::move(queueT_.front());
        queueT_.pop();
        cvNotFull_.notify_one();
        return true;
    }

    /**
     * @brief Obtains the number of elements currently in this queue.
     *
     * @return Returns the element count, narrowed to <b>unsigned int</b>.
     */
    unsigned int Size()
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return static_cast<unsigned int>(queueT_.size());
    }

    /**
     * @brief Checks whether this queue holds no elements.
     *
     * @return Returns <b>true</b> if the queue is empty; returns <b>false</b> otherwise.
     */
    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return queueT_.empty();
    }

    /**
     * @brief Checks whether this queue has reached its capacity.
     *
     * @return Returns <b>true</b> if the queue is full; returns <b>false</b> otherwise.
     */
    bool IsFull()
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        // Same comparison as the push paths use to decide that there is no room.
        return queueT_.size() >= maxSize_;
    }

protected:
    unsigned long maxSize_; // Capacity of the queue
    std::mutex mutexLock_;
    std::condition_variable cvNotEmpty_;
    std::condition_variable cvNotFull_;
    std::queue<T> queueT_;
};

/**
 * @brief Provides interfaces for operating the thread-safe blocking queues
 * and tracking the number of pending tasks.
 * This class inherits from <b>SafeBlockQueue</b>.
 */
template <typename T>
class SafeBlockQueueTracking : public SafeBlockQueue<T> {
public:
    /**
     * @brief Creates a tracking queue with no unfinished tasks.
     *
     * @param capacity Indicates the maximum number of queued elements,
     * forwarded to <b>SafeBlockQueue</b>.
     */
    explicit SafeBlockQueueTracking(int capacity) : SafeBlockQueue<T>(capacity), unfinishedTaskCount_(0)
    {
    }

    ~SafeBlockQueueTracking() override = default;

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * The count of unfinished tasks increments by 1 before the call starts
     * waiting for space, so a producer blocked on a full queue is already
     * counted.
     * If the queue is full, the thread of the push operation will be blocked
     * until the queue has space.
     * If the queue is not full, the push operation can be performed and one of the
     * pop threads (blocked when the queue is empty) is woken up.
     *
     * @param elem Indicates the element to insert.
     */
    void Push(T const& elem) override
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // Increment while holding the lock: OneTaskDone() reads and rewrites the
        // counter under the same lock, and an unlocked increment could land
        // between its read and write and be lost.
        unfinishedTaskCount_++;
        cvNotFull_.wait(lock, [this] { return queueT_.size() < maxSize_; });

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * If the queue is full, <b>false</b> is returned directly.
     * If the queue is not full, the push operation can be performed,
     * the count of unfinished tasks increments by 1,
     * one of the pop threads (blocked when the queue is empty) is woken up,
     * and <b>true</b> is returned.
     *
     * @param elem Indicates the element to insert.
     * @return Returns <b>true</b> if the element was inserted;
     * returns <b>false</b> if the queue is full.
     */
    bool PushNoWait(T const& elem) override
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }

        queueT_.push(elem);
        unfinishedTaskCount_++;
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Called to return the result when a task is complete.
     *
     * If the count of unfinished tasks < 1, <b>false</b> is returned directly.
     * If the count of unfinished tasks = 1, all the threads blocked
     * by calling Join() will be woken up,
     * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
     * If the count of unfinished tasks > 1,
     * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
     *
     * @return Returns <b>true</b> if a task was marked as done; returns
     * <b>false</b> if this function was called more times than tasks were pushed.
     */
    bool OneTaskDone()
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        const int remaining = unfinishedTaskCount_ - 1;
        if (remaining < 0) {
            return false; // Called more often than there were tasks.
        }

        unfinishedTaskCount_ = remaining;
        if (remaining == 0) {
            cvAllTasksDone_.notify_all();
        }
        return true;
    }

    /**
     * @brief Waits for all tasks to complete.
     *
     * If there is any task not completed, the current thread will be
     * blocked even if it is just woken up.
     */
    void Join()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvAllTasksDone_.wait(lock, [this] { return unfinishedTaskCount_ == 0; });
    }

    /**
     * @brief Obtains the number of unfinished tasks.
     *
     * The counter is read without taking the mutex.
     *
     * @return Returns the current count of unfinished tasks.
     */
    int GetUnfinishTaskNum()
    {
        return unfinishedTaskCount_;
    }

protected:
    using SafeBlockQueue<T>::maxSize_;
    using SafeBlockQueue<T>::mutexLock_;
    using SafeBlockQueue<T>::cvNotEmpty_;
    using SafeBlockQueue<T>::cvNotFull_;
    using SafeBlockQueue<T>::queueT_;

    std::atomic<int> unfinishedTaskCount_;
    std::condition_variable cvAllTasksDone_;
};

} // namespace OHOS

#endif