1/* 2 * Copyright (c) 2024-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#define HST_LOG_TAG "Task" 16#include "osal/task/task.h" 17#include "osal/task/taskInner.h" 18#include "osal/task/thread.h" 19#include "osal/utils/util.h" 20#include "cpp_ext/memory_ext.h" 21#include "common/log.h" 22 23#include <mutex> 24 25namespace { 26constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "PipelineTreadPool" }; 27} 28 29namespace OHOS { 30namespace Media { 31namespace { 32 constexpr int64_t ADJUST_US = 500; 33 constexpr int64_t US_PER_MS = 1000; 34} 35 36static ThreadPriority ConvertPriorityType(TaskPriority priority) 37{ 38 switch (priority) { 39 case TaskPriority::LOW: 40 return ThreadPriority::LOW; 41 case TaskPriority::NORMAL: 42 return ThreadPriority::NORMAL; 43 case TaskPriority::MIDDLE: 44 return ThreadPriority::MIDDLE; 45 case TaskPriority::HIGHEST: 46 return ThreadPriority::HIGHEST; 47 default: 48 return ThreadPriority::HIGH; 49 } 50} 51 52static std::string TaskTypeConvert(TaskType type) 53{ 54 static const std::map<TaskType, std::string> table = { 55 {TaskType::GLOBAL, "G"}, 56 {TaskType::VIDEO, "V"}, 57 {TaskType::AUDIO, "A"}, 58 {TaskType::SUBTITLE, "T"}, 59 {TaskType::SINGLETON, "S"}, 60 }; 61 auto it = table.find(type); 62 if (it != table.end()) { 63 return it->second; 64 } 65 return ""; 66} 67 68static int64_t GetNowUs() 69{ 70 auto now = std::chrono::steady_clock::now(); 71 return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count(); 72} 73 74PipeLineThreadPool& PipeLineThreadPool::GetInstance() 75{ 76 static PipeLineThreadPool instance; 77 return instance; 78} 79 80std::shared_ptr<PipeLineThread> PipeLineThreadPool::FindThread(std::string groupId, TaskType taskType, TaskPriority priority) 81{ 82 AutoLock lock(mutex_); 83 if (workerGroupMap.find(groupId) == workerGroupMap.end()) { 84 workerGroupMap[groupId] = std::make_shared<std::list<std::shared_ptr<PipeLineThread>>>(); 85 } 86 std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList = workerGroupMap[groupId]; 87 for (auto thread : *threadList.get()) { 88 if (thread->type_ == taskType) { 89 return thread; 90 } 91 } 92 std::shared_ptr<PipeLineThread> newThread = std::make_shared<PipeLineThread>(groupId, taskType, priority); 93 threadList->push_back(newThread); 94 return newThread; 95} 96 97void PipeLineThreadPool::DestroyThread(std::string groupId) 98{ 99 MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " destroy", groupId.c_str()); 100 std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList; 101 { 102 AutoLock lock(mutex_); 103 if (workerGroupMap.find(groupId) == workerGroupMap.end()) { 104 MEDIA_LOG_E("PipeLineThread not exist"); 105 return; 106 } 107 threadList = workerGroupMap[groupId]; 108 workerGroupMap.erase(groupId); 109 } 110 for (auto thread : *threadList.get()) { 111 thread->Exit(); 112 } 113} 114 115PipeLineThread::PipeLineThread(std::string groupId, TaskType type, TaskPriority priority) 116 : groupId_(groupId), type_(type) 117{ 118 MEDIA_LOG_I("PipeLineThread name:" PUBLIC_LOG_S " type:%{public}d created call", groupId_.c_str(), type); 119 loop_ = CppExt::make_unique<Thread>(ConvertPriorityType(priority)); 120 name_ = groupId_ + "_" + TaskTypeConvert(type); 121 loop_->SetName(name_); 122 threadExit_ = false; 123 if (loop_->CreateThread([this] { Run(); })) { 124 threadExit_ = false; 125 } else { 126 threadExit_ = true; 127 loop_ = nullptr; 128 MEDIA_LOG_E("PipeLineThread " PUBLIC_LOG_S " create failed", name_.c_str()); 129 } 130} 131 132PipeLineThread::~PipeLineThread() 133{ 134 Exit(); 135} 136 137void PipeLineThread::Exit() 138{ 139 { 140 AutoLock lock(mutex_); 141 FALSE_RETURN_W(!threadExit_.load() && loop_); 142 143 MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " exit", name_.c_str()); 144 threadExit_ = true; 145 syncCond_.NotifyAll(); 146 147 // trigger to quit thread in current running thread, must not wait, 148 // or else the current thread will be suspended and can not quit. 149 if (IsRunningInSelf()) { 150 return; 151 } 152 } 153 // loop_ destroy will wait thread join 154 loop_ = nullptr; 155} 156 157void PipeLineThread::Run() 158{ 159 MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " run", name_.c_str()); 160 while (true) { 161 std::shared_ptr<TaskInner> nextTask; 162 { 163 AutoLock lock(mutex_); 164 if (threadExit_.load()) { 165 break; 166 } 167 int64_t nextJobUs = INT64_MAX; 168 for (auto task: taskList_) { 169 int64_t taskJobUs = task->NextJobUs(); 170 if (taskJobUs == -1) { 171 continue; 172 } 173 if (taskJobUs < nextJobUs) { 174 nextJobUs = taskJobUs; 175 nextTask = task; 176 } 177 } 178 if (nextTask == nullptr) { 179 syncCond_.Wait(lock); 180 continue; 181 } 182 int64_t nowUs = GetNowUs(); 183 if (nextJobUs > (nowUs + ADJUST_US)) { 184 syncCond_.WaitFor(lock, (nextJobUs - nowUs + ADJUST_US) / US_PER_MS); 185 continue; 186 } 187 } 188 nextTask->HandleJob(); 189 } 190} 191 192void PipeLineThread::AddTask(std::shared_ptr<TaskInner> task) 193{ 194 AutoLock lock(mutex_); 195 taskList_.push_back(task); 196} 197 198void PipeLineThread::RemoveTask(std::shared_ptr<TaskInner> task) 199{ 200 { 201 AutoLock lock(mutex_); 202 taskList_.remove(task); 203 FALSE_LOG_MSG(!taskList_.empty(), 204 "PipeLineThread " PUBLIC_LOG_S " remove all Task", name_.c_str()); 205 } 206 if (type_ == TaskType::SINGLETON) { 207 PipeLineThreadPool::GetInstance().DestroyThread(name_); 208 } 209} 210 211void PipeLineThread::LockJobState() 212{ 213 if (IsRunningInSelf()) { 214 return; 215 } 216 mutex_.lock(); 217} 218 219void PipeLineThread::UnLockJobState(bool notifyChange) 220{ 221 if (IsRunningInSelf()) { 222 return; 223 } 224 if (notifyChange) { 225 syncCond_.NotifyAll(); 226 } 227 mutex_.unlock(); 228} 229 230bool PipeLineThread::IsRunningInSelf() 231{ 232 return loop_ ? loop_->IsRunningInSelf() : false; 233} 234} // namespace Media 235} // namespace OHOS 236