1da853ecaSopenharmony_ci/*
2da853ecaSopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd.
3da853ecaSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4da853ecaSopenharmony_ci * you may not use this file except in compliance with the License.
5da853ecaSopenharmony_ci * You may obtain a copy of the License at
6da853ecaSopenharmony_ci *
7da853ecaSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8da853ecaSopenharmony_ci *
9da853ecaSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10da853ecaSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11da853ecaSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12da853ecaSopenharmony_ci * See the License for the specific language governing permissions and
13da853ecaSopenharmony_ci * limitations under the License.
14da853ecaSopenharmony_ci */
15da853ecaSopenharmony_ci
16da853ecaSopenharmony_ci#ifndef UTILS_BLOCK_QUEUE_H
17da853ecaSopenharmony_ci#define UTILS_BLOCK_QUEUE_H
18da853ecaSopenharmony_ci#include <atomic>
19da853ecaSopenharmony_ci#include <mutex>
20da853ecaSopenharmony_ci#include <condition_variable>
21da853ecaSopenharmony_ci#include <queue>
22da853ecaSopenharmony_ci#include "avcodec_log.h"
23da853ecaSopenharmony_ci
24da853ecaSopenharmony_cinamespace OHOS {
25da853ecaSopenharmony_cinamespace MediaAVCodec {
26da853ecaSopenharmony_cinamespace {
27da853ecaSopenharmony_ciconstexpr size_t DEFAULT_QUEUE_SIZE = 10;
28da853ecaSopenharmony_ci}
29da853ecaSopenharmony_ci
30da853ecaSopenharmony_citemplate <typename T>
31da853ecaSopenharmony_ciclass BlockQueue {
32da853ecaSopenharmony_cipublic:
33da853ecaSopenharmony_ci    explicit BlockQueue(std::string name, size_t capacity = DEFAULT_QUEUE_SIZE)
34da853ecaSopenharmony_ci        : name_(std::move(name)), capacity_(capacity), isActive_(true)
35da853ecaSopenharmony_ci    {
36da853ecaSopenharmony_ci    }
37da853ecaSopenharmony_ci
38da853ecaSopenharmony_ci    ~BlockQueue() = default;
39da853ecaSopenharmony_ci
40da853ecaSopenharmony_ci    size_t Size()
41da853ecaSopenharmony_ci    {
42da853ecaSopenharmony_ci        std::lock_guard<std::mutex> lock(mutex_);
43da853ecaSopenharmony_ci        return que_.size();
44da853ecaSopenharmony_ci    }
45da853ecaSopenharmony_ci
46da853ecaSopenharmony_ci    size_t Capacity()
47da853ecaSopenharmony_ci    {
48da853ecaSopenharmony_ci        return capacity_;
49da853ecaSopenharmony_ci    }
50da853ecaSopenharmony_ci
51da853ecaSopenharmony_ci    bool Empty()
52da853ecaSopenharmony_ci    {
53da853ecaSopenharmony_ci        std::lock_guard<std::mutex> lock(mutex_);
54da853ecaSopenharmony_ci        return que_.empty();
55da853ecaSopenharmony_ci    }
56da853ecaSopenharmony_ci
57da853ecaSopenharmony_ci    bool Push(const T& block)
58da853ecaSopenharmony_ci    {
59da853ecaSopenharmony_ci        AVCODEC_LOGD("block queue %{public}s Push enter.", name_.c_str());
60da853ecaSopenharmony_ci        std::unique_lock<std::mutex> lock(mutex_);
61da853ecaSopenharmony_ci        if (!isActive_) {
62da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s is inactive for Push.", name_.c_str());
63da853ecaSopenharmony_ci            return false;
64da853ecaSopenharmony_ci        }
65da853ecaSopenharmony_ci        if (que_.size() >= capacity_) {
66da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s is full, please waiting for Pop.", name_.c_str());
67da853ecaSopenharmony_ci            condFull_.wait(lock, [this] { return !isActive_ || que_.size() < capacity_; });
68da853ecaSopenharmony_ci        }
69da853ecaSopenharmony_ci        if (!isActive_) {
70da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, isFull: %{public}d.",
71da853ecaSopenharmony_ci                name_.c_str(), isActive_.load(), que_.size() < capacity_);
72da853ecaSopenharmony_ci            return false;
73da853ecaSopenharmony_ci        }
74da853ecaSopenharmony_ci        que_.push(block);
75da853ecaSopenharmony_ci        condEmpty_.notify_one();
76da853ecaSopenharmony_ci        AVCODEC_LOGD("block queue %{public}s Push ok.", name_.c_str());
77da853ecaSopenharmony_ci        return true;
78da853ecaSopenharmony_ci    }
79da853ecaSopenharmony_ci
80da853ecaSopenharmony_ci    T Pop()
81da853ecaSopenharmony_ci    {
82da853ecaSopenharmony_ci        AVCODEC_LOGD("block queue %{public}s Pop enter.", name_.c_str());
83da853ecaSopenharmony_ci        std::unique_lock<std::mutex> lock(mutex_);
84da853ecaSopenharmony_ci        if (que_.empty() && !isActive_) {
85da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s is inactive for Pop.", name_.c_str());
86da853ecaSopenharmony_ci            return {};
87da853ecaSopenharmony_ci        } else if (que_.empty() && isActive_) {
88da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str());
89da853ecaSopenharmony_ci            condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); });
90da853ecaSopenharmony_ci        }
91da853ecaSopenharmony_ci        if (que_.empty()) {
92da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.",
93da853ecaSopenharmony_ci                name_.c_str(), isActive_.load(), que_.size());
94da853ecaSopenharmony_ci            return {};
95da853ecaSopenharmony_ci        }
96da853ecaSopenharmony_ci        T element = que_.front();
97da853ecaSopenharmony_ci        que_.pop();
98da853ecaSopenharmony_ci        condFull_.notify_one();
99da853ecaSopenharmony_ci        AVCODEC_LOGD("block queue %{public}s Pop ok.", name_.c_str());
100da853ecaSopenharmony_ci        return element;
101da853ecaSopenharmony_ci    }
102da853ecaSopenharmony_ci
103da853ecaSopenharmony_ci    T Front()
104da853ecaSopenharmony_ci    {
105da853ecaSopenharmony_ci        AVCODEC_LOGD("block queue %{public}s Front enter.", name_.c_str());
106da853ecaSopenharmony_ci        std::unique_lock<std::mutex> lock(mutex_);
107da853ecaSopenharmony_ci        if (que_.empty() && !isActive_) {
108da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s is inactive for Front.", name_.c_str());
109da853ecaSopenharmony_ci            return {};
110da853ecaSopenharmony_ci        } else if (que_.empty() && isActive_) {
111da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str());
112da853ecaSopenharmony_ci            condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); });
113da853ecaSopenharmony_ci        }
114da853ecaSopenharmony_ci        if (que_.empty()) {
115da853ecaSopenharmony_ci            AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.",
116da853ecaSopenharmony_ci                name_.c_str(), isActive_.load(), que_.size());
117da853ecaSopenharmony_ci            return {};
118da853ecaSopenharmony_ci        }
119da853ecaSopenharmony_ci        T element = que_.front();
120da853ecaSopenharmony_ci        condFull_.notify_one();
121da853ecaSopenharmony_ci        AVCODEC_LOGD("block queue %{public}s Front ok.", name_.c_str());
122da853ecaSopenharmony_ci        return element;
123da853ecaSopenharmony_ci    }
124da853ecaSopenharmony_ci
125da853ecaSopenharmony_ci    void Clear()
126da853ecaSopenharmony_ci    {
127da853ecaSopenharmony_ci        std::lock_guard<std::mutex> lock(mutex_);
128da853ecaSopenharmony_ci        ClearUnprotected();
129da853ecaSopenharmony_ci    }
130da853ecaSopenharmony_ci
131da853ecaSopenharmony_ci    void SetActive(bool active, bool cleanData = true)
132da853ecaSopenharmony_ci    {
133da853ecaSopenharmony_ci        std::lock_guard<std::mutex> lock(mutex_);
134da853ecaSopenharmony_ci        AVCODEC_LOGD("SetActive %{public}s: %{public}d.", name_.c_str(), isActive_.load());
135da853ecaSopenharmony_ci        isActive_ = active;
136da853ecaSopenharmony_ci        if (!active) {
137da853ecaSopenharmony_ci            if (cleanData) {
138da853ecaSopenharmony_ci                ClearUnprotected();
139da853ecaSopenharmony_ci            }
140da853ecaSopenharmony_ci            condEmpty_.notify_one();
141da853ecaSopenharmony_ci        }
142da853ecaSopenharmony_ci    }
143da853ecaSopenharmony_ci
144da853ecaSopenharmony_ciprivate:
145da853ecaSopenharmony_ci    void ClearUnprotected()
146da853ecaSopenharmony_ci    {
147da853ecaSopenharmony_ci        if (que_.empty()) {
148da853ecaSopenharmony_ci            return;
149da853ecaSopenharmony_ci        }
150da853ecaSopenharmony_ci        bool needNotify = que_.size() == capacity_;
151da853ecaSopenharmony_ci        std::queue<T>().swap(que_);
152da853ecaSopenharmony_ci        if (needNotify) {
153da853ecaSopenharmony_ci            condFull_.notify_one();
154da853ecaSopenharmony_ci        }
155da853ecaSopenharmony_ci    }
156da853ecaSopenharmony_ci
157da853ecaSopenharmony_ci    std::mutex mutex_;
158da853ecaSopenharmony_ci    std::condition_variable condFull_;
159da853ecaSopenharmony_ci    std::condition_variable condEmpty_;
160da853ecaSopenharmony_ci    std::queue<T> que_;
161da853ecaSopenharmony_ci    std::string name_;
162da853ecaSopenharmony_ci    const size_t capacity_;
163da853ecaSopenharmony_ci    std::atomic<bool> isActive_;
164da853ecaSopenharmony_ci    const OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_FRAMEWORK, "BlockQueue"};
165da853ecaSopenharmony_ci};
166da853ecaSopenharmony_ci} // namespace MediaAVCodec
167da853ecaSopenharmony_ci} // namespace OHOS
168da853ecaSopenharmony_ci#endif // !UTILS_BLOCK_QUEUE_H
169