13f4cbf05Sopenharmony_ci/*
23f4cbf05Sopenharmony_ci * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
33f4cbf05Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
43f4cbf05Sopenharmony_ci * you may not use this file except in compliance with the License.
53f4cbf05Sopenharmony_ci * You may obtain a copy of the License at
63f4cbf05Sopenharmony_ci *
73f4cbf05Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
83f4cbf05Sopenharmony_ci *
93f4cbf05Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
103f4cbf05Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
113f4cbf05Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
123f4cbf05Sopenharmony_ci * See the License for the specific language governing permissions and
133f4cbf05Sopenharmony_ci * limitations under the License.
143f4cbf05Sopenharmony_ci */
153f4cbf05Sopenharmony_ci
163f4cbf05Sopenharmony_ci/**
173f4cbf05Sopenharmony_ci * @file safe_block_queue.h
183f4cbf05Sopenharmony_ci *
193f4cbf05Sopenharmony_ci * Provides interfaces for thread-safe blocking queues in c_utils.
203f4cbf05Sopenharmony_ci * The file includes the <b>SafeBlockQueue</b> class and
213f4cbf05Sopenharmony_ci * the <b>SafeBlockQueueTracking</b> class for trackable tasks.
223f4cbf05Sopenharmony_ci */
233f4cbf05Sopenharmony_ci
243f4cbf05Sopenharmony_ci#ifndef UTILS_BASE_BLOCK_QUEUE_H
253f4cbf05Sopenharmony_ci#define UTILS_BASE_BLOCK_QUEUE_H
263f4cbf05Sopenharmony_ci
273f4cbf05Sopenharmony_ci#include <climits>
283f4cbf05Sopenharmony_ci#include <condition_variable>
293f4cbf05Sopenharmony_ci#include <mutex>
303f4cbf05Sopenharmony_ci#include <queue>
313f4cbf05Sopenharmony_ci#include <atomic>
323f4cbf05Sopenharmony_ci
333f4cbf05Sopenharmony_cinamespace OHOS {
343f4cbf05Sopenharmony_ci
353f4cbf05Sopenharmony_ci/**
363f4cbf05Sopenharmony_ci * @brief Provides interfaces for thread-safe blocking queues.
373f4cbf05Sopenharmony_ci *
383f4cbf05Sopenharmony_ci * The interfaces can be used to perform blocking and non-blocking push and
393f4cbf05Sopenharmony_ci * pop operations on queues.
403f4cbf05Sopenharmony_ci */
template <typename T>
class SafeBlockQueue {
public:
    /**
     * @brief Creates a queue that holds at most <b>capacity</b> elements.
     *
     * The limit is stored in an unsigned member, so a negative value is
     * converted to a very large limit instead of being rejected.
     *
     * @param capacity Indicates the maximum number of queued elements.
     */
    explicit SafeBlockQueue(int capacity) : maxSize_(static_cast<unsigned long>(capacity))
    {
    }

    virtual ~SafeBlockQueue() = default;

    /**
     * @brief Inserts an element at the end of this queue in blocking mode.
     *
     * If the queue is full, the calling thread is blocked until the queue
     * has space. Once the element is inserted, one of the pop threads
     * (blocked when the queue is empty) is woken up.
     *
     * @param elem Indicates the element to insert (a copy is stored).
     */
    virtual void Push(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        // The predicate overload re-evaluates the condition after every
        // wakeup, so spurious wakeups need no explicit outer loop.
        cvNotFull_.wait(lock, [this] { return queueT_.size() < maxSize_; });

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
    }

    /**
     * @brief Removes the first element from this queue in blocking mode.
     *
     * If the queue is empty, the calling thread is blocked until an element
     * is available. Once the element is removed, one of the push threads
     * (blocked when the queue is full) is woken up.
     *
     * @return Returns a copy of the element that was at the front.
     */
    T Pop()
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        cvNotEmpty_.wait(lock, [this] { return !queueT_.empty(); });

        T elem = queueT_.front();
        queueT_.pop();
        cvNotFull_.notify_one();
        return elem;
    }

    /**
     * @brief Inserts an element at the end of this queue in non-blocking mode.
     *
     * @param elem Indicates the element to insert (a copy is stored).
     * @return Returns <b>false</b> without inserting if the queue is full;
     * otherwise inserts the element, wakes up one of the pop threads
     * (blocked when the queue is empty), and returns <b>true</b>.
     */
    virtual bool PushNoWait(T const& elem)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.size() >= maxSize_) {
            return false;
        }

        queueT_.push(elem);
        cvNotEmpty_.notify_one();
        return true;
    }

    /**
     * @brief Removes the first element from this queue in non-blocking mode.
     *
     * @param outtask Receives the removed element; left untouched when the
     * queue is empty.
     * @return Returns <b>false</b> if the queue is empty; otherwise removes
     * the front element, wakes up one of the push threads (blocked when the
     * queue is full), and returns <b>true</b>.
     */
    bool PopNotWait(T& outtask)
    {
        std::unique_lock<std::mutex> lock(mutexLock_);
        if (queueT_.empty()) {
            return false;
        }

        outtask = queueT_.front();
        queueT_.pop();
        cvNotFull_.notify_one();
        return true;
    }

    /**
     * @brief Obtains the number of elements currently in this queue.
     *
     * The value is a snapshot taken under the lock; it may be stale by the
     * time the caller uses it if other threads are pushing or popping.
     */
    unsigned int Size() const
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return static_cast<unsigned int>(queueT_.size());
    }

    /**
     * @brief Checks whether this queue currently holds no elements.
     */
    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return queueT_.empty();
    }

    /**
     * @brief Checks whether this queue has reached its capacity.
     *
     * Uses the same ">=" test as Push() and PushNoWait(), so the answer
     * always agrees with whether a push would be refused or blocked.
     */
    bool IsFull() const
    {
        std::lock_guard<std::mutex> lock(mutexLock_);
        return queueT_.size() >= maxSize_;
    }

