1/*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef UTILS_BLOCK_QUEUE_H
17#define UTILS_BLOCK_QUEUE_H
18#include <atomic>
19#include <mutex>
20#include <condition_variable>
21#include <queue>
22#include "avcodec_log.h"
23
24namespace OHOS {
25namespace MediaAVCodec {
26namespace {
27constexpr size_t DEFAULT_QUEUE_SIZE = 10;
28}
29
30template <typename T>
31class BlockQueue {
32public:
33    explicit BlockQueue(std::string name, size_t capacity = DEFAULT_QUEUE_SIZE)
34        : name_(std::move(name)), capacity_(capacity), isActive_(true)
35    {
36    }
37
38    ~BlockQueue() = default;
39
40    size_t Size()
41    {
42        std::lock_guard<std::mutex> lock(mutex_);
43        return que_.size();
44    }
45
46    size_t Capacity()
47    {
48        return capacity_;
49    }
50
51    bool Empty()
52    {
53        std::lock_guard<std::mutex> lock(mutex_);
54        return que_.empty();
55    }
56
57    bool Push(const T& block)
58    {
59        AVCODEC_LOGD("block queue %{public}s Push enter.", name_.c_str());
60        std::unique_lock<std::mutex> lock(mutex_);
61        if (!isActive_) {
62            AVCODEC_LOGD("block queue %{public}s is inactive for Push.", name_.c_str());
63            return false;
64        }
65        if (que_.size() >= capacity_) {
66            AVCODEC_LOGD("block queue %{public}s is full, please waiting for Pop.", name_.c_str());
67            condFull_.wait(lock, [this] { return !isActive_ || que_.size() < capacity_; });
68        }
69        if (!isActive_) {
70            AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, isFull: %{public}d.",
71                name_.c_str(), isActive_.load(), que_.size() < capacity_);
72            return false;
73        }
74        que_.push(block);
75        condEmpty_.notify_one();
76        AVCODEC_LOGD("block queue %{public}s Push ok.", name_.c_str());
77        return true;
78    }
79
80    T Pop()
81    {
82        AVCODEC_LOGD("block queue %{public}s Pop enter.", name_.c_str());
83        std::unique_lock<std::mutex> lock(mutex_);
84        if (que_.empty() && !isActive_) {
85            AVCODEC_LOGD("block queue %{public}s is inactive for Pop.", name_.c_str());
86            return {};
87        } else if (que_.empty() && isActive_) {
88            AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str());
89            condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); });
90        }
91        if (que_.empty()) {
92            AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.",
93                name_.c_str(), isActive_.load(), que_.size());
94            return {};
95        }
96        T element = que_.front();
97        que_.pop();
98        condFull_.notify_one();
99        AVCODEC_LOGD("block queue %{public}s Pop ok.", name_.c_str());
100        return element;
101    }
102
103    T Front()
104    {
105        AVCODEC_LOGD("block queue %{public}s Front enter.", name_.c_str());
106        std::unique_lock<std::mutex> lock(mutex_);
107        if (que_.empty() && !isActive_) {
108            AVCODEC_LOGD("block queue %{public}s is inactive for Front.", name_.c_str());
109            return {};
110        } else if (que_.empty() && isActive_) {
111            AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str());
112            condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); });
113        }
114        if (que_.empty()) {
115            AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.",
116                name_.c_str(), isActive_.load(), que_.size());
117            return {};
118        }
119        T element = que_.front();
120        condFull_.notify_one();
121        AVCODEC_LOGD("block queue %{public}s Front ok.", name_.c_str());
122        return element;
123    }
124
125    void Clear()
126    {
127        std::lock_guard<std::mutex> lock(mutex_);
128        ClearUnprotected();
129    }
130
131    void SetActive(bool active, bool cleanData = true)
132    {
133        std::lock_guard<std::mutex> lock(mutex_);
134        AVCODEC_LOGD("SetActive %{public}s: %{public}d.", name_.c_str(), isActive_.load());
135        isActive_ = active;
136        if (!active) {
137            if (cleanData) {
138                ClearUnprotected();
139            }
140            condEmpty_.notify_one();
141        }
142    }
143
144private:
145    void ClearUnprotected()
146    {
147        if (que_.empty()) {
148            return;
149        }
150        bool needNotify = que_.size() == capacity_;
151        std::queue<T>().swap(que_);
152        if (needNotify) {
153            condFull_.notify_one();
154        }
155    }
156
157    std::mutex mutex_;
158    std::condition_variable condFull_;
159    std::condition_variable condEmpty_;
160    std::queue<T> que_;
161    std::string name_;
162    const size_t capacity_;
163    std::atomic<bool> isActive_;
164    const OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_FRAMEWORK, "BlockQueue"};
165};
166} // namespace MediaAVCodec
167} // namespace OHOS
168#endif // !UTILS_BLOCK_QUEUE_H
169