1/*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#define MEDIA_PIPELINE
17#define HST_LOG_TAG "Pipeline"
18
#include <algorithm>
#include <queue>
#include <stack>
#include "pipeline/pipeline.h"
#include "osal/task/autolock.h"
#include "osal/task/jobutils.h"
#include "common/log.h"
#include "osal/utils/hitrace_utils.h"
26
27namespace {
28constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Pipeline" };
29}
30
31namespace OHOS {
32namespace Media {
33namespace Pipeline {
34static std::atomic<uint16_t> pipeLineId = 0;
35
36int32_t Pipeline::GetNextPipelineId()
37{
38    return pipeLineId++;
39}
40
41Pipeline::~Pipeline()
42{
43}
44
45void Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback,
46    const std::string& groupId)
47{
48    MEDIA_LOG_I("Pipeline::Init");
49    eventReceiver_ = receiver;
50    filterCallback_ = callback;
51    groupId_ = groupId;
52}
53
54Status Pipeline::Prepare()
55{
56    MEDIA_LOG_I("Prepare enter.");
57    Status ret = Status::OK;
58    SubmitJobOnce([&] {
59        AutoLock lock(mutex_);
60        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
61            ret = (*it)->Prepare();
62            if (ret != Status::OK) {
63                return;
64            }
65        }
66        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
67            ret = (*it)->WaitAllState(FilterState::READY);
68            if (ret != Status::OK) {
69                return;
70            }
71        }
72    });
73    MEDIA_LOG_I("Prepare done ret = %{public}d", ret);
74    return ret;
75}
76
77Status Pipeline::Start()
78{
79    MEDIA_LOG_I("Start enter.");
80    Status ret = Status::OK;
81    SubmitJobOnce([&] {
82        AutoLock lock(mutex_);
83        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
84            ret = (*it)->Start();
85            if (ret != Status::OK) {
86                return;
87            }
88        }
89        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
90            ret = (*it)->WaitAllState(FilterState::RUNNING);
91            if (ret != Status::OK) {
92                return;
93            }
94        }
95    });
96    MEDIA_LOG_I("Start done ret = %{public}d", ret);
97    return ret;
98}
99
100Status Pipeline::Pause()
101{
102    MEDIA_LOG_I("Pause enter.");
103    Status ret = Status::OK;
104    SubmitJobOnce([&] {
105        AutoLock lock(mutex_);
106        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
107            auto rtv = (*it)->Pause();
108            if (rtv != Status::OK) {
109                ret = rtv;
110            }
111        }
112        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
113            auto rtv = (*it)->WaitAllState(FilterState::PAUSED);
114            if (rtv != Status::OK) {
115                ret = rtv;
116            }
117        }
118    });
119    MEDIA_LOG_I("Pause done ret = %{public}d", ret);
120    return ret;
121}
122
123Status Pipeline::Resume()
124{
125    MEDIA_LOG_I("Resume enter.");
126    Status ret = Status::OK;
127    SubmitJobOnce([&] {
128        AutoLock lock(mutex_);
129        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
130            ret = (*it)->Resume();
131            if (ret != Status::OK) {
132                return;
133            }
134        }
135        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
136            ret = (*it)->WaitAllState(FilterState::RUNNING);
137            if (ret != Status::OK) {
138                return;
139            }
140        }
141    });
142    MEDIA_LOG_I("Resume done ret = %{public}d", ret);
143    return ret;
144}
145
146Status Pipeline::Stop()
147{
148    MEDIA_LOG_I("Stop enter.");
149    Status ret = Status::OK;
150    SubmitJobOnce([&] {
151        AutoLock lock(mutex_);
152        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
153            if (*it == nullptr) {
154                MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size());
155                continue;
156            }
157            auto rtv = (*it)->Stop();
158            if (rtv != Status::OK) {
159                ret = rtv;
160            }
161        }
162        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
163            auto rtv = (*it)->WaitAllState(FilterState::STOPPED);
164            if (rtv != Status::OK) {
165                ret = rtv;
166            }
167        }
168        filters_.clear();
169    });
170    MEDIA_LOG_I("Stop done ret = %{public}d", ret);
171    return ret;
172}
173
174Status Pipeline::Flush()
175{
176    MEDIA_LOG_I("Flush enter.");
177    SubmitJobOnce([&] {
178        AutoLock lock(mutex_);
179        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
180            (*it)->Flush();
181        }
182    });
183    MEDIA_LOG_I("Flush end.");
184    return Status::OK;
185}
186
187Status Pipeline::Release()
188{
189    MEDIA_LOG_I("Release enter.");
190    SubmitJobOnce([&] {
191        AutoLock lock(mutex_);
192        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
193            (*it)->Release();
194        }
195        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
196            (*it)->WaitAllState(FilterState::RELEASED);
197        }
198        filters_.clear();
199    });
200    MEDIA_LOG_I("Release done.");
201    return Status::OK;
202}
203
204Status Pipeline::Preroll(bool render)
205{
206    MEDIA_LOG_I("Preroll enter.");
207    Status ret = Status::OK;
208    AutoLock lock(mutex_);
209    for (auto it = filters_.begin(); it != filters_.end(); ++it) {
210        auto rtv = (*it)->Preroll();
211        if (rtv != Status::OK) {
212            ret = rtv;
213            MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
214            return ret;
215        }
216    }
217    for (auto it = filters_.begin(); it != filters_.end(); ++it) {
218        auto rtv = (*it)->WaitPrerollDone(render);
219        if (rtv != Status::OK) {
220            ret = rtv;
221            MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
222            return ret;
223        }
224    }
225    MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
226    return ret;
227}
228
229Status Pipeline::SetPlayRange(int64_t start, int64_t end)
230{
231    MEDIA_LOG_I("SetPlayRange enter.");
232    SubmitJobOnce([&] {
233        AutoLock lock(mutex_);
234        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
235            (*it)->SetPlayRange(start, end);
236        }
237    });
238    MEDIA_LOG_I("SetPlayRange done.");
239    return Status::OK;
240}
241
242Status Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)
243{
244    MEDIA_LOG_I("AddHeadFilters enter.");
245    std::vector<std::shared_ptr<Filter>> filtersToAdd;
246    for (auto& filterIn : filtersIn) {
247        bool matched = false;
248        for (const auto& filter : filters_) {
249            if (filterIn == filter) {
250                matched = true;
251                break;
252            }
253        }
254        if (!matched) {
255            filtersToAdd.push_back(filterIn);
256            filterIn->LinkPipeLine(groupId_);
257        }
258    }
259    if (filtersToAdd.empty()) {
260        MEDIA_LOG_I("filter already exists");
261        return Status::OK;
262    }
263    SubmitJobOnce([&] {
264        AutoLock lock(mutex_);
265        this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
266    });
267    MEDIA_LOG_I("AddHeadFilters done.");
268    return Status::OK;
269}
270
271Status Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter)
272{
273    SubmitJobOnce([&] {
274        AutoLock lock(mutex_);
275        auto it = std::find_if(filters_.begin(), filters_.end(),
276                               [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; });
277        if (it != filters_.end()) {
278            filters_.erase(it);
279        }
280        filter->Release();
281        filter->WaitAllState(FilterState::RELEASED);
282        filter->ClearAllNextFilters();
283        return Status::OK;
284    });
285    return Status::OK;
286}
287
288Status Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter,
289                             const std::vector<std::shared_ptr<Filter>> &nextFilters,
290                             StreamType type)
291{
292    for (auto nextFilter : nextFilters) {
293        auto ret = preFilter->LinkNext(nextFilter, type);
294        nextFilter->LinkPipeLine(groupId_);
295        FALSE_RETURN_V(ret == Status::OK, ret);
296    }
297    return Status::OK;
298}
299
300Status Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter,
301                               const std::vector<std::shared_ptr<Filter>> &nextFilters,
302                               StreamType type)
303{
304    for (auto nextFilter : nextFilters) {
305        preFilter->UpdateNext(nextFilter, type);
306    }
307    return Status::OK;
308}
309
310Status Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter,
311                               const std::vector<std::shared_ptr<Filter>> &nextFilters,
312                               StreamType type)
313{
314    for (auto nextFilter : nextFilters) {
315        preFilter->UnLinkNext(nextFilter, type);
316    }
317    return Status::OK;
318}
319
320void Pipeline::OnEvent(const Event& event)
321{
322}
323
324} // namespace Pipeline
325} // namespace Media
326} // namespace OHOS
327