1/* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15#include "thread_pool.h" 16#include <cstring> 17#include "script_utils.h" 18#include <unistd.h> 19 20namespace Uscript { 21static thread_local float g_scriptProportion = 1.0f; 22static ThreadPool* g_threadPool = nullptr; 23static std::mutex g_initMutex; 24 25void SetScriptProportion(float proportion) 26{ 27 g_scriptProportion = proportion; 28} 29 30float GetScriptProportion() 31{ 32 return g_scriptProportion; 33} 34 35ThreadPool* ThreadPool::CreateThreadPool(int number) 36{ 37 std::lock_guard<std::mutex> lock(g_initMutex); 38 if (g_threadPool != nullptr) { 39 return g_threadPool; 40 } 41 g_threadPool = new ThreadPool(); 42 g_threadPool->Init(number); 43 return g_threadPool; 44} 45 46void ThreadPool::Destroy() 47{ 48 if (g_threadPool == nullptr) { 49 return; 50 } 51 std::lock_guard<std::mutex> lock(g_initMutex); 52 delete g_threadPool; 53 g_threadPool = nullptr; 54} 55 56void ThreadPool::Init(int32_t number) 57{ 58 threadNumber_ = number; 59 taskQueue_.resize(threadPoolMaxTasks); 60 for (size_t t = 0; t < taskQueue_.size(); ++t) { 61 taskQueue_[t].available = true; 62 for (int32_t i = 0; i < threadNumber_; ++i) { 63 taskQueue_[t].subTaskFlag.emplace_back(new std::atomic_bool { false }); 64 } 65 } 66 // Create workers 67 for (int32_t threadIndex = 0; threadIndex < threadNumber_; ++threadIndex) { 68 workers_.emplace_back(std::thread([this, threadIndex] { 69 ThreadExecute(this, threadIndex); 70 })); 71 } 72} 73 74void ThreadPool::ThreadRun(int32_t threadIndex) 75{ 76 USCRIPT_LOGI("Create new thread successfully, tid: %d", gettid()); 77 while (!stop_) { 78 for (int32_t k = 0; k < threadPoolMaxTasks; ++k) { 79 if (*taskQueue_[k].subTaskFlag[threadIndex]) { 80 taskQueue_[k].task.processor(threadIndex); 81 *taskQueue_[k].subTaskFlag[threadIndex] = false; 82 } 83 } 84 std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms 85 } 86} 87 88ThreadPool::~ThreadPool() 89{ 90 { 91 std::lock_guard<std::mutex> lock(queueMutex_); 92 stop_ = true; 93 } 94 for (auto& worker : workers_) { 95 worker.join(); 96 } 97 for (auto& task : taskQueue_) { 98 for (auto c : task.subTaskFlag) { 99 delete c; 100 } 101 } 102} 103 104void ThreadPool::AddTask(Task &&task) 105{ 106 if (g_threadPool != nullptr) { 107 g_threadPool->AddNewTask(std::move(task)); 108 } 109} 110 111void ThreadPool::AddNewTask(Task &&task) 112{ 113 int32_t index = AcquireWorkIndex(); 114 if (index < 0) { 115 USCRIPT_LOGI("ThreadPool::AddNewTask Failed"); 116 return; 117 } 118 119 RunTask(std::move(task), index); 120 // Works done. make this task available 121 std::lock_guard<std::mutex> lock(queueMutex_); 122 taskQueue_[index].available = true; 123} 124 125int32_t ThreadPool::AcquireWorkIndex() 126{ 127 std::lock_guard<std::mutex> lock(queueMutex_); 128 for (int32_t i = 0; i < threadPoolMaxTasks; ++i) { 129 if (taskQueue_[i].available) { 130 taskQueue_[i].available = false; 131 return i; 132 } 133 } 134 return -1; 135} 136 137void ThreadPool::RunTask(Task &&task, int32_t index) 138{ 139 int32_t workSize = task.workSize; 140 taskQueue_[index].task = std::move(task); 141 // Mark each task should be executed 142 int32_t num = workSize > threadNumber_ ? threadNumber_ : workSize; 143 for (int32_t i = 0; i < num; ++i) { 144 *taskQueue_[index].subTaskFlag[i] = true; 145 } 146 147 bool complete = true; 148 do { 149 std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms 150 complete = true; 151 // 检查是否所有子任务执行结束 152 for (int32_t i = 0; i < num; ++i) { 153 if (*taskQueue_[index].subTaskFlag[i]) { 154 complete = false; 155 break; 156 } 157 } 158 } while (!complete); 159} 160} // namespace Uscript 161