1/*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#ifndef NETSTACK_THREAD_POOL
17#define NETSTACK_THREAD_POOL
18
19#include <atomic>
20#include <condition_variable>
21#include <queue>
22#include <thread>
23#include <vector>
24
25namespace OHOS::NetStack {
26template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
27public:
28    /**
29     * disallow default constructor
30     */
31    ThreadPool() = delete;
32
33    /**
34     * disallow copy and move
35     */
36    ThreadPool(const ThreadPool &) = delete;
37
38    /**
39     * disallow copy and move
40     */
41    ThreadPool &operator=(const ThreadPool &) = delete;
42
43    /**
44     * disallow copy and move
45     */
46    ThreadPool(ThreadPool &&) = delete;
47
48    /**
49     * disallow copy and move
50     */
51    ThreadPool &operator=(ThreadPool &&) = delete;
52
53    /**
54     * make DEFAULT_THREAD_NUM threads
55     * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
56     */
57    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
58    {
59        for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
60            std::thread([this] { RunTask(); }).detach();
61        }
62    }
63
64    /**
65     * if ~ThreadPool, terminate all thread
66     */
67    ~ThreadPool()
68    {
69        // set needRun_ = false, and notify all the thread to wake and terminate
70        needRun_ = false;
71        while (runningNum_ > 0) {
72            needRunCondition_.notify_all();
73        }
74    }
75
76    /**
77     * push it to taskQueue_ and notify a thread to run it
78     * @param task new task to Execute
79     */
80    void Push(const Task &task)
81    {
82        PushTask(task);
83
84        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
85            std::thread([this] { RunTask(); }).detach();
86        }
87
88        needRunCondition_.notify_all();
89    }
90
91private:
92    bool IsQueueEmpty()
93    {
94        std::lock_guard<std::mutex> guard(mutex_);
95        return taskQueue_.empty();
96    }
97
98    bool GetTask(Task &task)
99    {
100        std::lock_guard<std::mutex> guard(mutex_);
101
102        // if taskQueue_ is empty, means timeout
103        if (taskQueue_.empty()) {
104            return false;
105        }
106
107        // if run to this line, means that taskQueue_ is not empty
108        task = taskQueue_.top();
109        taskQueue_.pop();
110        return true;
111    }
112
113    void PushTask(const Task &task)
114    {
115        std::lock_guard<std::mutex> guard(mutex_);
116        taskQueue_.push(task);
117    }
118
119    class NumWrapper {
120    public:
121        NumWrapper() = delete;
122
123        explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
124        {
125            ++num_;
126        }
127
128        ~NumWrapper()
129        {
130            --num_;
131        }
132
133    private:
134        std::atomic<uint32_t> &num_;
135    };
136
137    void Sleep()
138    {
139        std::mutex needRunMutex;
140        std::unique_lock<std::mutex> lock(needRunMutex);
141
142        /**
143         * if the thread is waiting, it is idle
144         * if wake up, this thread is not idle:
145         *     1 this thread should return
146         *     2 this thread should run task
147         *     3 this thread should go to next loop
148         */
149        NumWrapper idleWrapper(idleThreadNum_);
150        (void)idleWrapper;
151
152        needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
153                                   [this] { return !needRun_ || !IsQueueEmpty(); });
154    }
155
156    void RunTask()
157    {
158        NumWrapper runningWrapper(runningNum_);
159        (void)runningWrapper;
160
161        while (needRun_) {
162            Task task;
163            if (GetTask(task)) {
164                task.Execute();
165                continue;
166            }
167
168            Sleep();
169
170            if (!needRun_) {
171                return;
172            }
173
174            if (GetTask(task)) {
175                task.Execute();
176                continue;
177            }
178
179            if (runningNum_ > DEFAULT_THREAD_NUM) {
180                return;
181            }
182        }
183    }
184
185private:
186    /**
187     * other thread put a task to the taskQueue_
188     */
189    std::mutex mutex_;
190    std::priority_queue<Task> taskQueue_;
191    /**
192     * 1 terminate the thread if it is idle for timeout_ seconds
193     * 2 wait for the thread started util timeout_
194     * 3 wait for the thread notified util timeout_
195     * 4 wait for the thread terminated util timeout_
196     */
197    uint32_t timeout_;
198    /**
199     * if idleThreadNum_ is zero, make a new thread
200     */
201    std::atomic<uint32_t> idleThreadNum_;
202    /**
203     * when ThreadPool object is deleted, wait until runningNum_ is zero.
204     */
205    std::atomic<uint32_t> runningNum_;
206    /**
207     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
208     */
209    std::atomic_bool needRun_;
210    std::condition_variable needRunCondition_;
211};
212} // namespace OHOS::NetStack
213#endif /* NETSTACK_THREAD_POOL */
214