1da853ecaSopenharmony_ci/* 2da853ecaSopenharmony_ci * Copyright (C) 2023 Huawei Device Co., Ltd. 3da853ecaSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4da853ecaSopenharmony_ci * you may not use this file except in compliance with the License. 5da853ecaSopenharmony_ci * You may obtain a copy of the License at 6da853ecaSopenharmony_ci * 7da853ecaSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8da853ecaSopenharmony_ci * 9da853ecaSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10da853ecaSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11da853ecaSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12da853ecaSopenharmony_ci * See the License for the specific language governing permissions and 13da853ecaSopenharmony_ci * limitations under the License. 14da853ecaSopenharmony_ci */ 15da853ecaSopenharmony_ci 16da853ecaSopenharmony_ci#ifndef UTILS_BLOCK_QUEUE_H 17da853ecaSopenharmony_ci#define UTILS_BLOCK_QUEUE_H 18da853ecaSopenharmony_ci#include <atomic> 19da853ecaSopenharmony_ci#include <mutex> 20da853ecaSopenharmony_ci#include <condition_variable> 21da853ecaSopenharmony_ci#include <queue> 22da853ecaSopenharmony_ci#include "avcodec_log.h" 23da853ecaSopenharmony_ci 24da853ecaSopenharmony_cinamespace OHOS { 25da853ecaSopenharmony_cinamespace MediaAVCodec { 26da853ecaSopenharmony_cinamespace { 27da853ecaSopenharmony_ciconstexpr size_t DEFAULT_QUEUE_SIZE = 10; 28da853ecaSopenharmony_ci} 29da853ecaSopenharmony_ci 30da853ecaSopenharmony_citemplate <typename T> 31da853ecaSopenharmony_ciclass BlockQueue { 32da853ecaSopenharmony_cipublic: 33da853ecaSopenharmony_ci explicit BlockQueue(std::string name, size_t capacity = DEFAULT_QUEUE_SIZE) 34da853ecaSopenharmony_ci : name_(std::move(name)), capacity_(capacity), isActive_(true) 35da853ecaSopenharmony_ci { 36da853ecaSopenharmony_ci } 37da853ecaSopenharmony_ci 38da853ecaSopenharmony_ci ~BlockQueue() = default; 39da853ecaSopenharmony_ci 40da853ecaSopenharmony_ci size_t Size() 41da853ecaSopenharmony_ci { 42da853ecaSopenharmony_ci std::lock_guard<std::mutex> lock(mutex_); 43da853ecaSopenharmony_ci return que_.size(); 44da853ecaSopenharmony_ci } 45da853ecaSopenharmony_ci 46da853ecaSopenharmony_ci size_t Capacity() 47da853ecaSopenharmony_ci { 48da853ecaSopenharmony_ci return capacity_; 49da853ecaSopenharmony_ci } 50da853ecaSopenharmony_ci 51da853ecaSopenharmony_ci bool Empty() 52da853ecaSopenharmony_ci { 53da853ecaSopenharmony_ci std::lock_guard<std::mutex> lock(mutex_); 54da853ecaSopenharmony_ci return que_.empty(); 55da853ecaSopenharmony_ci } 56da853ecaSopenharmony_ci 57da853ecaSopenharmony_ci bool Push(const T& block) 58da853ecaSopenharmony_ci { 59da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s Push enter.", name_.c_str()); 60da853ecaSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 61da853ecaSopenharmony_ci if (!isActive_) { 62da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s is inactive for Push.", name_.c_str()); 63da853ecaSopenharmony_ci return false; 64da853ecaSopenharmony_ci } 65da853ecaSopenharmony_ci if (que_.size() >= capacity_) { 66da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s is full, please waiting for Pop.", name_.c_str()); 67da853ecaSopenharmony_ci condFull_.wait(lock, [this] { return !isActive_ || que_.size() < capacity_; }); 68da853ecaSopenharmony_ci } 69da853ecaSopenharmony_ci if (!isActive_) { 70da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, isFull: %{public}d.", 71da853ecaSopenharmony_ci name_.c_str(), isActive_.load(), que_.size() < capacity_); 72da853ecaSopenharmony_ci return false; 73da853ecaSopenharmony_ci } 74da853ecaSopenharmony_ci que_.push(block); 75da853ecaSopenharmony_ci condEmpty_.notify_one(); 76da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s Push ok.", name_.c_str()); 77da853ecaSopenharmony_ci return true; 78da853ecaSopenharmony_ci } 79da853ecaSopenharmony_ci 80da853ecaSopenharmony_ci T Pop() 81da853ecaSopenharmony_ci { 82da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s Pop enter.", name_.c_str()); 83da853ecaSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 84da853ecaSopenharmony_ci if (que_.empty() && !isActive_) { 85da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s is inactive for Pop.", name_.c_str()); 86da853ecaSopenharmony_ci return {}; 87da853ecaSopenharmony_ci } else if (que_.empty() && isActive_) { 88da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str()); 89da853ecaSopenharmony_ci condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); }); 90da853ecaSopenharmony_ci } 91da853ecaSopenharmony_ci if (que_.empty()) { 92da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.", 93da853ecaSopenharmony_ci name_.c_str(), isActive_.load(), que_.size()); 94da853ecaSopenharmony_ci return {}; 95da853ecaSopenharmony_ci } 96da853ecaSopenharmony_ci T element = que_.front(); 97da853ecaSopenharmony_ci que_.pop(); 98da853ecaSopenharmony_ci condFull_.notify_one(); 99da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s Pop ok.", name_.c_str()); 100da853ecaSopenharmony_ci return element; 101da853ecaSopenharmony_ci } 102da853ecaSopenharmony_ci 103da853ecaSopenharmony_ci T Front() 104da853ecaSopenharmony_ci { 105da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s Front enter.", name_.c_str()); 106da853ecaSopenharmony_ci std::unique_lock<std::mutex> lock(mutex_); 107da853ecaSopenharmony_ci if (que_.empty() && !isActive_) { 108da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s is inactive for Front.", name_.c_str()); 109da853ecaSopenharmony_ci return {}; 110da853ecaSopenharmony_ci } else if (que_.empty() && isActive_) { 111da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s is empty, please waiting for Push.", name_.c_str()); 112da853ecaSopenharmony_ci condEmpty_.wait(lock, [this] { return !isActive_ || !que_.empty(); }); 113da853ecaSopenharmony_ci } 114da853ecaSopenharmony_ci if (que_.empty()) { 115da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s: inactive: %{public}d, size: %{public}zu.", 116da853ecaSopenharmony_ci name_.c_str(), isActive_.load(), que_.size()); 117da853ecaSopenharmony_ci return {}; 118da853ecaSopenharmony_ci } 119da853ecaSopenharmony_ci T element = que_.front(); 120da853ecaSopenharmony_ci condFull_.notify_one(); 121da853ecaSopenharmony_ci AVCODEC_LOGD("block queue %{public}s Front ok.", name_.c_str()); 122da853ecaSopenharmony_ci return element; 123da853ecaSopenharmony_ci } 124da853ecaSopenharmony_ci 125da853ecaSopenharmony_ci void Clear() 126da853ecaSopenharmony_ci { 127da853ecaSopenharmony_ci std::lock_guard<std::mutex> lock(mutex_); 128da853ecaSopenharmony_ci ClearUnprotected(); 129da853ecaSopenharmony_ci } 130da853ecaSopenharmony_ci 131da853ecaSopenharmony_ci void SetActive(bool active, bool cleanData = true) 132da853ecaSopenharmony_ci { 133da853ecaSopenharmony_ci std::lock_guard<std::mutex> lock(mutex_); 134da853ecaSopenharmony_ci AVCODEC_LOGD("SetActive %{public}s: %{public}d.", name_.c_str(), isActive_.load()); 135da853ecaSopenharmony_ci isActive_ = active; 136da853ecaSopenharmony_ci if (!active) { 137da853ecaSopenharmony_ci if (cleanData) { 138da853ecaSopenharmony_ci ClearUnprotected(); 139da853ecaSopenharmony_ci } 140da853ecaSopenharmony_ci condEmpty_.notify_one(); 141da853ecaSopenharmony_ci } 142da853ecaSopenharmony_ci } 143da853ecaSopenharmony_ci 144da853ecaSopenharmony_ciprivate: 145da853ecaSopenharmony_ci void ClearUnprotected() 146da853ecaSopenharmony_ci { 147da853ecaSopenharmony_ci if (que_.empty()) { 148da853ecaSopenharmony_ci return; 149da853ecaSopenharmony_ci } 150da853ecaSopenharmony_ci bool needNotify = que_.size() == capacity_; 151da853ecaSopenharmony_ci std::queue<T>().swap(que_); 152da853ecaSopenharmony_ci if (needNotify) { 153da853ecaSopenharmony_ci condFull_.notify_one(); 154da853ecaSopenharmony_ci } 155da853ecaSopenharmony_ci } 156da853ecaSopenharmony_ci 157da853ecaSopenharmony_ci std::mutex mutex_; 158da853ecaSopenharmony_ci std::condition_variable condFull_; 159da853ecaSopenharmony_ci std::condition_variable condEmpty_; 160da853ecaSopenharmony_ci std::queue<T> que_; 161da853ecaSopenharmony_ci std::string name_; 162da853ecaSopenharmony_ci const size_t capacity_; 163da853ecaSopenharmony_ci std::atomic<bool> isActive_; 164da853ecaSopenharmony_ci const OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_FRAMEWORK, "BlockQueue"}; 165da853ecaSopenharmony_ci}; 166da853ecaSopenharmony_ci} // namespace MediaAVCodec 167da853ecaSopenharmony_ci} // namespace OHOS 168da853ecaSopenharmony_ci#endif // !UTILS_BLOCK_QUEUE_H 169