/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
151e934351Sopenharmony_ci
161e934351Sopenharmony_ci#ifndef NETSTACK_THREAD_POOL
171e934351Sopenharmony_ci#define NETSTACK_THREAD_POOL
181e934351Sopenharmony_ci
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
241e934351Sopenharmony_ci
251e934351Sopenharmony_cinamespace OHOS::NetStack {
/**
 * A thread pool that executes tasks in priority order.
 *
 * Pending tasks are stored in a std::priority_queue<Task>, so the task that compares greatest under
 * operator< is executed first. The constructor starts DEFAULT_THREAD_NUM workers. Push() starts an
 * additional worker when no worker is idle and fewer than MAX_THREAD_NUM workers exist. A worker that
 * stays idle for the configured timeout terminates if more than DEFAULT_THREAD_NUM workers exist.
 *
 * All internal state is guarded by one mutex; tasks are executed with that mutex released, so a task
 * may call Push() on its own pool.
 *
 * @tparam Task copyable type that provides operator< and a member function Execute()
 * @tparam DEFAULT_THREAD_NUM number of workers started by the constructor
 * @tparam MAX_THREAD_NUM upper bound for the number of workers
 */
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;

    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;

    /**
     * make DEFAULT_THREAD_NUM threads
     * @param timeout idle time in seconds after which a worker terminates, provided that more than
     *                DEFAULT_THREAD_NUM workers exist at that moment
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), runningNum_(0), needRun_(true)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        for (size_t i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            StartWorker();
        }
    }

    /**
     * Ask every worker to terminate and block until all of them have left RunTask().
     * A task that is executing is allowed to finish; tasks still waiting in the queue are discarded.
     */
    ~ThreadPool()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // needRun_ is cleared under mutex_, so a worker cannot test it and then miss the notification
        needRun_ = false;
        needRunCondition_.notify_all();
        // the workers are detached and use `this`; the object must outlive every one of them
        exitCondition_.wait(lock, [this] { return runningNum_ == 0; });
    }

    /**
     * push it to taskQueue_ and notify a thread to run it
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            taskQueue_.push(task);

            // nobody is waiting for work: grow the pool as long as the limit allows it
            if (needRun_ && idleThreadNum_ == 0 && runningNum_ < MAX_THREAD_NUM) {
                StartWorker();
            }
        }

        // one new task needs one worker; every worker re-checks the queue under mutex_ before waiting
        needRunCondition_.notify_one();
    }

private:
    /**
     * Start one detached worker and count it in runningNum_.
     * The caller must hold mutex_. The new thread blocks on mutex_ at the start of RunTask() until the
     * caller releases it, so the worker is always counted before it can run or terminate. If creating
     * the thread throws, runningNum_ is left unchanged.
     */
    void StartWorker()
    {
        std::thread([this] { RunTask(); }).detach();
        ++runningNum_;
    }

    /**
     * Wait until there is a task, the pool is shutting down, or timeout_ seconds have passed.
     * The thread is counted in idleThreadNum_ for the duration of the wait.
     * @param lock lock on mutex_, owned by the caller; released while waiting, owned again on return
     * @return false if the wait timed out with an empty queue and the pool still running, true otherwise
     */
    bool WaitForTask(std::unique_lock<std::mutex> &lock)
    {
        ++idleThreadNum_;
        const bool wakeReasonExists = needRunCondition_.wait_for(
            lock, std::chrono::seconds(timeout_), [this] { return !needRun_ || !taskQueue_.empty(); });
        --idleThreadNum_;
        return wakeReasonExists;
    }

    /**
     * Remove the highest-priority task from taskQueue_ and execute it.
     * The queue must not be empty.
     * @param lock lock on mutex_, owned by the caller; released while the task runs, owned again on return
     */
    void ExecuteNext(std::unique_lock<std::mutex> &lock)
    {
        {
            Task task = taskQueue_.top();
            taskQueue_.pop();
            lock.unlock();
            task.Execute();
            // task is destroyed at the end of this scope, while mutex_ is still released
        }
        lock.lock();
    }

    /**
     * Worker loop: execute queued tasks until the pool shuts down, or until this worker has been idle
     * for timeout_ seconds while more than DEFAULT_THREAD_NUM workers exist.
     */
    void RunTask()
    {
        std::unique_lock<std::mutex> lock(mutex_);

        while (needRun_) {
            if (!taskQueue_.empty()) {
                ExecuteNext(lock);
                continue;
            }

            if (!WaitForTask(lock) && runningNum_ > DEFAULT_THREAD_NUM) {
                // idle for the whole timeout and the pool is larger than its default size
                break;
            }
        }

        // Decrement and notify while holding mutex_: the destructor can only observe zero after this
        // thread has released the lock, and this thread does not touch the pool afterwards.
        --runningNum_;
        exitCondition_.notify_all();
    }

private:
    /**
     * guards taskQueue_ and every state change the condition variables below depend on
     */
    std::mutex mutex_;
    /**
     * tasks waiting to be executed, greatest (by operator<) first
     */
    std::priority_queue<Task> taskQueue_;
    /**
     * number of seconds a worker waits for a task before it checks whether it should terminate;
     * it terminates only if more than DEFAULT_THREAD_NUM workers exist at that moment
     */
    uint32_t timeout_;
    /**
     * number of workers currently waiting for a task;
     * if idleThreadNum_ is zero, Push makes a new thread (up to MAX_THREAD_NUM)
     */
    std::atomic<uint32_t> idleThreadNum_;
    /**
     * number of workers that have been started and have not terminated yet;
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    std::atomic<uint32_t> runningNum_;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    std::atomic_bool needRun_;
    /**
     * signalled when a task is pushed or when the pool shuts down
     */
    std::condition_variable needRunCondition_;
    /**
     * signalled each time a worker terminates; the destructor waits on it
     */
    std::condition_variable exitCondition_;
};
2121e934351Sopenharmony_ci} // namespace OHOS::NetStack
2131e934351Sopenharmony_ci#endif /* NETSTACK_THREAD_POOL */
214