11e934351Sopenharmony_ci/* 21e934351Sopenharmony_ci * Copyright (c) 2022 Huawei Device Co., Ltd. 31e934351Sopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 41e934351Sopenharmony_ci * you may not use this file except in compliance with the License. 51e934351Sopenharmony_ci * You may obtain a copy of the License at 61e934351Sopenharmony_ci * 71e934351Sopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 81e934351Sopenharmony_ci * 91e934351Sopenharmony_ci * Unless required by applicable law or agreed to in writing, software 101e934351Sopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 111e934351Sopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 121e934351Sopenharmony_ci * See the License for the specific language governing permissions and 131e934351Sopenharmony_ci * limitations under the License. 141e934351Sopenharmony_ci */ 151e934351Sopenharmony_ci 161e934351Sopenharmony_ci#ifndef NETSTACK_THREAD_POOL 171e934351Sopenharmony_ci#define NETSTACK_THREAD_POOL 181e934351Sopenharmony_ci 191e934351Sopenharmony_ci#include <atomic> 201e934351Sopenharmony_ci#include <condition_variable> 211e934351Sopenharmony_ci#include <queue> 221e934351Sopenharmony_ci#include <thread> 231e934351Sopenharmony_ci#include <vector> 241e934351Sopenharmony_ci 251e934351Sopenharmony_cinamespace OHOS::NetStack { 261e934351Sopenharmony_citemplate <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool { 271e934351Sopenharmony_cipublic: 281e934351Sopenharmony_ci /** 291e934351Sopenharmony_ci * disallow default constructor 301e934351Sopenharmony_ci */ 311e934351Sopenharmony_ci ThreadPool() = delete; 321e934351Sopenharmony_ci 331e934351Sopenharmony_ci /** 341e934351Sopenharmony_ci * disallow copy and move 351e934351Sopenharmony_ci */ 361e934351Sopenharmony_ci ThreadPool(const ThreadPool &) = delete; 371e934351Sopenharmony_ci 381e934351Sopenharmony_ci /** 391e934351Sopenharmony_ci * disallow copy and move 401e934351Sopenharmony_ci */ 411e934351Sopenharmony_ci ThreadPool &operator=(const ThreadPool &) = delete; 421e934351Sopenharmony_ci 431e934351Sopenharmony_ci /** 441e934351Sopenharmony_ci * disallow copy and move 451e934351Sopenharmony_ci */ 461e934351Sopenharmony_ci ThreadPool(ThreadPool &&) = delete; 471e934351Sopenharmony_ci 481e934351Sopenharmony_ci /** 491e934351Sopenharmony_ci * disallow copy and move 501e934351Sopenharmony_ci */ 511e934351Sopenharmony_ci ThreadPool &operator=(ThreadPool &&) = delete; 521e934351Sopenharmony_ci 531e934351Sopenharmony_ci /** 541e934351Sopenharmony_ci * make DEFAULT_THREAD_NUM threads 551e934351Sopenharmony_ci * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated 561e934351Sopenharmony_ci */ 571e934351Sopenharmony_ci explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true) 581e934351Sopenharmony_ci { 591e934351Sopenharmony_ci for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) { 601e934351Sopenharmony_ci std::thread([this] { RunTask(); }).detach(); 611e934351Sopenharmony_ci } 621e934351Sopenharmony_ci } 631e934351Sopenharmony_ci 641e934351Sopenharmony_ci /** 651e934351Sopenharmony_ci * if ~ThreadPool, terminate all thread 661e934351Sopenharmony_ci */ 671e934351Sopenharmony_ci ~ThreadPool() 681e934351Sopenharmony_ci { 691e934351Sopenharmony_ci // set needRun_ = false, and notify all the thread to wake and terminate 701e934351Sopenharmony_ci needRun_ = false; 711e934351Sopenharmony_ci while (runningNum_ > 0) { 721e934351Sopenharmony_ci needRunCondition_.notify_all(); 731e934351Sopenharmony_ci } 741e934351Sopenharmony_ci } 751e934351Sopenharmony_ci 761e934351Sopenharmony_ci /** 771e934351Sopenharmony_ci * push it to taskQueue_ and notify a thread to run it 781e934351Sopenharmony_ci * @param task new task to Execute 791e934351Sopenharmony_ci */ 801e934351Sopenharmony_ci void Push(const Task &task) 811e934351Sopenharmony_ci { 821e934351Sopenharmony_ci PushTask(task); 831e934351Sopenharmony_ci 841e934351Sopenharmony_ci if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) { 851e934351Sopenharmony_ci std::thread([this] { RunTask(); }).detach(); 861e934351Sopenharmony_ci } 871e934351Sopenharmony_ci 881e934351Sopenharmony_ci needRunCondition_.notify_all(); 891e934351Sopenharmony_ci } 901e934351Sopenharmony_ci 911e934351Sopenharmony_ciprivate: 921e934351Sopenharmony_ci bool IsQueueEmpty() 931e934351Sopenharmony_ci { 941e934351Sopenharmony_ci std::lock_guard<std::mutex> guard(mutex_); 951e934351Sopenharmony_ci return taskQueue_.empty(); 961e934351Sopenharmony_ci } 971e934351Sopenharmony_ci 981e934351Sopenharmony_ci bool GetTask(Task &task) 991e934351Sopenharmony_ci { 1001e934351Sopenharmony_ci std::lock_guard<std::mutex> guard(mutex_); 1011e934351Sopenharmony_ci 1021e934351Sopenharmony_ci // if taskQueue_ is empty, means timeout 1031e934351Sopenharmony_ci if (taskQueue_.empty()) { 1041e934351Sopenharmony_ci return false; 1051e934351Sopenharmony_ci } 1061e934351Sopenharmony_ci 1071e934351Sopenharmony_ci // if run to this line, means that taskQueue_ is not empty 1081e934351Sopenharmony_ci task = taskQueue_.top(); 1091e934351Sopenharmony_ci taskQueue_.pop(); 1101e934351Sopenharmony_ci return true; 1111e934351Sopenharmony_ci } 1121e934351Sopenharmony_ci 1131e934351Sopenharmony_ci void PushTask(const Task &task) 1141e934351Sopenharmony_ci { 1151e934351Sopenharmony_ci std::lock_guard<std::mutex> guard(mutex_); 1161e934351Sopenharmony_ci taskQueue_.push(task); 1171e934351Sopenharmony_ci } 1181e934351Sopenharmony_ci 1191e934351Sopenharmony_ci class NumWrapper { 1201e934351Sopenharmony_ci public: 1211e934351Sopenharmony_ci NumWrapper() = delete; 1221e934351Sopenharmony_ci 1231e934351Sopenharmony_ci explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num) 1241e934351Sopenharmony_ci { 1251e934351Sopenharmony_ci ++num_; 1261e934351Sopenharmony_ci } 1271e934351Sopenharmony_ci 1281e934351Sopenharmony_ci ~NumWrapper() 1291e934351Sopenharmony_ci { 1301e934351Sopenharmony_ci --num_; 1311e934351Sopenharmony_ci } 1321e934351Sopenharmony_ci 1331e934351Sopenharmony_ci private: 1341e934351Sopenharmony_ci std::atomic<uint32_t> &num_; 1351e934351Sopenharmony_ci }; 1361e934351Sopenharmony_ci 1371e934351Sopenharmony_ci void Sleep() 1381e934351Sopenharmony_ci { 1391e934351Sopenharmony_ci std::mutex needRunMutex; 1401e934351Sopenharmony_ci std::unique_lock<std::mutex> lock(needRunMutex); 1411e934351Sopenharmony_ci 1421e934351Sopenharmony_ci /** 1431e934351Sopenharmony_ci * if the thread is waiting, it is idle 1441e934351Sopenharmony_ci * if wake up, this thread is not idle: 1451e934351Sopenharmony_ci * 1 this thread should return 1461e934351Sopenharmony_ci * 2 this thread should run task 1471e934351Sopenharmony_ci * 3 this thread should go to next loop 1481e934351Sopenharmony_ci */ 1491e934351Sopenharmony_ci NumWrapper idleWrapper(idleThreadNum_); 1501e934351Sopenharmony_ci (void)idleWrapper; 1511e934351Sopenharmony_ci 1521e934351Sopenharmony_ci needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_), 1531e934351Sopenharmony_ci [this] { return !needRun_ || !IsQueueEmpty(); }); 1541e934351Sopenharmony_ci } 1551e934351Sopenharmony_ci 1561e934351Sopenharmony_ci void RunTask() 1571e934351Sopenharmony_ci { 1581e934351Sopenharmony_ci NumWrapper runningWrapper(runningNum_); 1591e934351Sopenharmony_ci (void)runningWrapper; 1601e934351Sopenharmony_ci 1611e934351Sopenharmony_ci while (needRun_) { 1621e934351Sopenharmony_ci Task task; 1631e934351Sopenharmony_ci if (GetTask(task)) { 1641e934351Sopenharmony_ci task.Execute(); 1651e934351Sopenharmony_ci continue; 1661e934351Sopenharmony_ci } 1671e934351Sopenharmony_ci 1681e934351Sopenharmony_ci Sleep(); 1691e934351Sopenharmony_ci 1701e934351Sopenharmony_ci if (!needRun_) { 1711e934351Sopenharmony_ci return; 1721e934351Sopenharmony_ci } 1731e934351Sopenharmony_ci 1741e934351Sopenharmony_ci if (GetTask(task)) { 1751e934351Sopenharmony_ci task.Execute(); 1761e934351Sopenharmony_ci continue; 1771e934351Sopenharmony_ci } 1781e934351Sopenharmony_ci 1791e934351Sopenharmony_ci if (runningNum_ > DEFAULT_THREAD_NUM) { 1801e934351Sopenharmony_ci return; 1811e934351Sopenharmony_ci } 1821e934351Sopenharmony_ci } 1831e934351Sopenharmony_ci } 1841e934351Sopenharmony_ci 1851e934351Sopenharmony_ciprivate: 1861e934351Sopenharmony_ci /** 1871e934351Sopenharmony_ci * other thread put a task to the taskQueue_ 1881e934351Sopenharmony_ci */ 1891e934351Sopenharmony_ci std::mutex mutex_; 1901e934351Sopenharmony_ci std::priority_queue<Task> taskQueue_; 1911e934351Sopenharmony_ci /** 1921e934351Sopenharmony_ci * 1 terminate the thread if it is idle for timeout_ seconds 1931e934351Sopenharmony_ci * 2 wait for the thread started util timeout_ 1941e934351Sopenharmony_ci * 3 wait for the thread notified util timeout_ 1951e934351Sopenharmony_ci * 4 wait for the thread terminated util timeout_ 1961e934351Sopenharmony_ci */ 1971e934351Sopenharmony_ci uint32_t timeout_; 1981e934351Sopenharmony_ci /** 1991e934351Sopenharmony_ci * if idleThreadNum_ is zero, make a new thread 2001e934351Sopenharmony_ci */ 2011e934351Sopenharmony_ci std::atomic<uint32_t> idleThreadNum_; 2021e934351Sopenharmony_ci /** 2031e934351Sopenharmony_ci * when ThreadPool object is deleted, wait until runningNum_ is zero. 2041e934351Sopenharmony_ci */ 2051e934351Sopenharmony_ci std::atomic<uint32_t> runningNum_; 2061e934351Sopenharmony_ci /** 2071e934351Sopenharmony_ci * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated 2081e934351Sopenharmony_ci */ 2091e934351Sopenharmony_ci std::atomic_bool needRun_; 2101e934351Sopenharmony_ci std::condition_variable needRunCondition_; 2111e934351Sopenharmony_ci}; 2121e934351Sopenharmony_ci} // namespace OHOS::NetStack 2131e934351Sopenharmony_ci#endif /* NETSTACK_THREAD_POOL */ 214