1fa7767c5Sopenharmony_ci/* 2fa7767c5Sopenharmony_ci * Copyright (c) 2021-2023 Huawei Device Co., Ltd. 3fa7767c5Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4fa7767c5Sopenharmony_ci * you may not use this file except in compliance with the License. 5fa7767c5Sopenharmony_ci * You may obtain a copy of the License at 6fa7767c5Sopenharmony_ci * 7fa7767c5Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8fa7767c5Sopenharmony_ci * 9fa7767c5Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10fa7767c5Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11fa7767c5Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12fa7767c5Sopenharmony_ci * See the License for the specific language governing permissions and 13fa7767c5Sopenharmony_ci * limitations under the License. 14fa7767c5Sopenharmony_ci */ 15fa7767c5Sopenharmony_ci#define MEDIA_TASK_THREAD 16fa7767c5Sopenharmony_ci#define HST_LOG_TAG "Task" 17fa7767c5Sopenharmony_ci#include "osal/task/task.h" 18fa7767c5Sopenharmony_ci#include "osal/task/taskInner.h" 19fa7767c5Sopenharmony_ci#include "osal/task/thread.h" 20fa7767c5Sopenharmony_ci#include "osal/task/pipeline_threadpool.h" 21fa7767c5Sopenharmony_ci#include "osal/utils/util.h" 22fa7767c5Sopenharmony_ci#include "cpp_ext/memory_ext.h" 23fa7767c5Sopenharmony_ci#include "common/log.h" 24fa7767c5Sopenharmony_ci 25fa7767c5Sopenharmony_ci#include <mutex> 26fa7767c5Sopenharmony_ci 27fa7767c5Sopenharmony_cinamespace { 28fa7767c5Sopenharmony_ciconstexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" }; 29fa7767c5Sopenharmony_ci} 30fa7767c5Sopenharmony_ci 31fa7767c5Sopenharmony_cinamespace OHOS { 32fa7767c5Sopenharmony_cinamespace Media { 33fa7767c5Sopenharmony_cinamespace { 34fa7767c5Sopenharmony_ci constexpr int64_t INVALID_DELAY_TIME_US = 10000000; // 10s 35fa7767c5Sopenharmony_ci} 36fa7767c5Sopenharmony_cistatic std::atomic<int32_t> singletonTaskId = 0; 37fa7767c5Sopenharmony_ci 38fa7767c5Sopenharmony_civoid TaskInner::SleepInTask(unsigned ms) 39fa7767c5Sopenharmony_ci{ 40fa7767c5Sopenharmony_ci OSAL::SleepFor(ms); 41fa7767c5Sopenharmony_ci} 42fa7767c5Sopenharmony_ci 43fa7767c5Sopenharmony_cistatic int64_t GetNowUs() 44fa7767c5Sopenharmony_ci{ 45fa7767c5Sopenharmony_ci auto now = std::chrono::steady_clock::now(); 46fa7767c5Sopenharmony_ci return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count(); 47fa7767c5Sopenharmony_ci} 48fa7767c5Sopenharmony_ci 49fa7767c5Sopenharmony_ciTaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority, 50fa7767c5Sopenharmony_ci bool singleLoop) 51fa7767c5Sopenharmony_ci : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop) 52fa7767c5Sopenharmony_ci{ 53fa7767c5Sopenharmony_ci MEDIA_LOG_I(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor", 54fa7767c5Sopenharmony_ci name_.c_str(), groupId.c_str(), type); 55fa7767c5Sopenharmony_ci if (type == TaskType::SINGLETON) { 56fa7767c5Sopenharmony_ci std::string newName = name_ + std::to_string(++singletonTaskId); 57fa7767c5Sopenharmony_ci pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority); 58fa7767c5Sopenharmony_ci } else { 59fa7767c5Sopenharmony_ci pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority); 60fa7767c5Sopenharmony_ci } 61fa7767c5Sopenharmony_ci} 62fa7767c5Sopenharmony_ci 63fa7767c5Sopenharmony_civoid TaskInner::Init() 64fa7767c5Sopenharmony_ci{ 65fa7767c5Sopenharmony_ci MEDIA_LOG_I(">> " PUBLIC_LOG_S " Init", name_.c_str()); 66fa7767c5Sopenharmony_ci pipelineThread_->AddTask(shared_from_this()); 67fa7767c5Sopenharmony_ci} 68fa7767c5Sopenharmony_ci 69fa7767c5Sopenharmony_civoid TaskInner::DeInit() 70fa7767c5Sopenharmony_ci{ 71fa7767c5Sopenharmony_ci MEDIA_LOG_I(PUBLIC_LOG_S " DeInit", name_.c_str()); 72fa7767c5Sopenharmony_ci pipelineThread_->RemoveTask(shared_from_this()); 73fa7767c5Sopenharmony_ci { 74fa7767c5Sopenharmony_ci AutoLock lock1(jobMutex_); 75fa7767c5Sopenharmony_ci AutoLock lock2(stateMutex_); 76fa7767c5Sopenharmony_ci runningState_ = RunningState::STOPPED; 77fa7767c5Sopenharmony_ci topProcessUs_ = -1; 78fa7767c5Sopenharmony_ci } 79fa7767c5Sopenharmony_ci MEDIA_LOG_I(PUBLIC_LOG_S " DeInit done", name_.c_str()); 80fa7767c5Sopenharmony_ci} 81fa7767c5Sopenharmony_ci 82fa7767c5Sopenharmony_ciTaskInner::~TaskInner() 83fa7767c5Sopenharmony_ci{ 84fa7767c5Sopenharmony_ci MEDIA_LOG_D(PUBLIC_LOG_S " dtor", name_.c_str()); 85fa7767c5Sopenharmony_ci} 86fa7767c5Sopenharmony_ci 87fa7767c5Sopenharmony_civoid TaskInner::UpdateDelayTime(int64_t delayUs) 88fa7767c5Sopenharmony_ci{ 89fa7767c5Sopenharmony_ci if (!singleLoop_) { 90fa7767c5Sopenharmony_ci MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str()); 91fa7767c5Sopenharmony_ci return; 92fa7767c5Sopenharmony_ci } 93fa7767c5Sopenharmony_ci MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64 94fa7767c5Sopenharmony_ci ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs); 95fa7767c5Sopenharmony_ci pipelineThread_->LockJobState(); 96fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 97fa7767c5Sopenharmony_ci if (runningState_ != RunningState::STARTED) { 98fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(false); 99fa7767c5Sopenharmony_ci return; 100fa7767c5Sopenharmony_ci } 101fa7767c5Sopenharmony_ci topProcessUs_ = GetNowUs() + delayUs; 102fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(true); 103fa7767c5Sopenharmony_ci MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64, 104fa7767c5Sopenharmony_ci name_.c_str(), topProcessUs_); 105fa7767c5Sopenharmony_ci} 106fa7767c5Sopenharmony_ci 107fa7767c5Sopenharmony_civoid TaskInner::Start() 108fa7767c5Sopenharmony_ci{ 109fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str()); 110fa7767c5Sopenharmony_ci pipelineThread_->LockJobState(); 111fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 112fa7767c5Sopenharmony_ci runningState_ = RunningState::STARTED; 113fa7767c5Sopenharmony_ci if (singleLoop_) { 114fa7767c5Sopenharmony_ci if (!job_) { 115fa7767c5Sopenharmony_ci MEDIA_LOG_D("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str()); 116fa7767c5Sopenharmony_ci } 117fa7767c5Sopenharmony_ci topProcessUs_ = GetNowUs(); 118fa7767c5Sopenharmony_ci } else { 119fa7767c5Sopenharmony_ci UpdateTop(); 120fa7767c5Sopenharmony_ci } 121fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(true); 122fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done", name_.c_str()); 123fa7767c5Sopenharmony_ci} 124fa7767c5Sopenharmony_ci 125fa7767c5Sopenharmony_civoid TaskInner::Stop() 126fa7767c5Sopenharmony_ci{ 127fa7767c5Sopenharmony_ci if (pipelineThread_->IsRunningInSelf()) { 128fa7767c5Sopenharmony_ci MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str()); 129fa7767c5Sopenharmony_ci runningState_ = RunningState::STOPPED; 130fa7767c5Sopenharmony_ci topProcessUs_ = -1; 131fa7767c5Sopenharmony_ci return; 132fa7767c5Sopenharmony_ci } 133fa7767c5Sopenharmony_ci MEDIA_LOG_I(">> " PUBLIC_LOG_S " Stop", name_.c_str()); 134fa7767c5Sopenharmony_ci AutoLock lock1(jobMutex_); 135fa7767c5Sopenharmony_ci pipelineThread_->LockJobState(); 136fa7767c5Sopenharmony_ci AutoLock lock2(stateMutex_); 137fa7767c5Sopenharmony_ci if (runningState_.load() == RunningState::STOPPED) { 138fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(false); 139fa7767c5Sopenharmony_ci return; 140fa7767c5Sopenharmony_ci } 141fa7767c5Sopenharmony_ci runningState_ = RunningState::STOPPED; 142fa7767c5Sopenharmony_ci topProcessUs_ = -1; 143fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(true); 144fa7767c5Sopenharmony_ci MEDIA_LOG_I(PUBLIC_LOG_S " Stop <<", name_.c_str()); 145fa7767c5Sopenharmony_ci} 146fa7767c5Sopenharmony_ci 147fa7767c5Sopenharmony_civoid TaskInner::StopAsync() 148fa7767c5Sopenharmony_ci{ 149fa7767c5Sopenharmony_ci if (pipelineThread_->IsRunningInSelf()) { 150fa7767c5Sopenharmony_ci MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str()); 151fa7767c5Sopenharmony_ci runningState_ = RunningState::STOPPED; 152fa7767c5Sopenharmony_ci topProcessUs_ = -1; 153fa7767c5Sopenharmony_ci return; 154fa7767c5Sopenharmony_ci } 155fa7767c5Sopenharmony_ci MEDIA_LOG_I(PUBLIC_LOG_S " StopAsync", name_.c_str()); 156fa7767c5Sopenharmony_ci pipelineThread_->LockJobState(); 157fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 158fa7767c5Sopenharmony_ci bool stateChanged = false; 159fa7767c5Sopenharmony_ci if (runningState_.load() != RunningState::STOPPED) { 160fa7767c5Sopenharmony_ci runningState_ = RunningState::STOPPED; 161fa7767c5Sopenharmony_ci topProcessUs_ = -1; 162fa7767c5Sopenharmony_ci stateChanged = true; 163fa7767c5Sopenharmony_ci } 164fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(stateChanged); 165fa7767c5Sopenharmony_ci} 166fa7767c5Sopenharmony_ci 167fa7767c5Sopenharmony_civoid TaskInner::Pause() 168fa7767c5Sopenharmony_ci{ 169fa7767c5Sopenharmony_ci if (pipelineThread_->IsRunningInSelf()) { 170fa7767c5Sopenharmony_ci RunningState state = runningState_.load(); 171fa7767c5Sopenharmony_ci if (state == RunningState::STARTED) { 172fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 173fa7767c5Sopenharmony_ci PUBLIC_LOG_S " Pause done in self task", name_.c_str()); 174fa7767c5Sopenharmony_ci runningState_ = RunningState::PAUSED; 175fa7767c5Sopenharmony_ci topProcessUs_ = -1; 176fa7767c5Sopenharmony_ci return; 177fa7767c5Sopenharmony_ci } else { 178fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 179fa7767c5Sopenharmony_ci PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state); 180fa7767c5Sopenharmony_ci return; 181fa7767c5Sopenharmony_ci } 182fa7767c5Sopenharmony_ci } 183fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str()); 184fa7767c5Sopenharmony_ci AutoLock lock1(jobMutex_); 185fa7767c5Sopenharmony_ci pipelineThread_->LockJobState(); 186fa7767c5Sopenharmony_ci AutoLock lock2(stateMutex_); 187fa7767c5Sopenharmony_ci RunningState state = runningState_.load(); 188fa7767c5Sopenharmony_ci if (state != RunningState::STARTED) { 189fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(false); 190fa7767c5Sopenharmony_ci return; 191fa7767c5Sopenharmony_ci } 192fa7767c5Sopenharmony_ci runningState_ = RunningState::PAUSED; 193fa7767c5Sopenharmony_ci topProcessUs_ = -1; 194fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(true); 195fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str()); 196fa7767c5Sopenharmony_ci} 197fa7767c5Sopenharmony_ci 198fa7767c5Sopenharmony_ci// There is no need to perform notification, as no call would wait for PAUSING state. 199fa7767c5Sopenharmony_ci// If perform notification may cause unnecessasy running when the task is already in PAUSED state. 200fa7767c5Sopenharmony_civoid TaskInner::PauseAsync() 201fa7767c5Sopenharmony_ci{ 202fa7767c5Sopenharmony_ci if (pipelineThread_->IsRunningInSelf()) { 203fa7767c5Sopenharmony_ci RunningState state = runningState_.load(); 204fa7767c5Sopenharmony_ci if (state == RunningState::STARTED) { 205fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 206fa7767c5Sopenharmony_ci PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str()); 207fa7767c5Sopenharmony_ci runningState_ = RunningState::PAUSED; 208fa7767c5Sopenharmony_ci topProcessUs_ = -1; 209fa7767c5Sopenharmony_ci return; 210fa7767c5Sopenharmony_ci } else { 211fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 212fa7767c5Sopenharmony_ci PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state); 213fa7767c5Sopenharmony_ci return; 214fa7767c5Sopenharmony_ci } 215fa7767c5Sopenharmony_ci } 216fa7767c5Sopenharmony_ci MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str()); 217fa7767c5Sopenharmony_ci pipelineThread_->LockJobState(); 218fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 219fa7767c5Sopenharmony_ci bool stateChanged = false; 220fa7767c5Sopenharmony_ci if (runningState_.load() == RunningState::STARTED) { 221fa7767c5Sopenharmony_ci runningState_ = RunningState::PAUSED; 222fa7767c5Sopenharmony_ci topProcessUs_ = -1; 223fa7767c5Sopenharmony_ci stateChanged = true; 224fa7767c5Sopenharmony_ci } 225fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(stateChanged); 226fa7767c5Sopenharmony_ci} 227fa7767c5Sopenharmony_ci 228fa7767c5Sopenharmony_civoid TaskInner::RegisterJob(const std::function<int64_t()>& job) 229fa7767c5Sopenharmony_ci{ 230fa7767c5Sopenharmony_ci MEDIA_LOG_I(PUBLIC_LOG_S " RegisterHandler", name_.c_str()); 231fa7767c5Sopenharmony_ci job_ = std::move(job); 232fa7767c5Sopenharmony_ci} 233fa7767c5Sopenharmony_ci 234fa7767c5Sopenharmony_civoid TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait) 235fa7767c5Sopenharmony_ci{ 236fa7767c5Sopenharmony_ci MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str()); 237fa7767c5Sopenharmony_ci int64_t time = InsertJob(job, delayUs, false); 238fa7767c5Sopenharmony_ci if (wait) { 239fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 240fa7767c5Sopenharmony_ci replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); }); 241fa7767c5Sopenharmony_ci } 242fa7767c5Sopenharmony_ci} 243fa7767c5Sopenharmony_ci 244fa7767c5Sopenharmony_civoid TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait) 245fa7767c5Sopenharmony_ci{ 246fa7767c5Sopenharmony_ci MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs); 247fa7767c5Sopenharmony_ci int64_t time = InsertJob(job, delayUs, true); 248fa7767c5Sopenharmony_ci if (wait) { 249fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 250fa7767c5Sopenharmony_ci replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); }); 251fa7767c5Sopenharmony_ci } 252fa7767c5Sopenharmony_ci} 253fa7767c5Sopenharmony_ci 254fa7767c5Sopenharmony_civoid TaskInner::UpdateTop() 255fa7767c5Sopenharmony_ci{ 256fa7767c5Sopenharmony_ci if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) { 257fa7767c5Sopenharmony_ci topProcessUs_ = -1; 258fa7767c5Sopenharmony_ci return; 259fa7767c5Sopenharmony_ci } 260fa7767c5Sopenharmony_ci if (msgQueue_.empty()) { 261fa7767c5Sopenharmony_ci topProcessUs_ = jobQueue_.begin()->first; 262fa7767c5Sopenharmony_ci topIsJob_ = true; 263fa7767c5Sopenharmony_ci } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) { 264fa7767c5Sopenharmony_ci topProcessUs_ = msgQueue_.begin()->first; 265fa7767c5Sopenharmony_ci topIsJob_ = false; 266fa7767c5Sopenharmony_ci } else { 267fa7767c5Sopenharmony_ci int64_t msgProcessTime = msgQueue_.begin()->first; 268fa7767c5Sopenharmony_ci int64_t jobProcessTime = jobQueue_.begin()->first; 269fa7767c5Sopenharmony_ci int64_t nowUs = GetNowUs(); 270fa7767c5Sopenharmony_ci if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) { 271fa7767c5Sopenharmony_ci topProcessUs_ = msgProcessTime; 272fa7767c5Sopenharmony_ci topIsJob_ = false; 273fa7767c5Sopenharmony_ci } else { 274fa7767c5Sopenharmony_ci topProcessUs_ = jobProcessTime; 275fa7767c5Sopenharmony_ci topIsJob_ = true; 276fa7767c5Sopenharmony_ci } 277fa7767c5Sopenharmony_ci } 278fa7767c5Sopenharmony_ci} 279fa7767c5Sopenharmony_ci 280fa7767c5Sopenharmony_ciint64_t TaskInner::NextJobUs() 281fa7767c5Sopenharmony_ci{ 282fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 283fa7767c5Sopenharmony_ci return topProcessUs_; 284fa7767c5Sopenharmony_ci} 285fa7767c5Sopenharmony_ci 286fa7767c5Sopenharmony_civoid TaskInner::HandleJob() 287fa7767c5Sopenharmony_ci{ 288fa7767c5Sopenharmony_ci AutoLock lock(jobMutex_); 289fa7767c5Sopenharmony_ci if (singleLoop_) { 290fa7767c5Sopenharmony_ci stateMutex_.lock(); 291fa7767c5Sopenharmony_ci int64_t currentTopProcessUs = topProcessUs_; 292fa7767c5Sopenharmony_ci if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) { 293fa7767c5Sopenharmony_ci topProcessUs_ = -1; 294fa7767c5Sopenharmony_ci stateMutex_.unlock(); 295fa7767c5Sopenharmony_ci return; 296fa7767c5Sopenharmony_ci } 297fa7767c5Sopenharmony_ci stateMutex_.unlock(); 298fa7767c5Sopenharmony_ci int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_(); 299fa7767c5Sopenharmony_ci 300fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 301fa7767c5Sopenharmony_ci // if topProcessUs_ is -1, we already pause/stop in job_() 302fa7767c5Sopenharmony_ci // if topProcessUs_ is changed, we should ignore the returned delay time. 303fa7767c5Sopenharmony_ci if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) { 304fa7767c5Sopenharmony_ci topProcessUs_ = GetNowUs() + nextDelay; 305fa7767c5Sopenharmony_ci } 306fa7767c5Sopenharmony_ci } else { 307fa7767c5Sopenharmony_ci std::function<void()> nextJob; 308fa7767c5Sopenharmony_ci stateMutex_.lock(); 309fa7767c5Sopenharmony_ci if (topIsJob_) { 310fa7767c5Sopenharmony_ci nextJob = std::move(jobQueue_.begin()->second); 311fa7767c5Sopenharmony_ci jobQueue_.erase(jobQueue_.begin()); 312fa7767c5Sopenharmony_ci } else { 313fa7767c5Sopenharmony_ci nextJob = std::move(msgQueue_.begin()->second); 314fa7767c5Sopenharmony_ci msgQueue_.erase(msgQueue_.begin()); 315fa7767c5Sopenharmony_ci } 316fa7767c5Sopenharmony_ci { 317fa7767c5Sopenharmony_ci stateMutex_.unlock(); 318fa7767c5Sopenharmony_ci nextJob(); 319fa7767c5Sopenharmony_ci replyCond_.NotifyAll(); 320fa7767c5Sopenharmony_ci } 321fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 322fa7767c5Sopenharmony_ci UpdateTop(); 323fa7767c5Sopenharmony_ci } 324fa7767c5Sopenharmony_ci} 325fa7767c5Sopenharmony_ci 326fa7767c5Sopenharmony_ciint64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue) 327fa7767c5Sopenharmony_ci{ 328fa7767c5Sopenharmony_ci pipelineThread_->LockJobState(); 329fa7767c5Sopenharmony_ci AutoLock lock(stateMutex_); 330fa7767c5Sopenharmony_ci int64_t nowUs = GetNowUs(); 331fa7767c5Sopenharmony_ci if (delayUs < 0) { 332fa7767c5Sopenharmony_ci delayUs = 0; 333fa7767c5Sopenharmony_ci } 334fa7767c5Sopenharmony_ci int64_t processTime = nowUs + delayUs; 335fa7767c5Sopenharmony_ci if (inJobQueue) { 336fa7767c5Sopenharmony_ci while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly 337fa7767c5Sopenharmony_ci MEDIA_LOG_W("DUPLICATIVE jobQueue_ TIMESTAMP!!!"); 338fa7767c5Sopenharmony_ci processTime++; 339fa7767c5Sopenharmony_ci } 340fa7767c5Sopenharmony_ci jobQueue_[processTime] = std::move(job); 341fa7767c5Sopenharmony_ci } else { 342fa7767c5Sopenharmony_ci while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly 343fa7767c5Sopenharmony_ci MEDIA_LOG_W("DUPLICATIVE msgQueue_ TIMESTAMP!!!"); 344fa7767c5Sopenharmony_ci processTime++; 345fa7767c5Sopenharmony_ci } 346fa7767c5Sopenharmony_ci msgQueue_[processTime] = std::move(job); 347fa7767c5Sopenharmony_ci } 348fa7767c5Sopenharmony_ci int64_t lastProcessUs = topProcessUs_; 349fa7767c5Sopenharmony_ci if (processTime <= topProcessUs_ || topProcessUs_ == -1) { 350fa7767c5Sopenharmony_ci UpdateTop(); 351fa7767c5Sopenharmony_ci } 352fa7767c5Sopenharmony_ci pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_); 353fa7767c5Sopenharmony_ci return processTime; 354fa7767c5Sopenharmony_ci} 355fa7767c5Sopenharmony_ci} // namespace Media 356fa7767c5Sopenharmony_ci} // namespace OHOS 357