1 /*
2  * Copyright (c) 2024-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef SIGNATRUETOOLS_THREAD_POOL_H
16 #define SIGNATRUETOOLS_THREAD_POOL_H
17 
18 #include <vector>
19 #include <queue>
20 #include <memory>
21 #include <thread>
22 #include <mutex>
23 #include <condition_variable>
24 #include <future>
25 #include <functional>
26 #include <stdexcept>
27 
28 #define TASK_NUM (std::thread::hardware_concurrency())
29 
30 namespace OHOS {
31 namespace SignatureTools {
32 namespace Uscript {
33 class ThreadPool {
34 public:
ThreadPool(size_t threads = TASK_NUM)35     ThreadPool(size_t threads = TASK_NUM)
36         : m_stop(false)
37     {
38         for (size_t i = 0; i < threads; ++i)
39             m_workers.emplace_back([this] {
40             std::function<void()> task;
41             std::unique_lock<std::mutex> lock(m_queueMutex);
42             while (!(m_stop && m_tasks.empty())) {
43                 m_condition.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
44                 if (m_stop && m_tasks.empty()) {
45                     return;
46                 }
47                 task = std::move(m_tasks.front());
48                 m_tasks.pop();
49                 lock.unlock();
50                 task();
51                 lock.lock();
52                 m_conditionMax.notify_one();
53             }
54         });
55     }
56 
57     template<class F, class... Args>
58     auto Enqueue(F&& f, Args&& ... args)
59         -> std::future<typename std::result_of<F(Args...)>::type>
60     {
61         using returnType = typename std::result_of<F(Args...)>::type;
62         auto task = std::make_shared< std::packaged_task<returnType()> >(
63             std::bind(std::forward<F>(f), std::forward<Args>(args)...)
64         );
65         std::future<returnType> res = task->get_future();
66         {
67             std::unique_lock<std::mutex> lock(m_queueMutex);
68             while (m_stop == false && m_tasks.size() >= TASK_NUM) {
69                 m_conditionMax.wait(lock);
70             }
71             m_tasks.emplace([task] () { (*task)(); });
72             m_condition.notify_one();
73         }
74         return res;
75     }
76 
~ThreadPool()77     ~ThreadPool()
78     {
79         if (m_stop == false) {
80             {
81                 std::unique_lock<std::mutex> lock(m_queueMutex);
82                 m_stop = true;
83             }
84             m_condition.notify_all();
85             for (std::thread& worker : m_workers) {
86                 worker.join();
87             }
88         }
89     }
90 
91 private:
92     // need to keep track of threads so we can join them
93     std::vector<std::thread> m_workers;
94     // the task queue
95     std::queue<std::function<void()>> m_tasks;
96     // synchronization
97     std::mutex m_queueMutex;
98     std::condition_variable m_condition;
99     std::condition_variable m_conditionMax;
100     bool m_stop;
101 };
102 } // namespace Uscript
103 } // namespace SignatureTools
104 } // namespace OHOS
105 #endif