1/* 2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "workerQueue.h" 17 18namespace panda::es2panda::util { 19 20void WorkerJob::DependsOn(WorkerJob *job) 21{ 22 job->dependants_.push_back(this); 23 dependencies_++; 24} 25 26void WorkerJob::Signal() 27{ 28 { 29 std::lock_guard<std::mutex> lock(m_); 30 dependencies_--; 31 } 32 33 cond_.notify_one(); 34} 35 36WorkerQueue::WorkerQueue(size_t threadCount) 37{ 38 threads_.reserve(threadCount); 39 40 for (size_t i = 0; i < threadCount; i++) { 41 threads_.push_back(os::thread::ThreadStart(Worker, this)); 42 } 43} 44 45WorkerQueue::~WorkerQueue() 46{ 47 void *retval = nullptr; 48 49 std::unique_lock<std::mutex> lock(m_); 50 terminate_ = true; 51 lock.unlock(); 52 jobsAvailable_.notify_all(); 53 54 for (const auto handle_id : threads_) { 55 os::thread::ThreadJoin(handle_id, &retval); 56 } 57} 58 59void WorkerQueue::Worker(WorkerQueue *queue) 60{ 61 while (true) { 62 std::unique_lock<std::mutex> lock(queue->m_); 63 queue->jobsAvailable_.wait(lock, [queue]() { return queue->terminate_ || queue->jobsCount_ != 0; }); 64 65 if (queue->terminate_) { 66 return; 67 } 68 69 lock.unlock(); 70 71 queue->Consume(); 72 queue->jobsFinished_.notify_one(); 73 } 74} 75 76void WorkerQueue::Consume() 77{ 78 std::unique_lock<std::mutex> lock(m_); 79 activeWorkers_++; 80 81 while (jobsCount_ > 0) { 82 --jobsCount_; 83 auto &job = *(jobs_[jobsCount_]); 84 85 lock.unlock(); 86 87 try { 88 job.Run(); 89 } catch (const Error &e) { 90 lock.lock(); 91 errors_.push_back(e); 92 lock.unlock(); 93 } 94 95 lock.lock(); 96 } 97 98 activeWorkers_--; 99} 100 101void WorkerQueue::Wait() 102{ 103 std::unique_lock<std::mutex> lock(m_); 104 jobsFinished_.wait(lock, [this]() { return activeWorkers_ == 0 && jobsCount_ == 0; }); 105 for (auto it = jobs_.begin(); it != jobs_.end(); it++) { 106 if (*it != nullptr) { 107 delete *it; 108 *it = nullptr; 109 } 110 } 111 jobs_.clear(); 112 113 if (!errors_.empty()) { 114 // NOLINTNEXTLINE 115 throw errors_.front(); 116 } 117} 118} // namespace panda::es2panda::util 119