1/* 2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#define MEDIA_PIPELINE 17#define HST_LOG_TAG "Pipeline" 18 19#include <queue> 20#include <stack> 21#include "pipeline/pipeline.h" 22#include "osal/task/autolock.h" 23#include "osal/task/jobutils.h" 24#include "common/log.h" 25#include "osal/utils/hitrace_utils.h" 26 27namespace { 28constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Pipeline" }; 29} 30 31namespace OHOS { 32namespace Media { 33namespace Pipeline { 34static std::atomic<uint16_t> pipeLineId = 0; 35 36int32_t Pipeline::GetNextPipelineId() 37{ 38 return pipeLineId++; 39} 40 41Pipeline::~Pipeline() 42{ 43} 44 45void Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback, 46 const std::string& groupId) 47{ 48 MEDIA_LOG_I("Pipeline::Init"); 49 eventReceiver_ = receiver; 50 filterCallback_ = callback; 51 groupId_ = groupId; 52} 53 54Status Pipeline::Prepare() 55{ 56 MEDIA_LOG_I("Prepare enter."); 57 Status ret = Status::OK; 58 SubmitJobOnce([&] { 59 AutoLock lock(mutex_); 60 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 61 ret = (*it)->Prepare(); 62 if (ret != Status::OK) { 63 return; 64 } 65 } 66 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 67 ret = (*it)->WaitAllState(FilterState::READY); 68 if (ret != Status::OK) { 69 return; 70 } 71 } 72 }); 73 MEDIA_LOG_I("Prepare done ret = %{public}d", ret); 74 return ret; 75} 76 77Status Pipeline::Start() 78{ 79 MEDIA_LOG_I("Start enter."); 80 Status ret = Status::OK; 81 SubmitJobOnce([&] { 82 AutoLock lock(mutex_); 83 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 84 ret = (*it)->Start(); 85 if (ret != Status::OK) { 86 return; 87 } 88 } 89 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 90 ret = (*it)->WaitAllState(FilterState::RUNNING); 91 if (ret != Status::OK) { 92 return; 93 } 94 } 95 }); 96 MEDIA_LOG_I("Start done ret = %{public}d", ret); 97 return ret; 98} 99 100Status Pipeline::Pause() 101{ 102 MEDIA_LOG_I("Pause enter."); 103 Status ret = Status::OK; 104 SubmitJobOnce([&] { 105 AutoLock lock(mutex_); 106 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 107 auto rtv = (*it)->Pause(); 108 if (rtv != Status::OK) { 109 ret = rtv; 110 } 111 } 112 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 113 auto rtv = (*it)->WaitAllState(FilterState::PAUSED); 114 if (rtv != Status::OK) { 115 ret = rtv; 116 } 117 } 118 }); 119 MEDIA_LOG_I("Pause done ret = %{public}d", ret); 120 return ret; 121} 122 123Status Pipeline::Resume() 124{ 125 MEDIA_LOG_I("Resume enter."); 126 Status ret = Status::OK; 127 SubmitJobOnce([&] { 128 AutoLock lock(mutex_); 129 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 130 ret = (*it)->Resume(); 131 if (ret != Status::OK) { 132 return; 133 } 134 } 135 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 136 ret = (*it)->WaitAllState(FilterState::RUNNING); 137 if (ret != Status::OK) { 138 return; 139 } 140 } 141 }); 142 MEDIA_LOG_I("Resume done ret = %{public}d", ret); 143 return ret; 144} 145 146Status Pipeline::Stop() 147{ 148 MEDIA_LOG_I("Stop enter."); 149 Status ret = Status::OK; 150 SubmitJobOnce([&] { 151 AutoLock lock(mutex_); 152 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 153 if (*it == nullptr) { 154 MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size()); 155 continue; 156 } 157 auto rtv = (*it)->Stop(); 158 if (rtv != Status::OK) { 159 ret = rtv; 160 } 161 } 162 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 163 auto rtv = (*it)->WaitAllState(FilterState::STOPPED); 164 if (rtv != Status::OK) { 165 ret = rtv; 166 } 167 } 168 filters_.clear(); 169 }); 170 MEDIA_LOG_I("Stop done ret = %{public}d", ret); 171 return ret; 172} 173 174Status Pipeline::Flush() 175{ 176 MEDIA_LOG_I("Flush enter."); 177 SubmitJobOnce([&] { 178 AutoLock lock(mutex_); 179 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 180 (*it)->Flush(); 181 } 182 }); 183 MEDIA_LOG_I("Flush end."); 184 return Status::OK; 185} 186 187Status Pipeline::Release() 188{ 189 MEDIA_LOG_I("Release enter."); 190 SubmitJobOnce([&] { 191 AutoLock lock(mutex_); 192 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 193 (*it)->Release(); 194 } 195 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 196 (*it)->WaitAllState(FilterState::RELEASED); 197 } 198 filters_.clear(); 199 }); 200 MEDIA_LOG_I("Release done."); 201 return Status::OK; 202} 203 204Status Pipeline::Preroll(bool render) 205{ 206 MEDIA_LOG_I("Preroll enter."); 207 Status ret = Status::OK; 208 AutoLock lock(mutex_); 209 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 210 auto rtv = (*it)->Preroll(); 211 if (rtv != Status::OK) { 212 ret = rtv; 213 MEDIA_LOG_I("Preroll done ret = %{public}d", ret); 214 return ret; 215 } 216 } 217 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 218 auto rtv = (*it)->WaitPrerollDone(render); 219 if (rtv != Status::OK) { 220 ret = rtv; 221 MEDIA_LOG_I("Preroll done ret = %{public}d", ret); 222 return ret; 223 } 224 } 225 MEDIA_LOG_I("Preroll done ret = %{public}d", ret); 226 return ret; 227} 228 229Status Pipeline::SetPlayRange(int64_t start, int64_t end) 230{ 231 MEDIA_LOG_I("SetPlayRange enter."); 232 SubmitJobOnce([&] { 233 AutoLock lock(mutex_); 234 for (auto it = filters_.begin(); it != filters_.end(); ++it) { 235 (*it)->SetPlayRange(start, end); 236 } 237 }); 238 MEDIA_LOG_I("SetPlayRange done."); 239 return Status::OK; 240} 241 242Status Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn) 243{ 244 MEDIA_LOG_I("AddHeadFilters enter."); 245 std::vector<std::shared_ptr<Filter>> filtersToAdd; 246 for (auto& filterIn : filtersIn) { 247 bool matched = false; 248 for (const auto& filter : filters_) { 249 if (filterIn == filter) { 250 matched = true; 251 break; 252 } 253 } 254 if (!matched) { 255 filtersToAdd.push_back(filterIn); 256 filterIn->LinkPipeLine(groupId_); 257 } 258 } 259 if (filtersToAdd.empty()) { 260 MEDIA_LOG_I("filter already exists"); 261 return Status::OK; 262 } 263 SubmitJobOnce([&] { 264 AutoLock lock(mutex_); 265 this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end()); 266 }); 267 MEDIA_LOG_I("AddHeadFilters done."); 268 return Status::OK; 269} 270 271Status Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter) 272{ 273 SubmitJobOnce([&] { 274 AutoLock lock(mutex_); 275 auto it = std::find_if(filters_.begin(), filters_.end(), 276 [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; }); 277 if (it != filters_.end()) { 278 filters_.erase(it); 279 } 280 filter->Release(); 281 filter->WaitAllState(FilterState::RELEASED); 282 filter->ClearAllNextFilters(); 283 return Status::OK; 284 }); 285 return Status::OK; 286} 287 288Status Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter, 289 const std::vector<std::shared_ptr<Filter>> &nextFilters, 290 StreamType type) 291{ 292 for (auto nextFilter : nextFilters) { 293 auto ret = preFilter->LinkNext(nextFilter, type); 294 nextFilter->LinkPipeLine(groupId_); 295 FALSE_RETURN_V(ret == Status::OK, ret); 296 } 297 return Status::OK; 298} 299 300Status Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter, 301 const std::vector<std::shared_ptr<Filter>> &nextFilters, 302 StreamType type) 303{ 304 for (auto nextFilter : nextFilters) { 305 preFilter->UpdateNext(nextFilter, type); 306 } 307 return Status::OK; 308} 309 310Status Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter, 311 const std::vector<std::shared_ptr<Filter>> &nextFilters, 312 StreamType type) 313{ 314 for (auto nextFilter : nextFilters) { 315 preFilter->UnLinkNext(nextFilter, type); 316 } 317 return Status::OK; 318} 319 320void Pipeline::OnEvent(const Event& event) 321{ 322} 323 324} // namespace Pipeline 325} // namespace Media 326} // namespace OHOS 327