/*
 * Copyright (c) 2024-2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef SIGNATRUETOOLS_THREAD_POOL_H
#define SIGNATRUETOOLS_THREAD_POOL_H

#include <cstddef>
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <type_traits>
#include <utility>

// Default worker count / queue bound. Note: std::thread::hardware_concurrency()
// is allowed to return 0 when the value cannot be determined, so every use
// inside ThreadPool clamps the value to at least 1.
#define TASK_NUM (std::thread::hardware_concurrency())

namespace OHOS {
namespace SignatureTools {
namespace Uscript {
/**
 * Fixed-size pool of worker threads consuming a bounded FIFO task queue.
 *
 * - Enqueue() blocks the caller while the queue already holds the maximum
 *   number of pending tasks (TASK_NUM, clamped to at least 1).
 * - The destructor sets the stop flag, lets the workers drain every task that
 *   is still queued, and joins all worker threads.
 */
class ThreadPool {
public:
    /**
     * Starts the worker threads.
     *
     * @param threads number of workers to start; 0 is treated as 1 so that
     *                queued tasks are always executed by somebody.
     */
    ThreadPool(size_t threads = TASK_NUM)
        : m_maxPending(AtLeastOne(TASK_NUM)), m_stop(false)
    {
        const size_t workerCount = AtLeastOne(threads);
        m_workers.reserve(workerCount);
        for (size_t i = 0; i < workerCount; ++i) {
            m_workers.emplace_back([this] { WorkerLoop(); });
        }
    }

    // The pool owns threads and synchronization primitives: not copyable.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /**
     * Schedules f(args...) for execution on a worker thread.
     *
     * Blocks while the queue is full (unless the pool is stopping).
     *
     * @param f    callable to run
     * @param args arguments bound to the callable (stored via std::bind)
     * @return future providing the callable's result; an exception thrown by
     *         the callable is captured by the std::packaged_task and rethrown
     *         from future::get()
     */
    template<class F, class... Args>
    auto Enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
    {
        using returnType = typename std::result_of<F(Args...)>::type;
        // packaged_task is move-only while std::function requires a copyable
        // target, hence the shared_ptr indirection.
        auto task = std::make_shared<std::packaged_task<returnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<returnType> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(m_queueMutex);
            // Back-pressure: wait for a free queue slot.
            m_conditionMax.wait(lock, [this] { return m_stop || m_tasks.size() < m_maxPending; });
            m_tasks.emplace([task]() { (*task)(); });
        }
        // Notify outside the lock so the woken worker can take it immediately.
        m_condition.notify_one();
        return res;
    }

    /**
     * Requests shutdown, wakes every waiting thread and joins the workers.
     * Tasks that are still queued are executed before the workers exit.
     */
    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(m_queueMutex);
            m_stop = true;
        }
        m_condition.notify_all();
        m_conditionMax.notify_all();
        for (std::thread& worker : m_workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

private:
    // Clamps a count to a minimum of one.
    static size_t AtLeastOne(size_t value)
    {
        return value == 0 ? 1 : value;
    }

    // Body of every worker thread: pop and run tasks until the pool is
    // stopping and the queue has been drained.
    void WorkerLoop()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_queueMutex);
                m_condition.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
                if (m_tasks.empty()) {
                    // The wait predicate held with an empty queue, so m_stop is set.
                    return;
                }
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            // The queue slot is free as soon as the task is popped; wake one
            // blocked producer now instead of after the task has finished.
            m_conditionMax.notify_one();
            task();
        }
    }

    // need to keep track of threads so we can join them
    std::vector<std::thread> m_workers;
    // the task queue
    std::queue<std::function<void()>> m_tasks;
    // synchronization
    std::mutex m_queueMutex;
    // signalled when a task is queued or the pool is stopping (wakes workers)
    std::condition_variable m_condition;
    // signalled when a queue slot is freed or the pool is stopping (wakes producers)
    std::condition_variable m_conditionMax;
    // maximum number of tasks allowed to wait in the queue (always >= 1)
    const size_t m_maxPending;
    // set by the destructor, guarded by m_queueMutex
    bool m_stop;
};
} // namespace Uscript
} // namespace SignatureTools
} // namespace OHOS
#endif