1/* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "thread_pool.h" 17#include "errors.h" 18#include "utils_log.h" 19 20#include <memory> 21#include <pthread.h> 22 23namespace OHOS { 24 25ThreadPool::ThreadPool(const std::string& name) 26 : myName_(name), maxTaskNum_(0), running_(false) 27{ 28} 29 30ThreadPool::~ThreadPool() 31{ 32 if (running_) { 33 Stop(); 34 } 35} 36 37uint32_t ThreadPool::Start(int numThreads) 38{ 39 if (!threads_.empty()) { 40 return ERR_INVALID_OPERATION; 41 } 42 43 if (numThreads <= 0) { 44 return ERR_INVALID_VALUE; 45 } 46 running_ = true; 47 threads_.reserve(numThreads); 48 49 for (int i = 0; i < numThreads; ++i) { 50 std::thread t([this] { this->WorkInThread(); }); 51 // Give the name of ThreadPool to threads created by the ThreadPool. 52 int err = pthread_setname_np(t.native_handle(), (myName_ + std::to_string(i)).c_str()); 53 if (err != 0) { 54 UTILS_LOGD("Failed to set name to thread. %{public}s", strerror(err)); 55 } 56 threads_.push_back(std::move(t)); 57 } 58 return ERR_OK; 59} 60 61void ThreadPool::Stop() 62{ 63 { 64 std::unique_lock<std::mutex> lock(mutex_); 65 running_ = false; 66 hasTaskToDo_.notify_all(); 67 } 68 69 for (auto& e : threads_) { 70 e.join(); 71 } 72} 73 74void ThreadPool::AddTask(const Task &f) 75{ 76 if (threads_.empty()) { 77 f(); 78 } else { 79 std::unique_lock<std::mutex> lock(mutex_); 80 while (Overloaded()) { 81 acceptNewTask_.wait(lock); 82 } 83 84 tasks_.push_back(f); 85 hasTaskToDo_.notify_one(); 86 } 87} 88 89size_t ThreadPool::GetCurTaskNum() 90{ 91 std::unique_lock<std::mutex> lock(mutex_); 92 return tasks_.size(); 93} 94 95 96ThreadPool::Task ThreadPool::ScheduleTask() 97{ 98 std::unique_lock<std::mutex> lock(mutex_); 99 while (tasks_.empty() && running_) { 100 hasTaskToDo_.wait(lock); 101 } 102 103 Task task; 104 if (!tasks_.empty()) { 105 task = tasks_.front(); 106 tasks_.pop_front(); 107 108 if (maxTaskNum_ > 0) { 109 acceptNewTask_.notify_one(); 110 } 111 } 112 return task; 113} 114 115bool ThreadPool::Overloaded() const 116{ 117 return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_); 118} 119 120void ThreadPool::WorkInThread() 121{ 122 while (running_) { 123 Task task = ScheduleTask(); 124 if (task) { 125 task(); 126 } 127 } 128} 129 130} // namespace OHOS 131