protected:
    unsigned long maxSize_;  // Capacity of the queue
    mutable std::mutex mutexLock_;  // mutable so that const observers can lock it
    std::condition_variable cvNotEmpty_;  // Signalled after an element is pushed
    std::condition_variable cvNotFull_;  // Signalled after an element is popped
    std::queue<T> queueT_;
};
1683f4cbf05Sopenharmony_ci
1693f4cbf05Sopenharmony_ci/**
1703f4cbf05Sopenharmony_ci * @brief Provides interfaces for operating the thread-safe blocking queues
1713f4cbf05Sopenharmony_ci * and tracking the number of pending tasks.
1723f4cbf05Sopenharmony_ci * This class inherits from <b>SafeBlockQueue</b>.
1733f4cbf05Sopenharmony_ci */
1743f4cbf05Sopenharmony_citemplate <typename T>
1753f4cbf05Sopenharmony_ciclass SafeBlockQueueTracking : public SafeBlockQueue<T> {
1763f4cbf05Sopenharmony_cipublic:
1773f4cbf05Sopenharmony_ci    explicit SafeBlockQueueTracking(int capacity) : SafeBlockQueue<T>(capacity)
1783f4cbf05Sopenharmony_ci    {
1793f4cbf05Sopenharmony_ci        unfinishedTaskCount_ = 0;
1803f4cbf05Sopenharmony_ci    }
1813f4cbf05Sopenharmony_ci
1823f4cbf05Sopenharmony_ci    virtual ~SafeBlockQueueTracking() {}
1833f4cbf05Sopenharmony_ci
1843f4cbf05Sopenharmony_ci/**
1853f4cbf05Sopenharmony_ci * @brief Inserts an element at the end of this queue in blocking mode.
1863f4cbf05Sopenharmony_ci *
1873f4cbf05Sopenharmony_ci * If the queue is full, the thread of the push operation will be blocked
1883f4cbf05Sopenharmony_ci * until the queue has space.
1893f4cbf05Sopenharmony_ci * If the queue is not full, the push operation can be performed and one of the
1903f4cbf05Sopenharmony_ci * pop threads (blocked when the queue is empty) is woken up.
1913f4cbf05Sopenharmony_ci */
1923f4cbf05Sopenharmony_ci    virtual void Push(T const& elem)
1933f4cbf05Sopenharmony_ci    {
1943f4cbf05Sopenharmony_ci        unfinishedTaskCount_++;
1953f4cbf05Sopenharmony_ci        std::unique_lock<std::mutex> lock(mutexLock_);
1963f4cbf05Sopenharmony_ci        while (queueT_.size() >= maxSize_) {
1973f4cbf05Sopenharmony_ci            // If the queue is full, wait for jobs to be taken.
1983f4cbf05Sopenharmony_ci            cvNotFull_.wait(lock, [&]() { return (queueT_.size() < maxSize_); });
1993f4cbf05Sopenharmony_ci        }
2003f4cbf05Sopenharmony_ci
2013f4cbf05Sopenharmony_ci        // If the queue is not full, insert the element.
2023f4cbf05Sopenharmony_ci        queueT_.push(elem);
2033f4cbf05Sopenharmony_ci
2043f4cbf05Sopenharmony_ci        cvNotEmpty_.notify_one();
2053f4cbf05Sopenharmony_ci    }
2063f4cbf05Sopenharmony_ci
2073f4cbf05Sopenharmony_ci/**
2083f4cbf05Sopenharmony_ci * @brief Inserts an element at the end of this queue in non-blocking mode.
2093f4cbf05Sopenharmony_ci *
2103f4cbf05Sopenharmony_ci * If the queue is full, <b>false</b> is returned directly.
2113f4cbf05Sopenharmony_ci * If the queue is not full, the push operation can be performed,
2123f4cbf05Sopenharmony_ci * one of the pop threads (blocked when the queue is empty) is woken up,
2133f4cbf05Sopenharmony_ci * and <b>true</b> is returned.
2143f4cbf05Sopenharmony_ci */
2153f4cbf05Sopenharmony_ci    virtual bool PushNoWait(T const& elem)
2163f4cbf05Sopenharmony_ci    {
2173f4cbf05Sopenharmony_ci        std::unique_lock<std::mutex> lock(mutexLock_);
2183f4cbf05Sopenharmony_ci        if (queueT_.size() >= maxSize_) {
2193f4cbf05Sopenharmony_ci            return false;
2203f4cbf05Sopenharmony_ci        }
2213f4cbf05Sopenharmony_ci        // Insert the element if the queue is not full.
2223f4cbf05Sopenharmony_ci        queueT_.push(elem);
2233f4cbf05Sopenharmony_ci        unfinishedTaskCount_++;
2243f4cbf05Sopenharmony_ci        cvNotEmpty_.notify_one();
2253f4cbf05Sopenharmony_ci        return true;
2263f4cbf05Sopenharmony_ci    }
2273f4cbf05Sopenharmony_ci
2283f4cbf05Sopenharmony_ci/**
2293f4cbf05Sopenharmony_ci * @brief Called to return the result when a task is complete.
2303f4cbf05Sopenharmony_ci *
2313f4cbf05Sopenharmony_ci * If the count of unfinished tasks < 1, <b>false</b> is returned directly.
2323f4cbf05Sopenharmony_ci * If the count of unfinished tasks = 1, all the threads blocked
2333f4cbf05Sopenharmony_ci * by calling Join() will be woken up,
2343f4cbf05Sopenharmony_ci * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
2353f4cbf05Sopenharmony_ci * If the count of unfinished tasks > 1,
2363f4cbf05Sopenharmony_ci * the count of unfinished tasks decrements by 1, and <b>true</b> is returned.
2373f4cbf05Sopenharmony_ci */
2383f4cbf05Sopenharmony_ci    bool OneTaskDone()
2393f4cbf05Sopenharmony_ci    {
2403f4cbf05Sopenharmony_ci        std::unique_lock<std::mutex> lock(mutexLock_);
2413f4cbf05Sopenharmony_ci        int unfinished = unfinishedTaskCount_ - 1;
2423f4cbf05Sopenharmony_ci
2433f4cbf05Sopenharmony_ci        if (unfinished <= 0) {
2443f4cbf05Sopenharmony_ci            if (unfinished < 0) {
2453f4cbf05Sopenharmony_ci                return false; // false mean call elem done too many times
2463f4cbf05Sopenharmony_ci            }
2473f4cbf05Sopenharmony_ci            cvAllTasksDone_.notify_all();
2483f4cbf05Sopenharmony_ci        }
2493f4cbf05Sopenharmony_ci
2503f4cbf05Sopenharmony_ci        unfinishedTaskCount_ = unfinished;
2513f4cbf05Sopenharmony_ci        return true;
2523f4cbf05Sopenharmony_ci    }
2533f4cbf05Sopenharmony_ci
2543f4cbf05Sopenharmony_ci/**
2553f4cbf05Sopenharmony_ci * @brief Waits for all tasks to complete.
2563f4cbf05Sopenharmony_ci *
2573f4cbf05Sopenharmony_ci * If there is any task not completed, the current thread will be
2583f4cbf05Sopenharmony_ci * blocked even if it is just woken up.
2593f4cbf05Sopenharmony_ci */
2603f4cbf05Sopenharmony_ci    void Join()
2613f4cbf05Sopenharmony_ci    {
2623f4cbf05Sopenharmony_ci        std::unique_lock<std::mutex> lock(mutexLock_);
2633f4cbf05Sopenharmony_ci        cvAllTasksDone_.wait(lock, [&] { return unfinishedTaskCount_ == 0; });
2643f4cbf05Sopenharmony_ci    }
2653f4cbf05Sopenharmony_ci
2663f4cbf05Sopenharmony_ci/**
2673f4cbf05Sopenharmony_ci * @brief Obtains the number of unfinished tasks.
2683f4cbf05Sopenharmony_ci */
2693f4cbf05Sopenharmony_ci    int GetUnfinishTaskNum()
2703f4cbf05Sopenharmony_ci    {
2713f4cbf05Sopenharmony_ci        return unfinishedTaskCount_;
2723f4cbf05Sopenharmony_ci    }
2733f4cbf05Sopenharmony_ci
2743f4cbf05Sopenharmony_ciprotected:
2753f4cbf05Sopenharmony_ci    using SafeBlockQueue<T>::maxSize_;
2763f4cbf05Sopenharmony_ci    using SafeBlockQueue<T>::mutexLock_;
2773f4cbf05Sopenharmony_ci    using SafeBlockQueue<T>::cvNotEmpty_;
2783f4cbf05Sopenharmony_ci    using SafeBlockQueue<T>::cvNotFull_;
2793f4cbf05Sopenharmony_ci    using SafeBlockQueue<T>::queueT_;
2803f4cbf05Sopenharmony_ci
2813f4cbf05Sopenharmony_ci    std::atomic<int> unfinishedTaskCount_;
2823f4cbf05Sopenharmony_ci    std::condition_variable cvAllTasksDone_;
2833f4cbf05Sopenharmony_ci};
2843f4cbf05Sopenharmony_ci
2853f4cbf05Sopenharmony_ci} // namespace OHOS
2863f4cbf05Sopenharmony_ci
2873f4cbf05Sopenharmony_ci#endif
288