1/* 2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#define MEDIA_TASK_THREAD 16#define HST_LOG_TAG "Task" 17#include "osal/task/task.h" 18#include "osal/task/taskInner.h" 19#include "osal/task/thread.h" 20#include "osal/task/pipeline_threadpool.h" 21#include "osal/utils/util.h" 22#include "cpp_ext/memory_ext.h" 23#include "common/log.h" 24 25#include <mutex> 26 27namespace { 28constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" }; 29} 30 31namespace OHOS { 32namespace Media { 33namespace { 34 constexpr int64_t INVALID_DELAY_TIME_US = 10000000; // 10s 35} 36static std::atomic<int32_t> singletonTaskId = 0; 37 38void TaskInner::SleepInTask(unsigned ms) 39{ 40 OSAL::SleepFor(ms); 41} 42 43static int64_t GetNowUs() 44{ 45 auto now = std::chrono::steady_clock::now(); 46 return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count(); 47} 48 49TaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority, 50 bool singleLoop) 51 : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop) 52{ 53 MEDIA_LOG_I(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor", 54 name_.c_str(), groupId.c_str(), type); 55 if (type == TaskType::SINGLETON) { 56 std::string newName = name_ + std::to_string(++singletonTaskId); 57 pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority); 58 } else { 59 pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority); 60 } 61} 62 63void TaskInner::Init() 64{ 65 MEDIA_LOG_I(">> " PUBLIC_LOG_S " Init", name_.c_str()); 66 pipelineThread_->AddTask(shared_from_this()); 67} 68 69void TaskInner::DeInit() 70{ 71 MEDIA_LOG_I(PUBLIC_LOG_S " DeInit", name_.c_str()); 72 pipelineThread_->RemoveTask(shared_from_this()); 73 { 74 AutoLock lock1(jobMutex_); 75 AutoLock lock2(stateMutex_); 76 runningState_ = RunningState::STOPPED; 77 topProcessUs_ = -1; 78 } 79 MEDIA_LOG_I(PUBLIC_LOG_S " DeInit done", name_.c_str()); 80} 81 82TaskInner::~TaskInner() 83{ 84 MEDIA_LOG_D(PUBLIC_LOG_S " dtor", name_.c_str()); 85} 86 87void TaskInner::UpdateDelayTime(int64_t delayUs) 88{ 89 if (!singleLoop_) { 90 MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str()); 91 return; 92 } 93 MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64 94 ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs); 95 pipelineThread_->LockJobState(); 96 AutoLock lock(stateMutex_); 97 if (runningState_ != RunningState::STARTED) { 98 pipelineThread_->UnLockJobState(false); 99 return; 100 } 101 topProcessUs_ = GetNowUs() + delayUs; 102 pipelineThread_->UnLockJobState(true); 103 MEDIA_LOG_D("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64, 104 name_.c_str(), topProcessUs_); 105} 106 107void TaskInner::Start() 108{ 109 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str()); 110 pipelineThread_->LockJobState(); 111 AutoLock lock(stateMutex_); 112 runningState_ = RunningState::STARTED; 113 if (singleLoop_) { 114 if (!job_) { 115 MEDIA_LOG_D("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str()); 116 } 117 topProcessUs_ = GetNowUs(); 118 } else { 119 UpdateTop(); 120 } 121 pipelineThread_->UnLockJobState(true); 122 MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done", name_.c_str()); 123} 124 125void TaskInner::Stop() 126{ 127 if (pipelineThread_->IsRunningInSelf()) { 128 MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str()); 129 runningState_ = RunningState::STOPPED; 130 topProcessUs_ = -1; 131 return; 132 } 133 MEDIA_LOG_I(">> " PUBLIC_LOG_S " Stop", name_.c_str()); 134 AutoLock lock1(jobMutex_); 135 pipelineThread_->LockJobState(); 136 AutoLock lock2(stateMutex_); 137 if (runningState_.load() == RunningState::STOPPED) { 138 pipelineThread_->UnLockJobState(false); 139 return; 140 } 141 runningState_ = RunningState::STOPPED; 142 topProcessUs_ = -1; 143 pipelineThread_->UnLockJobState(true); 144 MEDIA_LOG_I(PUBLIC_LOG_S " Stop <<", name_.c_str()); 145} 146 147void TaskInner::StopAsync() 148{ 149 if (pipelineThread_->IsRunningInSelf()) { 150 MEDIA_LOG_W(PUBLIC_LOG_S " Stop done in self task", name_.c_str()); 151 runningState_ = RunningState::STOPPED; 152 topProcessUs_ = -1; 153 return; 154 } 155 MEDIA_LOG_I(PUBLIC_LOG_S " StopAsync", name_.c_str()); 156 pipelineThread_->LockJobState(); 157 AutoLock lock(stateMutex_); 158 bool stateChanged = false; 159 if (runningState_.load() != RunningState::STOPPED) { 160 runningState_ = RunningState::STOPPED; 161 topProcessUs_ = -1; 162 stateChanged = true; 163 } 164 pipelineThread_->UnLockJobState(stateChanged); 165} 166 167void TaskInner::Pause() 168{ 169 if (pipelineThread_->IsRunningInSelf()) { 170 RunningState state = runningState_.load(); 171 if (state == RunningState::STARTED) { 172 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 173 PUBLIC_LOG_S " Pause done in self task", name_.c_str()); 174 runningState_ = RunningState::PAUSED; 175 topProcessUs_ = -1; 176 return; 177 } else { 178 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 179 PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state); 180 return; 181 } 182 } 183 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str()); 184 AutoLock lock1(jobMutex_); 185 pipelineThread_->LockJobState(); 186 AutoLock lock2(stateMutex_); 187 RunningState state = runningState_.load(); 188 if (state != RunningState::STARTED) { 189 pipelineThread_->UnLockJobState(false); 190 return; 191 } 192 runningState_ = RunningState::PAUSED; 193 topProcessUs_ = -1; 194 pipelineThread_->UnLockJobState(true); 195 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str()); 196} 197 198// There is no need to perform notification, as no call would wait for PAUSING state. 199// If perform notification may cause unnecessasy running when the task is already in PAUSED state. 200void TaskInner::PauseAsync() 201{ 202 if (pipelineThread_->IsRunningInSelf()) { 203 RunningState state = runningState_.load(); 204 if (state == RunningState::STARTED) { 205 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 206 PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str()); 207 runningState_ = RunningState::PAUSED; 208 topProcessUs_ = -1; 209 return; 210 } else { 211 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), 212 PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state); 213 return; 214 } 215 } 216 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str()); 217 pipelineThread_->LockJobState(); 218 AutoLock lock(stateMutex_); 219 bool stateChanged = false; 220 if (runningState_.load() == RunningState::STARTED) { 221 runningState_ = RunningState::PAUSED; 222 topProcessUs_ = -1; 223 stateChanged = true; 224 } 225 pipelineThread_->UnLockJobState(stateChanged); 226} 227 228void TaskInner::RegisterJob(const std::function<int64_t()>& job) 229{ 230 MEDIA_LOG_I(PUBLIC_LOG_S " RegisterHandler", name_.c_str()); 231 job_ = std::move(job); 232} 233 234void TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait) 235{ 236 MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str()); 237 int64_t time = InsertJob(job, delayUs, false); 238 if (wait) { 239 AutoLock lock(stateMutex_); 240 replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); }); 241 } 242} 243 244void TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait) 245{ 246 MEDIA_LOG_D(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs); 247 int64_t time = InsertJob(job, delayUs, true); 248 if (wait) { 249 AutoLock lock(stateMutex_); 250 replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); }); 251 } 252} 253 254void TaskInner::UpdateTop() 255{ 256 if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) { 257 topProcessUs_ = -1; 258 return; 259 } 260 if (msgQueue_.empty()) { 261 topProcessUs_ = jobQueue_.begin()->first; 262 topIsJob_ = true; 263 } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) { 264 topProcessUs_ = msgQueue_.begin()->first; 265 topIsJob_ = false; 266 } else { 267 int64_t msgProcessTime = msgQueue_.begin()->first; 268 int64_t jobProcessTime = jobQueue_.begin()->first; 269 int64_t nowUs = GetNowUs(); 270 if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) { 271 topProcessUs_ = msgProcessTime; 272 topIsJob_ = false; 273 } else { 274 topProcessUs_ = jobProcessTime; 275 topIsJob_ = true; 276 } 277 } 278} 279 280int64_t TaskInner::NextJobUs() 281{ 282 AutoLock lock(stateMutex_); 283 return topProcessUs_; 284} 285 286void TaskInner::HandleJob() 287{ 288 AutoLock lock(jobMutex_); 289 if (singleLoop_) { 290 stateMutex_.lock(); 291 int64_t currentTopProcessUs = topProcessUs_; 292 if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) { 293 topProcessUs_ = -1; 294 stateMutex_.unlock(); 295 return; 296 } 297 stateMutex_.unlock(); 298 int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_(); 299 300 AutoLock lock(stateMutex_); 301 // if topProcessUs_ is -1, we already pause/stop in job_() 302 // if topProcessUs_ is changed, we should ignore the returned delay time. 303 if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) { 304 topProcessUs_ = GetNowUs() + nextDelay; 305 } 306 } else { 307 std::function<void()> nextJob; 308 stateMutex_.lock(); 309 if (topIsJob_) { 310 nextJob = std::move(jobQueue_.begin()->second); 311 jobQueue_.erase(jobQueue_.begin()); 312 } else { 313 nextJob = std::move(msgQueue_.begin()->second); 314 msgQueue_.erase(msgQueue_.begin()); 315 } 316 { 317 stateMutex_.unlock(); 318 nextJob(); 319 replyCond_.NotifyAll(); 320 } 321 AutoLock lock(stateMutex_); 322 UpdateTop(); 323 } 324} 325 326int64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue) 327{ 328 pipelineThread_->LockJobState(); 329 AutoLock lock(stateMutex_); 330 int64_t nowUs = GetNowUs(); 331 if (delayUs < 0) { 332 delayUs = 0; 333 } 334 int64_t processTime = nowUs + delayUs; 335 if (inJobQueue) { 336 while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly 337 MEDIA_LOG_W("DUPLICATIVE jobQueue_ TIMESTAMP!!!"); 338 processTime++; 339 } 340 jobQueue_[processTime] = std::move(job); 341 } else { 342 while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly 343 MEDIA_LOG_W("DUPLICATIVE msgQueue_ TIMESTAMP!!!"); 344 processTime++; 345 } 346 msgQueue_[processTime] = std::move(job); 347 } 348 int64_t lastProcessUs = topProcessUs_; 349 if (processTime <= topProcessUs_ || topProcessUs_ == -1) { 350 UpdateTop(); 351 } 352 pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_); 353 return processTime; 354} 355} // namespace Media 356} // namespace OHOS 357