1fa7767c5Sopenharmony_ci/* 2fa7767c5Sopenharmony_ci * Copyright (c) 2023-2023 Huawei Device Co., Ltd. 3fa7767c5Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4fa7767c5Sopenharmony_ci * you may not use this file except in compliance with the License. 5fa7767c5Sopenharmony_ci * You may obtain a copy of the License at 6fa7767c5Sopenharmony_ci * 7fa7767c5Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8fa7767c5Sopenharmony_ci * 9fa7767c5Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10fa7767c5Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11fa7767c5Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12fa7767c5Sopenharmony_ci * See the License for the specific language governing permissions and 13fa7767c5Sopenharmony_ci * limitations under the License. 14fa7767c5Sopenharmony_ci */ 15fa7767c5Sopenharmony_ci 16fa7767c5Sopenharmony_ci#define MEDIA_PIPELINE 17fa7767c5Sopenharmony_ci#define HST_LOG_TAG "Pipeline" 18fa7767c5Sopenharmony_ci 19fa7767c5Sopenharmony_ci#include <queue> 20fa7767c5Sopenharmony_ci#include <stack> 21fa7767c5Sopenharmony_ci#include "pipeline/pipeline.h" 22fa7767c5Sopenharmony_ci#include "osal/task/autolock.h" 23fa7767c5Sopenharmony_ci#include "osal/task/jobutils.h" 24fa7767c5Sopenharmony_ci#include "common/log.h" 25fa7767c5Sopenharmony_ci#include "osal/utils/hitrace_utils.h" 26fa7767c5Sopenharmony_ci 27fa7767c5Sopenharmony_cinamespace { 28fa7767c5Sopenharmony_ciconstexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Pipeline" }; 29fa7767c5Sopenharmony_ci} 30fa7767c5Sopenharmony_ci 31fa7767c5Sopenharmony_cinamespace OHOS { 32fa7767c5Sopenharmony_cinamespace Media { 33fa7767c5Sopenharmony_cinamespace Pipeline { 34fa7767c5Sopenharmony_cistatic std::atomic<uint16_t> pipeLineId = 0; 35fa7767c5Sopenharmony_ci 36fa7767c5Sopenharmony_ciint32_t Pipeline::GetNextPipelineId() 37fa7767c5Sopenharmony_ci{ 38fa7767c5Sopenharmony_ci return pipeLineId++; 39fa7767c5Sopenharmony_ci} 40fa7767c5Sopenharmony_ci 41fa7767c5Sopenharmony_ciPipeline::~Pipeline() 42fa7767c5Sopenharmony_ci{ 43fa7767c5Sopenharmony_ci} 44fa7767c5Sopenharmony_ci 45fa7767c5Sopenharmony_civoid Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback, 46fa7767c5Sopenharmony_ci const std::string& groupId) 47fa7767c5Sopenharmony_ci{ 48fa7767c5Sopenharmony_ci MEDIA_LOG_I("Pipeline::Init"); 49fa7767c5Sopenharmony_ci eventReceiver_ = receiver; 50fa7767c5Sopenharmony_ci filterCallback_ = callback; 51fa7767c5Sopenharmony_ci groupId_ = groupId; 52fa7767c5Sopenharmony_ci} 53fa7767c5Sopenharmony_ci 54fa7767c5Sopenharmony_ciStatus Pipeline::Prepare() 55fa7767c5Sopenharmony_ci{ 56fa7767c5Sopenharmony_ci MEDIA_LOG_I("Prepare enter."); 57fa7767c5Sopenharmony_ci Status ret = Status::OK; 58fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 59fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 60fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 61fa7767c5Sopenharmony_ci ret = (*it)->Prepare(); 62fa7767c5Sopenharmony_ci if (ret != Status::OK) { 63fa7767c5Sopenharmony_ci return; 64fa7767c5Sopenharmony_ci } 65fa7767c5Sopenharmony_ci } 66fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 67fa7767c5Sopenharmony_ci ret = (*it)->WaitAllState(FilterState::READY); 68fa7767c5Sopenharmony_ci if (ret != Status::OK) { 69fa7767c5Sopenharmony_ci return; 70fa7767c5Sopenharmony_ci } 71fa7767c5Sopenharmony_ci } 72fa7767c5Sopenharmony_ci }); 73fa7767c5Sopenharmony_ci MEDIA_LOG_I("Prepare done ret = %{public}d", ret); 74fa7767c5Sopenharmony_ci return ret; 75fa7767c5Sopenharmony_ci} 76fa7767c5Sopenharmony_ci 77fa7767c5Sopenharmony_ciStatus Pipeline::Start() 78fa7767c5Sopenharmony_ci{ 79fa7767c5Sopenharmony_ci MEDIA_LOG_I("Start enter."); 80fa7767c5Sopenharmony_ci Status ret = Status::OK; 81fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 82fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 83fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 84fa7767c5Sopenharmony_ci ret = (*it)->Start(); 85fa7767c5Sopenharmony_ci if (ret != Status::OK) { 86fa7767c5Sopenharmony_ci return; 87fa7767c5Sopenharmony_ci } 88fa7767c5Sopenharmony_ci } 89fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 90fa7767c5Sopenharmony_ci ret = (*it)->WaitAllState(FilterState::RUNNING); 91fa7767c5Sopenharmony_ci if (ret != Status::OK) { 92fa7767c5Sopenharmony_ci return; 93fa7767c5Sopenharmony_ci } 94fa7767c5Sopenharmony_ci } 95fa7767c5Sopenharmony_ci }); 96fa7767c5Sopenharmony_ci MEDIA_LOG_I("Start done ret = %{public}d", ret); 97fa7767c5Sopenharmony_ci return ret; 98fa7767c5Sopenharmony_ci} 99fa7767c5Sopenharmony_ci 100fa7767c5Sopenharmony_ciStatus Pipeline::Pause() 101fa7767c5Sopenharmony_ci{ 102fa7767c5Sopenharmony_ci MEDIA_LOG_I("Pause enter."); 103fa7767c5Sopenharmony_ci Status ret = Status::OK; 104fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 105fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 106fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 107fa7767c5Sopenharmony_ci auto rtv = (*it)->Pause(); 108fa7767c5Sopenharmony_ci if (rtv != Status::OK) { 109fa7767c5Sopenharmony_ci ret = rtv; 110fa7767c5Sopenharmony_ci } 111fa7767c5Sopenharmony_ci } 112fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 113fa7767c5Sopenharmony_ci auto rtv = (*it)->WaitAllState(FilterState::PAUSED); 114fa7767c5Sopenharmony_ci if (rtv != Status::OK) { 115fa7767c5Sopenharmony_ci ret = rtv; 116fa7767c5Sopenharmony_ci } 117fa7767c5Sopenharmony_ci } 118fa7767c5Sopenharmony_ci }); 119fa7767c5Sopenharmony_ci MEDIA_LOG_I("Pause done ret = %{public}d", ret); 120fa7767c5Sopenharmony_ci return ret; 121fa7767c5Sopenharmony_ci} 122fa7767c5Sopenharmony_ci 123fa7767c5Sopenharmony_ciStatus Pipeline::Resume() 124fa7767c5Sopenharmony_ci{ 125fa7767c5Sopenharmony_ci MEDIA_LOG_I("Resume enter."); 126fa7767c5Sopenharmony_ci Status ret = Status::OK; 127fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 128fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 129fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 130fa7767c5Sopenharmony_ci ret = (*it)->Resume(); 131fa7767c5Sopenharmony_ci if (ret != Status::OK) { 132fa7767c5Sopenharmony_ci return; 133fa7767c5Sopenharmony_ci } 134fa7767c5Sopenharmony_ci } 135fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 136fa7767c5Sopenharmony_ci ret = (*it)->WaitAllState(FilterState::RUNNING); 137fa7767c5Sopenharmony_ci if (ret != Status::OK) { 138fa7767c5Sopenharmony_ci return; 139fa7767c5Sopenharmony_ci } 140fa7767c5Sopenharmony_ci } 141fa7767c5Sopenharmony_ci }); 142fa7767c5Sopenharmony_ci MEDIA_LOG_I("Resume done ret = %{public}d", ret); 143fa7767c5Sopenharmony_ci return ret; 144fa7767c5Sopenharmony_ci} 145fa7767c5Sopenharmony_ci 146fa7767c5Sopenharmony_ciStatus Pipeline::Stop() 147fa7767c5Sopenharmony_ci{ 148fa7767c5Sopenharmony_ci MEDIA_LOG_I("Stop enter."); 149fa7767c5Sopenharmony_ci Status ret = Status::OK; 150fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 151fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 152fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 153fa7767c5Sopenharmony_ci if (*it == nullptr) { 154fa7767c5Sopenharmony_ci MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size()); 155fa7767c5Sopenharmony_ci continue; 156fa7767c5Sopenharmony_ci } 157fa7767c5Sopenharmony_ci auto rtv = (*it)->Stop(); 158fa7767c5Sopenharmony_ci if (rtv != Status::OK) { 159fa7767c5Sopenharmony_ci ret = rtv; 160fa7767c5Sopenharmony_ci } 161fa7767c5Sopenharmony_ci } 162fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 163fa7767c5Sopenharmony_ci auto rtv = (*it)->WaitAllState(FilterState::STOPPED); 164fa7767c5Sopenharmony_ci if (rtv != Status::OK) { 165fa7767c5Sopenharmony_ci ret = rtv; 166fa7767c5Sopenharmony_ci } 167fa7767c5Sopenharmony_ci } 168fa7767c5Sopenharmony_ci filters_.clear(); 169fa7767c5Sopenharmony_ci }); 170fa7767c5Sopenharmony_ci MEDIA_LOG_I("Stop done ret = %{public}d", ret); 171fa7767c5Sopenharmony_ci return ret; 172fa7767c5Sopenharmony_ci} 173fa7767c5Sopenharmony_ci 174fa7767c5Sopenharmony_ciStatus Pipeline::Flush() 175fa7767c5Sopenharmony_ci{ 176fa7767c5Sopenharmony_ci MEDIA_LOG_I("Flush enter."); 177fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 178fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 179fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 180fa7767c5Sopenharmony_ci (*it)->Flush(); 181fa7767c5Sopenharmony_ci } 182fa7767c5Sopenharmony_ci }); 183fa7767c5Sopenharmony_ci MEDIA_LOG_I("Flush end."); 184fa7767c5Sopenharmony_ci return Status::OK; 185fa7767c5Sopenharmony_ci} 186fa7767c5Sopenharmony_ci 187fa7767c5Sopenharmony_ciStatus Pipeline::Release() 188fa7767c5Sopenharmony_ci{ 189fa7767c5Sopenharmony_ci MEDIA_LOG_I("Release enter."); 190fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 191fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 192fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 193fa7767c5Sopenharmony_ci (*it)->Release(); 194fa7767c5Sopenharmony_ci } 195fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 196fa7767c5Sopenharmony_ci (*it)->WaitAllState(FilterState::RELEASED); 197fa7767c5Sopenharmony_ci } 198fa7767c5Sopenharmony_ci filters_.clear(); 199fa7767c5Sopenharmony_ci }); 200fa7767c5Sopenharmony_ci MEDIA_LOG_I("Release done."); 201fa7767c5Sopenharmony_ci return Status::OK; 202fa7767c5Sopenharmony_ci} 203fa7767c5Sopenharmony_ci 204fa7767c5Sopenharmony_ciStatus Pipeline::Preroll(bool render) 205fa7767c5Sopenharmony_ci{ 206fa7767c5Sopenharmony_ci MEDIA_LOG_I("Preroll enter."); 207fa7767c5Sopenharmony_ci Status ret = Status::OK; 208fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 209fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 210fa7767c5Sopenharmony_ci auto rtv = (*it)->Preroll(); 211fa7767c5Sopenharmony_ci if (rtv != Status::OK) { 212fa7767c5Sopenharmony_ci ret = rtv; 213fa7767c5Sopenharmony_ci MEDIA_LOG_I("Preroll done ret = %{public}d", ret); 214fa7767c5Sopenharmony_ci return ret; 215fa7767c5Sopenharmony_ci } 216fa7767c5Sopenharmony_ci } 217fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 218fa7767c5Sopenharmony_ci auto rtv = (*it)->WaitPrerollDone(render); 219fa7767c5Sopenharmony_ci if (rtv != Status::OK) { 220fa7767c5Sopenharmony_ci ret = rtv; 221fa7767c5Sopenharmony_ci MEDIA_LOG_I("Preroll done ret = %{public}d", ret); 222fa7767c5Sopenharmony_ci return ret; 223fa7767c5Sopenharmony_ci } 224fa7767c5Sopenharmony_ci } 225fa7767c5Sopenharmony_ci MEDIA_LOG_I("Preroll done ret = %{public}d", ret); 226fa7767c5Sopenharmony_ci return ret; 227fa7767c5Sopenharmony_ci} 228fa7767c5Sopenharmony_ci 229fa7767c5Sopenharmony_ciStatus Pipeline::SetPlayRange(int64_t start, int64_t end) 230fa7767c5Sopenharmony_ci{ 231fa7767c5Sopenharmony_ci MEDIA_LOG_I("SetPlayRange enter."); 232fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 233fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 234fa7767c5Sopenharmony_ci for (auto it = filters_.begin(); it != filters_.end(); ++it) { 235fa7767c5Sopenharmony_ci (*it)->SetPlayRange(start, end); 236fa7767c5Sopenharmony_ci } 237fa7767c5Sopenharmony_ci }); 238fa7767c5Sopenharmony_ci MEDIA_LOG_I("SetPlayRange done."); 239fa7767c5Sopenharmony_ci return Status::OK; 240fa7767c5Sopenharmony_ci} 241fa7767c5Sopenharmony_ci 242fa7767c5Sopenharmony_ciStatus Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn) 243fa7767c5Sopenharmony_ci{ 244fa7767c5Sopenharmony_ci MEDIA_LOG_I("AddHeadFilters enter."); 245fa7767c5Sopenharmony_ci std::vector<std::shared_ptr<Filter>> filtersToAdd; 246fa7767c5Sopenharmony_ci for (auto& filterIn : filtersIn) { 247fa7767c5Sopenharmony_ci bool matched = false; 248fa7767c5Sopenharmony_ci for (const auto& filter : filters_) { 249fa7767c5Sopenharmony_ci if (filterIn == filter) { 250fa7767c5Sopenharmony_ci matched = true; 251fa7767c5Sopenharmony_ci break; 252fa7767c5Sopenharmony_ci } 253fa7767c5Sopenharmony_ci } 254fa7767c5Sopenharmony_ci if (!matched) { 255fa7767c5Sopenharmony_ci filtersToAdd.push_back(filterIn); 256fa7767c5Sopenharmony_ci filterIn->LinkPipeLine(groupId_); 257fa7767c5Sopenharmony_ci } 258fa7767c5Sopenharmony_ci } 259fa7767c5Sopenharmony_ci if (filtersToAdd.empty()) { 260fa7767c5Sopenharmony_ci MEDIA_LOG_I("filter already exists"); 261fa7767c5Sopenharmony_ci return Status::OK; 262fa7767c5Sopenharmony_ci } 263fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 264fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 265fa7767c5Sopenharmony_ci this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end()); 266fa7767c5Sopenharmony_ci }); 267fa7767c5Sopenharmony_ci MEDIA_LOG_I("AddHeadFilters done."); 268fa7767c5Sopenharmony_ci return Status::OK; 269fa7767c5Sopenharmony_ci} 270fa7767c5Sopenharmony_ci 271fa7767c5Sopenharmony_ciStatus Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter) 272fa7767c5Sopenharmony_ci{ 273fa7767c5Sopenharmony_ci SubmitJobOnce([&] { 274fa7767c5Sopenharmony_ci AutoLock lock(mutex_); 275fa7767c5Sopenharmony_ci auto it = std::find_if(filters_.begin(), filters_.end(), 276fa7767c5Sopenharmony_ci [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; }); 277fa7767c5Sopenharmony_ci if (it != filters_.end()) { 278fa7767c5Sopenharmony_ci filters_.erase(it); 279fa7767c5Sopenharmony_ci } 280fa7767c5Sopenharmony_ci filter->Release(); 281fa7767c5Sopenharmony_ci filter->WaitAllState(FilterState::RELEASED); 282fa7767c5Sopenharmony_ci filter->ClearAllNextFilters(); 283fa7767c5Sopenharmony_ci return Status::OK; 284fa7767c5Sopenharmony_ci }); 285fa7767c5Sopenharmony_ci return Status::OK; 286fa7767c5Sopenharmony_ci} 287fa7767c5Sopenharmony_ci 288fa7767c5Sopenharmony_ciStatus Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter, 289fa7767c5Sopenharmony_ci const std::vector<std::shared_ptr<Filter>> &nextFilters, 290fa7767c5Sopenharmony_ci StreamType type) 291fa7767c5Sopenharmony_ci{ 292fa7767c5Sopenharmony_ci for (auto nextFilter : nextFilters) { 293fa7767c5Sopenharmony_ci auto ret = preFilter->LinkNext(nextFilter, type); 294fa7767c5Sopenharmony_ci nextFilter->LinkPipeLine(groupId_); 295fa7767c5Sopenharmony_ci FALSE_RETURN_V(ret == Status::OK, ret); 296fa7767c5Sopenharmony_ci } 297fa7767c5Sopenharmony_ci return Status::OK; 298fa7767c5Sopenharmony_ci} 299fa7767c5Sopenharmony_ci 300fa7767c5Sopenharmony_ciStatus Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter, 301fa7767c5Sopenharmony_ci const std::vector<std::shared_ptr<Filter>> &nextFilters, 302fa7767c5Sopenharmony_ci StreamType type) 303fa7767c5Sopenharmony_ci{ 304fa7767c5Sopenharmony_ci for (auto nextFilter : nextFilters) { 305fa7767c5Sopenharmony_ci preFilter->UpdateNext(nextFilter, type); 306fa7767c5Sopenharmony_ci } 307fa7767c5Sopenharmony_ci return Status::OK; 308fa7767c5Sopenharmony_ci} 309fa7767c5Sopenharmony_ci 310fa7767c5Sopenharmony_ciStatus Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter, 311fa7767c5Sopenharmony_ci const std::vector<std::shared_ptr<Filter>> &nextFilters, 312fa7767c5Sopenharmony_ci StreamType type) 313fa7767c5Sopenharmony_ci{ 314fa7767c5Sopenharmony_ci for (auto nextFilter : nextFilters) { 315fa7767c5Sopenharmony_ci preFilter->UnLinkNext(nextFilter, type); 316fa7767c5Sopenharmony_ci } 317fa7767c5Sopenharmony_ci return Status::OK; 318fa7767c5Sopenharmony_ci} 319fa7767c5Sopenharmony_ci 320fa7767c5Sopenharmony_civoid Pipeline::OnEvent(const Event& event) 321fa7767c5Sopenharmony_ci{ 322fa7767c5Sopenharmony_ci} 323fa7767c5Sopenharmony_ci 324fa7767c5Sopenharmony_ci} // namespace Pipeline 325fa7767c5Sopenharmony_ci} // namespace Media 326fa7767c5Sopenharmony_ci} // namespace OHOS 327