/*
 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15fa7767c5Sopenharmony_ci
16fa7767c5Sopenharmony_ci#define MEDIA_PIPELINE
17fa7767c5Sopenharmony_ci#define HST_LOG_TAG "Pipeline"
18fa7767c5Sopenharmony_ci
19fa7767c5Sopenharmony_ci#include <queue>
20fa7767c5Sopenharmony_ci#include <stack>
21fa7767c5Sopenharmony_ci#include "pipeline/pipeline.h"
22fa7767c5Sopenharmony_ci#include "osal/task/autolock.h"
23fa7767c5Sopenharmony_ci#include "osal/task/jobutils.h"
24fa7767c5Sopenharmony_ci#include "common/log.h"
25fa7767c5Sopenharmony_ci#include "osal/utils/hitrace_utils.h"
26fa7767c5Sopenharmony_ci
27fa7767c5Sopenharmony_cinamespace {
28fa7767c5Sopenharmony_ciconstexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "Pipeline" };
29fa7767c5Sopenharmony_ci}
30fa7767c5Sopenharmony_ci
31fa7767c5Sopenharmony_cinamespace OHOS {
32fa7767c5Sopenharmony_cinamespace Media {
33fa7767c5Sopenharmony_cinamespace Pipeline {
34fa7767c5Sopenharmony_cistatic std::atomic<uint16_t> pipeLineId = 0;
35fa7767c5Sopenharmony_ci
36fa7767c5Sopenharmony_ciint32_t Pipeline::GetNextPipelineId()
37fa7767c5Sopenharmony_ci{
38fa7767c5Sopenharmony_ci    return pipeLineId++;
39fa7767c5Sopenharmony_ci}
40fa7767c5Sopenharmony_ci
41fa7767c5Sopenharmony_ciPipeline::~Pipeline()
42fa7767c5Sopenharmony_ci{
43fa7767c5Sopenharmony_ci}
44fa7767c5Sopenharmony_ci
45fa7767c5Sopenharmony_civoid Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback,
46fa7767c5Sopenharmony_ci    const std::string& groupId)
47fa7767c5Sopenharmony_ci{
48fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Pipeline::Init");
49fa7767c5Sopenharmony_ci    eventReceiver_ = receiver;
50fa7767c5Sopenharmony_ci    filterCallback_ = callback;
51fa7767c5Sopenharmony_ci    groupId_ = groupId;
52fa7767c5Sopenharmony_ci}
53fa7767c5Sopenharmony_ci
54fa7767c5Sopenharmony_ciStatus Pipeline::Prepare()
55fa7767c5Sopenharmony_ci{
56fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Prepare enter.");
57fa7767c5Sopenharmony_ci    Status ret = Status::OK;
58fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
59fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
60fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
61fa7767c5Sopenharmony_ci            ret = (*it)->Prepare();
62fa7767c5Sopenharmony_ci            if (ret != Status::OK) {
63fa7767c5Sopenharmony_ci                return;
64fa7767c5Sopenharmony_ci            }
65fa7767c5Sopenharmony_ci        }
66fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
67fa7767c5Sopenharmony_ci            ret = (*it)->WaitAllState(FilterState::READY);
68fa7767c5Sopenharmony_ci            if (ret != Status::OK) {
69fa7767c5Sopenharmony_ci                return;
70fa7767c5Sopenharmony_ci            }
71fa7767c5Sopenharmony_ci        }
72fa7767c5Sopenharmony_ci    });
73fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Prepare done ret = %{public}d", ret);
74fa7767c5Sopenharmony_ci    return ret;
75fa7767c5Sopenharmony_ci}
76fa7767c5Sopenharmony_ci
77fa7767c5Sopenharmony_ciStatus Pipeline::Start()
78fa7767c5Sopenharmony_ci{
79fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Start enter.");
80fa7767c5Sopenharmony_ci    Status ret = Status::OK;
81fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
82fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
83fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
84fa7767c5Sopenharmony_ci            ret = (*it)->Start();
85fa7767c5Sopenharmony_ci            if (ret != Status::OK) {
86fa7767c5Sopenharmony_ci                return;
87fa7767c5Sopenharmony_ci            }
88fa7767c5Sopenharmony_ci        }
89fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
90fa7767c5Sopenharmony_ci            ret = (*it)->WaitAllState(FilterState::RUNNING);
91fa7767c5Sopenharmony_ci            if (ret != Status::OK) {
92fa7767c5Sopenharmony_ci                return;
93fa7767c5Sopenharmony_ci            }
94fa7767c5Sopenharmony_ci        }
95fa7767c5Sopenharmony_ci    });
96fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Start done ret = %{public}d", ret);
97fa7767c5Sopenharmony_ci    return ret;
98fa7767c5Sopenharmony_ci}
99fa7767c5Sopenharmony_ci
100fa7767c5Sopenharmony_ciStatus Pipeline::Pause()
101fa7767c5Sopenharmony_ci{
102fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Pause enter.");
103fa7767c5Sopenharmony_ci    Status ret = Status::OK;
104fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
105fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
106fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
107fa7767c5Sopenharmony_ci            auto rtv = (*it)->Pause();
108fa7767c5Sopenharmony_ci            if (rtv != Status::OK) {
109fa7767c5Sopenharmony_ci                ret = rtv;
110fa7767c5Sopenharmony_ci            }
111fa7767c5Sopenharmony_ci        }
112fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
113fa7767c5Sopenharmony_ci            auto rtv = (*it)->WaitAllState(FilterState::PAUSED);
114fa7767c5Sopenharmony_ci            if (rtv != Status::OK) {
115fa7767c5Sopenharmony_ci                ret = rtv;
116fa7767c5Sopenharmony_ci            }
117fa7767c5Sopenharmony_ci        }
118fa7767c5Sopenharmony_ci    });
119fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Pause done ret = %{public}d", ret);
120fa7767c5Sopenharmony_ci    return ret;
121fa7767c5Sopenharmony_ci}
122fa7767c5Sopenharmony_ci
123fa7767c5Sopenharmony_ciStatus Pipeline::Resume()
124fa7767c5Sopenharmony_ci{
125fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Resume enter.");
126fa7767c5Sopenharmony_ci    Status ret = Status::OK;
127fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
128fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
129fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
130fa7767c5Sopenharmony_ci            ret = (*it)->Resume();
131fa7767c5Sopenharmony_ci            if (ret != Status::OK) {
132fa7767c5Sopenharmony_ci                return;
133fa7767c5Sopenharmony_ci            }
134fa7767c5Sopenharmony_ci        }
135fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
136fa7767c5Sopenharmony_ci            ret = (*it)->WaitAllState(FilterState::RUNNING);
137fa7767c5Sopenharmony_ci            if (ret != Status::OK) {
138fa7767c5Sopenharmony_ci                return;
139fa7767c5Sopenharmony_ci            }
140fa7767c5Sopenharmony_ci        }
141fa7767c5Sopenharmony_ci    });
142fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Resume done ret = %{public}d", ret);
143fa7767c5Sopenharmony_ci    return ret;
144fa7767c5Sopenharmony_ci}
145fa7767c5Sopenharmony_ci
146fa7767c5Sopenharmony_ciStatus Pipeline::Stop()
147fa7767c5Sopenharmony_ci{
148fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Stop enter.");
149fa7767c5Sopenharmony_ci    Status ret = Status::OK;
150fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
151fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
152fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
153fa7767c5Sopenharmony_ci            if (*it == nullptr) {
154fa7767c5Sopenharmony_ci                MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size());
155fa7767c5Sopenharmony_ci                continue;
156fa7767c5Sopenharmony_ci            }
157fa7767c5Sopenharmony_ci            auto rtv = (*it)->Stop();
158fa7767c5Sopenharmony_ci            if (rtv != Status::OK) {
159fa7767c5Sopenharmony_ci                ret = rtv;
160fa7767c5Sopenharmony_ci            }
161fa7767c5Sopenharmony_ci        }
162fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
163fa7767c5Sopenharmony_ci            auto rtv = (*it)->WaitAllState(FilterState::STOPPED);
164fa7767c5Sopenharmony_ci            if (rtv != Status::OK) {
165fa7767c5Sopenharmony_ci                ret = rtv;
166fa7767c5Sopenharmony_ci            }
167fa7767c5Sopenharmony_ci        }
168fa7767c5Sopenharmony_ci        filters_.clear();
169fa7767c5Sopenharmony_ci    });
170fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Stop done ret = %{public}d", ret);
171fa7767c5Sopenharmony_ci    return ret;
172fa7767c5Sopenharmony_ci}
173fa7767c5Sopenharmony_ci
174fa7767c5Sopenharmony_ciStatus Pipeline::Flush()
175fa7767c5Sopenharmony_ci{
176fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Flush enter.");
177fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
178fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
179fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
180fa7767c5Sopenharmony_ci            (*it)->Flush();
181fa7767c5Sopenharmony_ci        }
182fa7767c5Sopenharmony_ci    });
183fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Flush end.");
184fa7767c5Sopenharmony_ci    return Status::OK;
185fa7767c5Sopenharmony_ci}
186fa7767c5Sopenharmony_ci
187fa7767c5Sopenharmony_ciStatus Pipeline::Release()
188fa7767c5Sopenharmony_ci{
189fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Release enter.");
190fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
191fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
192fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
193fa7767c5Sopenharmony_ci            (*it)->Release();
194fa7767c5Sopenharmony_ci        }
195fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
196fa7767c5Sopenharmony_ci            (*it)->WaitAllState(FilterState::RELEASED);
197fa7767c5Sopenharmony_ci        }
198fa7767c5Sopenharmony_ci        filters_.clear();
199fa7767c5Sopenharmony_ci    });
200fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Release done.");
201fa7767c5Sopenharmony_ci    return Status::OK;
202fa7767c5Sopenharmony_ci}
203fa7767c5Sopenharmony_ci
204fa7767c5Sopenharmony_ciStatus Pipeline::Preroll(bool render)
205fa7767c5Sopenharmony_ci{
206fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Preroll enter.");
207fa7767c5Sopenharmony_ci    Status ret = Status::OK;
208fa7767c5Sopenharmony_ci    AutoLock lock(mutex_);
209fa7767c5Sopenharmony_ci    for (auto it = filters_.begin(); it != filters_.end(); ++it) {
210fa7767c5Sopenharmony_ci        auto rtv = (*it)->Preroll();
211fa7767c5Sopenharmony_ci        if (rtv != Status::OK) {
212fa7767c5Sopenharmony_ci            ret = rtv;
213fa7767c5Sopenharmony_ci            MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
214fa7767c5Sopenharmony_ci            return ret;
215fa7767c5Sopenharmony_ci        }
216fa7767c5Sopenharmony_ci    }
217fa7767c5Sopenharmony_ci    for (auto it = filters_.begin(); it != filters_.end(); ++it) {
218fa7767c5Sopenharmony_ci        auto rtv = (*it)->WaitPrerollDone(render);
219fa7767c5Sopenharmony_ci        if (rtv != Status::OK) {
220fa7767c5Sopenharmony_ci            ret = rtv;
221fa7767c5Sopenharmony_ci            MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
222fa7767c5Sopenharmony_ci            return ret;
223fa7767c5Sopenharmony_ci        }
224fa7767c5Sopenharmony_ci    }
225fa7767c5Sopenharmony_ci    MEDIA_LOG_I("Preroll done ret = %{public}d", ret);
226fa7767c5Sopenharmony_ci    return ret;
227fa7767c5Sopenharmony_ci}
228fa7767c5Sopenharmony_ci
229fa7767c5Sopenharmony_ciStatus Pipeline::SetPlayRange(int64_t start, int64_t end)
230fa7767c5Sopenharmony_ci{
231fa7767c5Sopenharmony_ci    MEDIA_LOG_I("SetPlayRange enter.");
232fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
233fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
234fa7767c5Sopenharmony_ci        for (auto it = filters_.begin(); it != filters_.end(); ++it) {
235fa7767c5Sopenharmony_ci            (*it)->SetPlayRange(start, end);
236fa7767c5Sopenharmony_ci        }
237fa7767c5Sopenharmony_ci    });
238fa7767c5Sopenharmony_ci    MEDIA_LOG_I("SetPlayRange done.");
239fa7767c5Sopenharmony_ci    return Status::OK;
240fa7767c5Sopenharmony_ci}
241fa7767c5Sopenharmony_ci
242fa7767c5Sopenharmony_ciStatus Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)
243fa7767c5Sopenharmony_ci{
244fa7767c5Sopenharmony_ci    MEDIA_LOG_I("AddHeadFilters enter.");
245fa7767c5Sopenharmony_ci    std::vector<std::shared_ptr<Filter>> filtersToAdd;
246fa7767c5Sopenharmony_ci    for (auto& filterIn : filtersIn) {
247fa7767c5Sopenharmony_ci        bool matched = false;
248fa7767c5Sopenharmony_ci        for (const auto& filter : filters_) {
249fa7767c5Sopenharmony_ci            if (filterIn == filter) {
250fa7767c5Sopenharmony_ci                matched = true;
251fa7767c5Sopenharmony_ci                break;
252fa7767c5Sopenharmony_ci            }
253fa7767c5Sopenharmony_ci        }
254fa7767c5Sopenharmony_ci        if (!matched) {
255fa7767c5Sopenharmony_ci            filtersToAdd.push_back(filterIn);
256fa7767c5Sopenharmony_ci            filterIn->LinkPipeLine(groupId_);
257fa7767c5Sopenharmony_ci        }
258fa7767c5Sopenharmony_ci    }
259fa7767c5Sopenharmony_ci    if (filtersToAdd.empty()) {
260fa7767c5Sopenharmony_ci        MEDIA_LOG_I("filter already exists");
261fa7767c5Sopenharmony_ci        return Status::OK;
262fa7767c5Sopenharmony_ci    }
263fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
264fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
265fa7767c5Sopenharmony_ci        this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
266fa7767c5Sopenharmony_ci    });
267fa7767c5Sopenharmony_ci    MEDIA_LOG_I("AddHeadFilters done.");
268fa7767c5Sopenharmony_ci    return Status::OK;
269fa7767c5Sopenharmony_ci}
270fa7767c5Sopenharmony_ci
271fa7767c5Sopenharmony_ciStatus Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter)
272fa7767c5Sopenharmony_ci{
273fa7767c5Sopenharmony_ci    SubmitJobOnce([&] {
274fa7767c5Sopenharmony_ci        AutoLock lock(mutex_);
275fa7767c5Sopenharmony_ci        auto it = std::find_if(filters_.begin(), filters_.end(),
276fa7767c5Sopenharmony_ci                               [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; });
277fa7767c5Sopenharmony_ci        if (it != filters_.end()) {
278fa7767c5Sopenharmony_ci            filters_.erase(it);
279fa7767c5Sopenharmony_ci        }
280fa7767c5Sopenharmony_ci        filter->Release();
281fa7767c5Sopenharmony_ci        filter->WaitAllState(FilterState::RELEASED);
282fa7767c5Sopenharmony_ci        filter->ClearAllNextFilters();
283fa7767c5Sopenharmony_ci        return Status::OK;
284fa7767c5Sopenharmony_ci    });
285fa7767c5Sopenharmony_ci    return Status::OK;
286fa7767c5Sopenharmony_ci}
287fa7767c5Sopenharmony_ci
288fa7767c5Sopenharmony_ciStatus Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter,
289fa7767c5Sopenharmony_ci                             const std::vector<std::shared_ptr<Filter>> &nextFilters,
290fa7767c5Sopenharmony_ci                             StreamType type)
291fa7767c5Sopenharmony_ci{
292fa7767c5Sopenharmony_ci    for (auto nextFilter : nextFilters) {
293fa7767c5Sopenharmony_ci        auto ret = preFilter->LinkNext(nextFilter, type);
294fa7767c5Sopenharmony_ci        nextFilter->LinkPipeLine(groupId_);
295fa7767c5Sopenharmony_ci        FALSE_RETURN_V(ret == Status::OK, ret);
296fa7767c5Sopenharmony_ci    }
297fa7767c5Sopenharmony_ci    return Status::OK;
298fa7767c5Sopenharmony_ci}
299fa7767c5Sopenharmony_ci
300fa7767c5Sopenharmony_ciStatus Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter,
301fa7767c5Sopenharmony_ci                               const std::vector<std::shared_ptr<Filter>> &nextFilters,
302fa7767c5Sopenharmony_ci                               StreamType type)
303fa7767c5Sopenharmony_ci{
304fa7767c5Sopenharmony_ci    for (auto nextFilter : nextFilters) {
305fa7767c5Sopenharmony_ci        preFilter->UpdateNext(nextFilter, type);
306fa7767c5Sopenharmony_ci    }
307fa7767c5Sopenharmony_ci    return Status::OK;
308fa7767c5Sopenharmony_ci}
309fa7767c5Sopenharmony_ci
310fa7767c5Sopenharmony_ciStatus Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter,
311fa7767c5Sopenharmony_ci                               const std::vector<std::shared_ptr<Filter>> &nextFilters,
312fa7767c5Sopenharmony_ci                               StreamType type)
313fa7767c5Sopenharmony_ci{
314fa7767c5Sopenharmony_ci    for (auto nextFilter : nextFilters) {
315fa7767c5Sopenharmony_ci        preFilter->UnLinkNext(nextFilter, type);
316fa7767c5Sopenharmony_ci    }
317fa7767c5Sopenharmony_ci    return Status::OK;
318fa7767c5Sopenharmony_ci}
319fa7767c5Sopenharmony_ci
320fa7767c5Sopenharmony_civoid Pipeline::OnEvent(const Event& event)
321fa7767c5Sopenharmony_ci{
322fa7767c5Sopenharmony_ci}
323fa7767c5Sopenharmony_ci
324fa7767c5Sopenharmony_ci} // namespace Pipeline
325fa7767c5Sopenharmony_ci} // namespace Media
326fa7767c5Sopenharmony_ci} // namespace OHOS
327