1fb299fa2Sopenharmony_ci/* 2fb299fa2Sopenharmony_ci * Copyright (c) 2021 Huawei Device Co., Ltd. 3fb299fa2Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 4fb299fa2Sopenharmony_ci * you may not use this file except in compliance with the License. 5fb299fa2Sopenharmony_ci * You may obtain a copy of the License at 6fb299fa2Sopenharmony_ci * 7fb299fa2Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 8fb299fa2Sopenharmony_ci * 9fb299fa2Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 10fb299fa2Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 11fb299fa2Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12fb299fa2Sopenharmony_ci * See the License for the specific language governing permissions and 13fb299fa2Sopenharmony_ci * limitations under the License. 14fb299fa2Sopenharmony_ci */ 15fb299fa2Sopenharmony_ci#include "thread_pool.h" 16fb299fa2Sopenharmony_ci#include <cstring> 17fb299fa2Sopenharmony_ci#include "script_utils.h" 18fb299fa2Sopenharmony_ci#include <unistd.h> 19fb299fa2Sopenharmony_ci 20fb299fa2Sopenharmony_cinamespace Uscript { 21fb299fa2Sopenharmony_cistatic thread_local float g_scriptProportion = 1.0f; 22fb299fa2Sopenharmony_cistatic ThreadPool* g_threadPool = nullptr; 23fb299fa2Sopenharmony_cistatic std::mutex g_initMutex; 24fb299fa2Sopenharmony_ci 25fb299fa2Sopenharmony_civoid SetScriptProportion(float proportion) 26fb299fa2Sopenharmony_ci{ 27fb299fa2Sopenharmony_ci g_scriptProportion = proportion; 28fb299fa2Sopenharmony_ci} 29fb299fa2Sopenharmony_ci 30fb299fa2Sopenharmony_cifloat GetScriptProportion() 31fb299fa2Sopenharmony_ci{ 32fb299fa2Sopenharmony_ci return g_scriptProportion; 33fb299fa2Sopenharmony_ci} 34fb299fa2Sopenharmony_ci 35fb299fa2Sopenharmony_ciThreadPool* ThreadPool::CreateThreadPool(int number) 36fb299fa2Sopenharmony_ci{ 37fb299fa2Sopenharmony_ci std::lock_guard<std::mutex> lock(g_initMutex); 38fb299fa2Sopenharmony_ci if (g_threadPool != nullptr) { 39fb299fa2Sopenharmony_ci return g_threadPool; 40fb299fa2Sopenharmony_ci } 41fb299fa2Sopenharmony_ci g_threadPool = new ThreadPool(); 42fb299fa2Sopenharmony_ci g_threadPool->Init(number); 43fb299fa2Sopenharmony_ci return g_threadPool; 44fb299fa2Sopenharmony_ci} 45fb299fa2Sopenharmony_ci 46fb299fa2Sopenharmony_civoid ThreadPool::Destroy() 47fb299fa2Sopenharmony_ci{ 48fb299fa2Sopenharmony_ci if (g_threadPool == nullptr) { 49fb299fa2Sopenharmony_ci return; 50fb299fa2Sopenharmony_ci } 51fb299fa2Sopenharmony_ci std::lock_guard<std::mutex> lock(g_initMutex); 52fb299fa2Sopenharmony_ci delete g_threadPool; 53fb299fa2Sopenharmony_ci g_threadPool = nullptr; 54fb299fa2Sopenharmony_ci} 55fb299fa2Sopenharmony_ci 56fb299fa2Sopenharmony_civoid ThreadPool::Init(int32_t number) 57fb299fa2Sopenharmony_ci{ 58fb299fa2Sopenharmony_ci threadNumber_ = number; 59fb299fa2Sopenharmony_ci taskQueue_.resize(threadPoolMaxTasks); 60fb299fa2Sopenharmony_ci for (size_t t = 0; t < taskQueue_.size(); ++t) { 61fb299fa2Sopenharmony_ci taskQueue_[t].available = true; 62fb299fa2Sopenharmony_ci for (int32_t i = 0; i < threadNumber_; ++i) { 63fb299fa2Sopenharmony_ci taskQueue_[t].subTaskFlag.emplace_back(new std::atomic_bool { false }); 64fb299fa2Sopenharmony_ci } 65fb299fa2Sopenharmony_ci } 66fb299fa2Sopenharmony_ci // Create workers 67fb299fa2Sopenharmony_ci for (int32_t threadIndex = 0; threadIndex < threadNumber_; ++threadIndex) { 68fb299fa2Sopenharmony_ci workers_.emplace_back(std::thread([this, threadIndex] { 69fb299fa2Sopenharmony_ci ThreadExecute(this, threadIndex); 70fb299fa2Sopenharmony_ci })); 71fb299fa2Sopenharmony_ci } 72fb299fa2Sopenharmony_ci} 73fb299fa2Sopenharmony_ci 74fb299fa2Sopenharmony_civoid ThreadPool::ThreadRun(int32_t threadIndex) 75fb299fa2Sopenharmony_ci{ 76fb299fa2Sopenharmony_ci USCRIPT_LOGI("Create new thread successfully, tid: %d", gettid()); 77fb299fa2Sopenharmony_ci while (!stop_) { 78fb299fa2Sopenharmony_ci for (int32_t k = 0; k < threadPoolMaxTasks; ++k) { 79fb299fa2Sopenharmony_ci if (*taskQueue_[k].subTaskFlag[threadIndex]) { 80fb299fa2Sopenharmony_ci taskQueue_[k].task.processor(threadIndex); 81fb299fa2Sopenharmony_ci *taskQueue_[k].subTaskFlag[threadIndex] = false; 82fb299fa2Sopenharmony_ci } 83fb299fa2Sopenharmony_ci } 84fb299fa2Sopenharmony_ci std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms 85fb299fa2Sopenharmony_ci } 86fb299fa2Sopenharmony_ci} 87fb299fa2Sopenharmony_ci 88fb299fa2Sopenharmony_ciThreadPool::~ThreadPool() 89fb299fa2Sopenharmony_ci{ 90fb299fa2Sopenharmony_ci { 91fb299fa2Sopenharmony_ci std::lock_guard<std::mutex> lock(queueMutex_); 92fb299fa2Sopenharmony_ci stop_ = true; 93fb299fa2Sopenharmony_ci } 94fb299fa2Sopenharmony_ci for (auto& worker : workers_) { 95fb299fa2Sopenharmony_ci worker.join(); 96fb299fa2Sopenharmony_ci } 97fb299fa2Sopenharmony_ci for (auto& task : taskQueue_) { 98fb299fa2Sopenharmony_ci for (auto c : task.subTaskFlag) { 99fb299fa2Sopenharmony_ci delete c; 100fb299fa2Sopenharmony_ci } 101fb299fa2Sopenharmony_ci } 102fb299fa2Sopenharmony_ci} 103fb299fa2Sopenharmony_ci 104fb299fa2Sopenharmony_civoid ThreadPool::AddTask(Task &&task) 105fb299fa2Sopenharmony_ci{ 106fb299fa2Sopenharmony_ci if (g_threadPool != nullptr) { 107fb299fa2Sopenharmony_ci g_threadPool->AddNewTask(std::move(task)); 108fb299fa2Sopenharmony_ci } 109fb299fa2Sopenharmony_ci} 110fb299fa2Sopenharmony_ci 111fb299fa2Sopenharmony_civoid ThreadPool::AddNewTask(Task &&task) 112fb299fa2Sopenharmony_ci{ 113fb299fa2Sopenharmony_ci int32_t index = AcquireWorkIndex(); 114fb299fa2Sopenharmony_ci if (index < 0) { 115fb299fa2Sopenharmony_ci USCRIPT_LOGI("ThreadPool::AddNewTask Failed"); 116fb299fa2Sopenharmony_ci return; 117fb299fa2Sopenharmony_ci } 118fb299fa2Sopenharmony_ci 119fb299fa2Sopenharmony_ci RunTask(std::move(task), index); 120fb299fa2Sopenharmony_ci // Works done. make this task available 121fb299fa2Sopenharmony_ci std::lock_guard<std::mutex> lock(queueMutex_); 122fb299fa2Sopenharmony_ci taskQueue_[index].available = true; 123fb299fa2Sopenharmony_ci} 124fb299fa2Sopenharmony_ci 125fb299fa2Sopenharmony_ciint32_t ThreadPool::AcquireWorkIndex() 126fb299fa2Sopenharmony_ci{ 127fb299fa2Sopenharmony_ci std::lock_guard<std::mutex> lock(queueMutex_); 128fb299fa2Sopenharmony_ci for (int32_t i = 0; i < threadPoolMaxTasks; ++i) { 129fb299fa2Sopenharmony_ci if (taskQueue_[i].available) { 130fb299fa2Sopenharmony_ci taskQueue_[i].available = false; 131fb299fa2Sopenharmony_ci return i; 132fb299fa2Sopenharmony_ci } 133fb299fa2Sopenharmony_ci } 134fb299fa2Sopenharmony_ci return -1; 135fb299fa2Sopenharmony_ci} 136fb299fa2Sopenharmony_ci 137fb299fa2Sopenharmony_civoid ThreadPool::RunTask(Task &&task, int32_t index) 138fb299fa2Sopenharmony_ci{ 139fb299fa2Sopenharmony_ci int32_t workSize = task.workSize; 140fb299fa2Sopenharmony_ci taskQueue_[index].task = std::move(task); 141fb299fa2Sopenharmony_ci // Mark each task should be executed 142fb299fa2Sopenharmony_ci int32_t num = workSize > threadNumber_ ? threadNumber_ : workSize; 143fb299fa2Sopenharmony_ci for (int32_t i = 0; i < num; ++i) { 144fb299fa2Sopenharmony_ci *taskQueue_[index].subTaskFlag[i] = true; 145fb299fa2Sopenharmony_ci } 146fb299fa2Sopenharmony_ci 147fb299fa2Sopenharmony_ci bool complete = true; 148fb299fa2Sopenharmony_ci do { 149fb299fa2Sopenharmony_ci std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms 150fb299fa2Sopenharmony_ci complete = true; 151fb299fa2Sopenharmony_ci // 检查是否所有子任务执行结束 152fb299fa2Sopenharmony_ci for (int32_t i = 0; i < num; ++i) { 153fb299fa2Sopenharmony_ci if (*taskQueue_[index].subTaskFlag[i]) { 154fb299fa2Sopenharmony_ci complete = false; 155fb299fa2Sopenharmony_ci break; 156fb299fa2Sopenharmony_ci } 157fb299fa2Sopenharmony_ci } 158fb299fa2Sopenharmony_ci } while (!complete); 159fb299fa2Sopenharmony_ci} 160fb299fa2Sopenharmony_ci} // namespace Uscript 161