1fa7767c5Sopenharmony_ci/*
2fa7767c5Sopenharmony_ci * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3fa7767c5Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
4fa7767c5Sopenharmony_ci * you may not use this file except in compliance with the License.
5fa7767c5Sopenharmony_ci * You may obtain a copy of the License at
6fa7767c5Sopenharmony_ci *
7fa7767c5Sopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
8fa7767c5Sopenharmony_ci *
9fa7767c5Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software
10fa7767c5Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
11fa7767c5Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12fa7767c5Sopenharmony_ci * See the License for the specific language governing permissions and
13fa7767c5Sopenharmony_ci * limitations under the License.
14fa7767c5Sopenharmony_ci */
15fa7767c5Sopenharmony_ci#define MEDIA_TASK_THREAD
16fa7767c5Sopenharmony_ci#define HST_LOG_TAG "Task"
17fa7767c5Sopenharmony_ci#include "osal/task/task.h"
18fa7767c5Sopenharmony_ci#include "osal/task/taskInner.h"
19fa7767c5Sopenharmony_ci#include "osal/task/thread.h"
20fa7767c5Sopenharmony_ci#include "osal/task/pipeline_threadpool.h"
21fa7767c5Sopenharmony_ci#include "osal/utils/util.h"
22fa7767c5Sopenharmony_ci#include "cpp_ext/memory_ext.h"
23fa7767c5Sopenharmony_ci#include "common/log.h"
24fa7767c5Sopenharmony_ci
25fa7767c5Sopenharmony_ci#include <mutex>
26fa7767c5Sopenharmony_ci
27fa7767c5Sopenharmony_cinamespace {
28fa7767c5Sopenharmony_ciconstexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" };
29fa7767c5Sopenharmony_ci}
30fa7767c5Sopenharmony_ci
31fa7767c5Sopenharmony_cinamespace OHOS {
32fa7767c5Sopenharmony_cinamespace Media {
namespace {
// Fallback delay in microseconds (10 s). HandleJob() uses it as the next delay of a
// single-loop task when no job has been registered.
constexpr int64_t INVALID_DELAY_TIME_US = 10 * 1000 * 1000;
} // namespace
// Process-wide counter; the TaskInner constructor pre-increments it to build a unique
// thread name for every SINGLETON task.
static std::atomic<int32_t> singletonTaskId{0};
37fa7767c5Sopenharmony_ci
38fa7767c5Sopenharmony_civoid TaskInner::SleepInTask(unsigned ms)
39fa7767c5Sopenharmony_ci{
40fa7767c5Sopenharmony_ci    OSAL::SleepFor(ms);
41fa7767c5Sopenharmony_ci}
42fa7767c5Sopenharmony_ci
// Returns the current std::chrono::steady_clock reading, expressed as whole
// microseconds since that clock's epoch. All schedule timestamps in this file use it.
static int64_t GetNowUs()
{
    using std::chrono::microseconds;
    const auto sinceEpoch = std::chrono::steady_clock::now().time_since_epoch();
    return std::chrono::duration_cast<microseconds>(sinceEpoch).count();
}
48fa7767c5Sopenharmony_ci
49fa7767c5Sopenharmony_ciTaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority,
50fa7767c5Sopenharmony_ci    bool singleLoop)
51fa7767c5Sopenharmony_ci    : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop)
52fa7767c5Sopenharmony_ci{
53fa7767c5Sopenharmony_ci    MEDIA_LOG_I(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor",
54fa7767c5Sopenharmony_ci        name_.c_str(), groupId.c_str(), type);
55fa7767c5Sopenharmony_ci    if (type == TaskType::SINGLETON) {
56fa7767c5Sopenharmony_ci        std::string newName = name_ + std::to_string(++singletonTaskId);
57fa7767c5Sopenharmony_ci        pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority);
58fa7767c5Sopenharmony_ci    } else {
59fa7767c5Sopenharmony_ci        pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority);
60fa7767c5Sopenharmony_ci    }
61fa7767c5Sopenharmony_ci}
62fa7767c5Sopenharmony_ci
63fa7767c5Sopenharmony_civoid TaskInner::Init()
64fa7767c5Sopenharmony_ci{
65fa7767c5Sopenharmony_ci    MEDIA_LOG_I(">> " PUBLIC_LOG_S " Init", name_.c_str());
66fa7767c5Sopenharmony_ci    pipelineThread_->AddTask(shared_from_this());
67fa7767c5Sopenharmony_ci}
68fa7767c5Sopenharmony_ci
69fa7767c5Sopenharmony_civoid TaskInner::DeInit()
70fa7767c5Sopenharmony_ci{
71fa7767c5Sopenharmony_ci    MEDIA_LOG_I(PUBLIC_LOG_S " DeInit", name_.c_str());
72fa7767c5Sopenharmony_ci    pipelineThread_->RemoveTask(shared_from_this());
73fa7767c5Sopenharmony_ci    {
74fa7767c5Sopenharmony_ci        AutoLock lock1(jobMutex_);
75fa7767c5Sopenharmony_ci        AutoLock lock2(stateMutex_);
76fa7767c5Sopenharmony_ci        runningState_ = RunningState::STOPPED;
77fa7767c5Sopenharmony_ci        topProcessUs_ = -1;
78fa7767c5Sopenharmony_ci    }
79fa7767c5Sopenharmony_ci    MEDIA_LOG_I(PUBLIC_LOG_S " DeInit done", name_.c_str());
80fa7767c5Sopenharmony_ci}
81fa7767c5Sopenharmony_ci
82fa7767c5Sopenharmony_ciTaskInner::~TaskInner()
83fa7767c5Sopenharmony_ci{
84fa7767c5Sopenharmony_ci    MEDIA_LOG_D(PUBLIC_LOG_S " dtor", name_.c_str());
85fa7767c5Sopenharmony_ci}
86fa7767c5Sopenharmony_ci
87fa7767c5Sopenharmony_civoid TaskInner::UpdateDelayTime(int64_t delayUs)
88fa7767c5Sopenharmony_ci{
89fa7767c5Sopenharmony_ci    if (!singleLoop_) {
90fa7767c5Sopenharmony_ci        MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str());
91fa7767c5Sopenharmony_ci        return;
92fa7767c5Sopenharmony_ci    }
93fa7767c5Sopenharmony_ci    MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64
94fa7767c5Sopenharmony_ci        ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs);
95fa7767c5Sopenharmony_ci    pipelineThread_->LockJobState();
96fa7767c5Sopenharmony_ci    AutoLock lock(stateMutex_);
97fa7767c5Sopenharmony_ci    if (runningState_ != RunningState::STARTED) {
98fa7767c5Sopenharmony_ci        pipelineThread_->UnLockJobState(false);
99fa7767c5Sopenharmony_ci        return;
100fa7767c5Sopenharmony_ci    }
101fa7767c5Sopenharmony_ci    topProcessUs_ = GetNowUs() + delayUs;
102fa7767c5Sopenharmony_ci    pipelineThread_->UnLockJobState(true);
103fa7767c5Sopenharmony_ci    MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64,
104fa7767c5Sopenharmony_ci        name_.c_str(), topProcessUs_);
105fa7767c5Sopenharmony_ci}
106fa7767c5Sopenharmony_ci
107fa7767c5Sopenharmony_civoid TaskInner::Start()
108fa7767c5Sopenharmony_ci{
109fa7767c5Sopenharmony_ci    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str());
110fa7767c5Sopenharmony_ci    pipelineThread_->LockJobState();
111fa7767c5Sopenharmony_ci    AutoLock lock(stateMutex_);
112fa7767c5Sopenharmony_ci    runningState_ = RunningState::STARTED;
113fa7767c5Sopenharmony_ci    if (singleLoop_) {
114fa7767c5Sopenharmony_ci        if (!job_) {
115fa7767c5Sopenharmony_ci            MEDIA_LOG_D("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str());
116fa7767c5Sopenharmony_ci        }
117fa7767c5Sopenharmony_ci        topProcessUs_ = GetNowUs();
118fa7767c5Sopenharmony_ci    } else {
119fa7767c5Sopenharmony_ci        UpdateTop();
120fa7767c5Sopenharmony_ci    }
121fa7767c5Sopenharmony_ci    pipelineThread_->UnLockJobState(true);
122fa7767c5Sopenharmony_ci    MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done", name_.c_str());
123fa7767c5Sopenharmony_ci}
124fa7767c5Sopenharmony_ci
125fa7767c5Sopenharmony_civoid TaskInner::Stop()
126fa7767c5Sopenharmony_ci{
127fa7767c5Sopenharmony_ci    if (pipelineThread_->IsRunningInSelf()) {
128fa7767c5Sopenharmony_ci        MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
129fa7767c5Sopenharmony_ci        runningState_ = RunningState::STOPPED;
130fa7767c5Sopenharmony_ci        topProcessUs_ = -1;
131fa7767c5Sopenharmony_ci        return;
132fa7767c5Sopenharmony_ci    }
133fa7767c5Sopenharmony_ci    MEDIA_LOG_I(">> " PUBLIC_LOG_S " Stop", name_.c_str());
134fa7767c5Sopenharmony_ci    AutoLock lock1(jobMutex_);
135fa7767c5Sopenharmony_ci    pipelineThread_->LockJobState();
136fa7767c5Sopenharmony_ci    AutoLock lock2(stateMutex_);
137fa7767c5Sopenharmony_ci    if (runningState_.load() == RunningState::STOPPED) {
138fa7767c5Sopenharmony_ci        pipelineThread_->UnLockJobState(false);
139fa7767c5Sopenharmony_ci        return;
140fa7767c5Sopenharmony_ci    }
141fa7767c5Sopenharmony_ci    runningState_ = RunningState::STOPPED;
142fa7767c5Sopenharmony_ci    topProcessUs_ = -1;
143fa7767c5Sopenharmony_ci    pipelineThread_->UnLockJobState(true);
144fa7767c5Sopenharmony_ci    MEDIA_LOG_I(PUBLIC_LOG_S " Stop <<", name_.c_str());
145fa7767c5Sopenharmony_ci}
146fa7767c5Sopenharmony_ci
147fa7767c5Sopenharmony_civoid TaskInner::StopAsync()
148fa7767c5Sopenharmony_ci{
149fa7767c5Sopenharmony_ci    if (pipelineThread_->IsRunningInSelf()) {
150fa7767c5Sopenharmony_ci        MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
151fa7767c5Sopenharmony_ci        runningState_ = RunningState::STOPPED;
152fa7767c5Sopenharmony_ci        topProcessUs_ = -1;
153fa7767c5Sopenharmony_ci        return;
154fa7767c5Sopenharmony_ci    }
155fa7767c5Sopenharmony_ci    MEDIA_LOG_I(PUBLIC_LOG_S " StopAsync", name_.c_str());
156fa7767c5Sopenharmony_ci    pipelineThread_->LockJobState();
157fa7767c5Sopenharmony_ci    AutoLock lock(stateMutex_);
158fa7767c5Sopenharmony_ci    bool stateChanged = false;
159fa7767c5Sopenharmony_ci    if (runningState_.load() != RunningState::STOPPED) {
160fa7767c5Sopenharmony_ci        runningState_ = RunningState::STOPPED;
161fa7767c5Sopenharmony_ci        topProcessUs_ = -1;
162fa7767c5Sopenharmony_ci        stateChanged = true;
163fa7767c5Sopenharmony_ci    }
164fa7767c5Sopenharmony_ci    pipelineThread_->UnLockJobState(stateChanged);
165fa7767c5Sopenharmony_ci}
166fa7767c5Sopenharmony_ci
167fa7767c5Sopenharmony_civoid TaskInner::Pause()
168fa7767c5Sopenharmony_ci{
169fa7767c5Sopenharmony_ci    if (pipelineThread_->IsRunningInSelf()) {
170fa7767c5Sopenharmony_ci        RunningState state = runningState_.load();
171fa7767c5Sopenharmony_ci        if (state == RunningState::STARTED) {
172fa7767c5Sopenharmony_ci            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
173fa7767c5Sopenharmony_ci                PUBLIC_LOG_S " Pause done in self task", name_.c_str());
174fa7767c5Sopenharmony_ci            runningState_ = RunningState::PAUSED;
175fa7767c5Sopenharmony_ci            topProcessUs_ = -1;
176fa7767c5Sopenharmony_ci            return;
177fa7767c5Sopenharmony_ci        } else {
178fa7767c5Sopenharmony_ci            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
179fa7767c5Sopenharmony_ci                PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state);
180fa7767c5Sopenharmony_ci            return;
181fa7767c5Sopenharmony_ci        }
182fa7767c5Sopenharmony_ci    }
183fa7767c5Sopenharmony_ci    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str());
184fa7767c5Sopenharmony_ci    AutoLock lock1(jobMutex_);
185fa7767c5Sopenharmony_ci    pipelineThread_->LockJobState();
186fa7767c5Sopenharmony_ci    AutoLock lock2(stateMutex_);
187fa7767c5Sopenharmony_ci    RunningState state = runningState_.load();
188fa7767c5Sopenharmony_ci    if (state != RunningState::STARTED) {
189fa7767c5Sopenharmony_ci        pipelineThread_->UnLockJobState(false);
190fa7767c5Sopenharmony_ci        return;
191fa7767c5Sopenharmony_ci    }
192fa7767c5Sopenharmony_ci    runningState_ = RunningState::PAUSED;
193fa7767c5Sopenharmony_ci    topProcessUs_ = -1;
194fa7767c5Sopenharmony_ci    pipelineThread_->UnLockJobState(true);
195fa7767c5Sopenharmony_ci    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str());
196fa7767c5Sopenharmony_ci}
197fa7767c5Sopenharmony_ci
// There is no need to notify here, because no caller waits for the PAUSING state.
// Notifying could cause an unnecessary run when the task is already in the PAUSED state.
200fa7767c5Sopenharmony_civoid TaskInner::PauseAsync()
201fa7767c5Sopenharmony_ci{
202fa7767c5Sopenharmony_ci    if (pipelineThread_->IsRunningInSelf()) {
203fa7767c5Sopenharmony_ci        RunningState state = runningState_.load();
204fa7767c5Sopenharmony_ci        if (state == RunningState::STARTED) {
205fa7767c5Sopenharmony_ci            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
206fa7767c5Sopenharmony_ci                PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str());
207fa7767c5Sopenharmony_ci            runningState_ = RunningState::PAUSED;
208fa7767c5Sopenharmony_ci            topProcessUs_ = -1;
209fa7767c5Sopenharmony_ci            return;
210fa7767c5Sopenharmony_ci        } else {
211fa7767c5Sopenharmony_ci            MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
212fa7767c5Sopenharmony_ci                PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state);
213fa7767c5Sopenharmony_ci            return;
214fa7767c5Sopenharmony_ci        }
215fa7767c5Sopenharmony_ci    }
216fa7767c5Sopenharmony_ci    MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str());
217fa7767c5Sopenharmony_ci    pipelineThread_->LockJobState();
218fa7767c5Sopenharmony_ci    AutoLock lock(stateMutex_);
219fa7767c5Sopenharmony_ci    bool stateChanged = false;
220fa7767c5Sopenharmony_ci    if (runningState_.load() == RunningState::STARTED) {
221fa7767c5Sopenharmony_ci        runningState_ = RunningState::PAUSED;
222fa7767c5Sopenharmony_ci        topProcessUs_ = -1;
223fa7767c5Sopenharmony_ci        stateChanged = true;
224fa7767c5Sopenharmony_ci    }
225fa7767c5Sopenharmony_ci    pipelineThread_->UnLockJobState(stateChanged);
226fa7767c5Sopenharmony_ci}
227fa7767c5Sopenharmony_ci
228fa7767c5Sopenharmony_civoid TaskInner::RegisterJob(const std::function<int64_t()>& job)
229fa7767c5Sopenharmony_ci{
230fa7767c5Sopenharmony_ci    MEDIA_LOG_I(PUBLIC_LOG_S " RegisterHandler", name_.c_str());
231fa7767c5Sopenharmony_ci    job_ = std::move(job);
232fa7767c5Sopenharmony_ci}
233fa7767c5Sopenharmony_ci
234fa7767c5Sopenharmony_civoid TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait)
235fa7767c5Sopenharmony_ci{
236fa7767c5Sopenharmony_ci    MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str());
237fa7767c5Sopenharmony_ci    int64_t time = InsertJob(job, delayUs, false);
238fa7767c5Sopenharmony_ci    if (wait) {
239fa7767c5Sopenharmony_ci        AutoLock lock(stateMutex_);
240fa7767c5Sopenharmony_ci        replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); });
241fa7767c5Sopenharmony_ci    }
242fa7767c5Sopenharmony_ci}
243fa7767c5Sopenharmony_ci
244fa7767c5Sopenharmony_civoid TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait)
245fa7767c5Sopenharmony_ci{
246fa7767c5Sopenharmony_ci    MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs);
247fa7767c5Sopenharmony_ci    int64_t time = InsertJob(job, delayUs, true);
248fa7767c5Sopenharmony_ci    if (wait) {
249fa7767c5Sopenharmony_ci        AutoLock lock(stateMutex_);
250fa7767c5Sopenharmony_ci        replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); });
251fa7767c5Sopenharmony_ci    }
252fa7767c5Sopenharmony_ci}
253fa7767c5Sopenharmony_ci
254fa7767c5Sopenharmony_civoid TaskInner::UpdateTop()
255fa7767c5Sopenharmony_ci{
256fa7767c5Sopenharmony_ci    if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) {
257fa7767c5Sopenharmony_ci        topProcessUs_ = -1;
258fa7767c5Sopenharmony_ci        return;
259fa7767c5Sopenharmony_ci    }
260fa7767c5Sopenharmony_ci    if (msgQueue_.empty()) {
261fa7767c5Sopenharmony_ci        topProcessUs_ = jobQueue_.begin()->first;
262fa7767c5Sopenharmony_ci        topIsJob_ = true;
263fa7767c5Sopenharmony_ci    } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) {
264fa7767c5Sopenharmony_ci        topProcessUs_ = msgQueue_.begin()->first;
265fa7767c5Sopenharmony_ci        topIsJob_ = false;
266fa7767c5Sopenharmony_ci    } else {
267fa7767c5Sopenharmony_ci        int64_t msgProcessTime = msgQueue_.begin()->first;
268fa7767c5Sopenharmony_ci        int64_t jobProcessTime = jobQueue_.begin()->first;
269fa7767c5Sopenharmony_ci        int64_t nowUs =  GetNowUs();
270fa7767c5Sopenharmony_ci        if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) {
271fa7767c5Sopenharmony_ci            topProcessUs_ = msgProcessTime;
272fa7767c5Sopenharmony_ci            topIsJob_ = false;
273fa7767c5Sopenharmony_ci        } else  {
274fa7767c5Sopenharmony_ci            topProcessUs_ = jobProcessTime;
275fa7767c5Sopenharmony_ci            topIsJob_ = true;
276fa7767c5Sopenharmony_ci        }
277fa7767c5Sopenharmony_ci    }
278fa7767c5Sopenharmony_ci}
279fa7767c5Sopenharmony_ci
280fa7767c5Sopenharmony_ciint64_t TaskInner::NextJobUs()
281fa7767c5Sopenharmony_ci{
282fa7767c5Sopenharmony_ci    AutoLock lock(stateMutex_);
283fa7767c5Sopenharmony_ci    return topProcessUs_;
284fa7767c5Sopenharmony_ci}
285fa7767c5Sopenharmony_ci
286fa7767c5Sopenharmony_civoid TaskInner::HandleJob()
287fa7767c5Sopenharmony_ci{
288fa7767c5Sopenharmony_ci    AutoLock lock(jobMutex_);
289fa7767c5Sopenharmony_ci    if (singleLoop_) {
290fa7767c5Sopenharmony_ci        stateMutex_.lock();
291fa7767c5Sopenharmony_ci        int64_t currentTopProcessUs = topProcessUs_;
292fa7767c5Sopenharmony_ci        if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) {
293fa7767c5Sopenharmony_ci            topProcessUs_ = -1;
294fa7767c5Sopenharmony_ci            stateMutex_.unlock();
295fa7767c5Sopenharmony_ci            return;
296fa7767c5Sopenharmony_ci        }
297fa7767c5Sopenharmony_ci        stateMutex_.unlock();
298fa7767c5Sopenharmony_ci        int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_();
299fa7767c5Sopenharmony_ci
300fa7767c5Sopenharmony_ci        AutoLock lock(stateMutex_);
301fa7767c5Sopenharmony_ci        // if topProcessUs_ is -1, we already pause/stop in job_()
302fa7767c5Sopenharmony_ci        // if topProcessUs_ is changed, we should ignore the returned delay time.
303fa7767c5Sopenharmony_ci        if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) {
304fa7767c5Sopenharmony_ci            topProcessUs_ = GetNowUs() + nextDelay;
305fa7767c5Sopenharmony_ci        }
306fa7767c5Sopenharmony_ci    } else {
307fa7767c5Sopenharmony_ci        std::function<void()> nextJob;
308fa7767c5Sopenharmony_ci        stateMutex_.lock();
309fa7767c5Sopenharmony_ci        if (topIsJob_) {
310fa7767c5Sopenharmony_ci            nextJob = std::move(jobQueue_.begin()->second);
311fa7767c5Sopenharmony_ci            jobQueue_.erase(jobQueue_.begin());
312fa7767c5Sopenharmony_ci        } else {
313fa7767c5Sopenharmony_ci            nextJob = std::move(msgQueue_.begin()->second);
314fa7767c5Sopenharmony_ci            msgQueue_.erase(msgQueue_.begin());
315fa7767c5Sopenharmony_ci        }
316fa7767c5Sopenharmony_ci        {
317fa7767c5Sopenharmony_ci            stateMutex_.unlock();
318fa7767c5Sopenharmony_ci            nextJob();
319fa7767c5Sopenharmony_ci            replyCond_.NotifyAll();
320fa7767c5Sopenharmony_ci        }
321fa7767c5Sopenharmony_ci        AutoLock lock(stateMutex_);
322fa7767c5Sopenharmony_ci        UpdateTop();
323fa7767c5Sopenharmony_ci    }
324fa7767c5Sopenharmony_ci}
325fa7767c5Sopenharmony_ci
326fa7767c5Sopenharmony_ciint64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue)
327fa7767c5Sopenharmony_ci{
328fa7767c5Sopenharmony_ci    pipelineThread_->LockJobState();
329fa7767c5Sopenharmony_ci    AutoLock lock(stateMutex_);
330fa7767c5Sopenharmony_ci    int64_t nowUs = GetNowUs();
331fa7767c5Sopenharmony_ci    if (delayUs < 0) {
332fa7767c5Sopenharmony_ci        delayUs = 0;
333fa7767c5Sopenharmony_ci    }
334fa7767c5Sopenharmony_ci    int64_t processTime = nowUs + delayUs;
335fa7767c5Sopenharmony_ci    if (inJobQueue) {
336fa7767c5Sopenharmony_ci        while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly
337fa7767c5Sopenharmony_ci            MEDIA_LOG_W("DUPLICATIVE jobQueue_ TIMESTAMP!!!");
338fa7767c5Sopenharmony_ci            processTime++;
339fa7767c5Sopenharmony_ci        }
340fa7767c5Sopenharmony_ci        jobQueue_[processTime] = std::move(job);
341fa7767c5Sopenharmony_ci    } else {
342fa7767c5Sopenharmony_ci        while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly
343fa7767c5Sopenharmony_ci            MEDIA_LOG_W("DUPLICATIVE msgQueue_ TIMESTAMP!!!");
344fa7767c5Sopenharmony_ci            processTime++;
345fa7767c5Sopenharmony_ci        }
346fa7767c5Sopenharmony_ci        msgQueue_[processTime] = std::move(job);
347fa7767c5Sopenharmony_ci    }
348fa7767c5Sopenharmony_ci    int64_t lastProcessUs = topProcessUs_;
349fa7767c5Sopenharmony_ci    if (processTime <= topProcessUs_ || topProcessUs_ == -1) {
350fa7767c5Sopenharmony_ci        UpdateTop();
351fa7767c5Sopenharmony_ci    }
352fa7767c5Sopenharmony_ci    pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_);
353fa7767c5Sopenharmony_ci    return processTime;
354fa7767c5Sopenharmony_ci}
355fa7767c5Sopenharmony_ci} // namespace Media
356fa7767c5Sopenharmony_ci} // namespace OHOS
357