1/*
2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "workerQueue.h"
17
18namespace panda::es2panda::util {
19
20void WorkerJob::DependsOn(WorkerJob *job)
21{
22    job->dependants_.push_back(this);
23    dependencies_++;
24}
25
26void WorkerJob::Signal()
27{
28    {
29        std::lock_guard<std::mutex> lock(m_);
30        dependencies_--;
31    }
32
33    cond_.notify_one();
34}
35
36WorkerQueue::WorkerQueue(size_t threadCount)
37{
38    threads_.reserve(threadCount);
39
40    for (size_t i = 0; i < threadCount; i++) {
41        threads_.push_back(os::thread::ThreadStart(Worker, this));
42    }
43}
44
45WorkerQueue::~WorkerQueue()
46{
47    void *retval = nullptr;
48
49    std::unique_lock<std::mutex> lock(m_);
50    terminate_ = true;
51    lock.unlock();
52    jobsAvailable_.notify_all();
53
54    for (const auto handle_id : threads_) {
55        os::thread::ThreadJoin(handle_id, &retval);
56    }
57}
58
59void WorkerQueue::Worker(WorkerQueue *queue)
60{
61    while (true) {
62        std::unique_lock<std::mutex> lock(queue->m_);
63        queue->jobsAvailable_.wait(lock, [queue]() { return queue->terminate_ || queue->jobsCount_ != 0; });
64
65        if (queue->terminate_) {
66            return;
67        }
68
69        lock.unlock();
70
71        queue->Consume();
72        queue->jobsFinished_.notify_one();
73    }
74}
75
76void WorkerQueue::Consume()
77{
78    std::unique_lock<std::mutex> lock(m_);
79    activeWorkers_++;
80
81    while (jobsCount_ > 0) {
82        --jobsCount_;
83        auto &job = *(jobs_[jobsCount_]);
84
85        lock.unlock();
86
87        try {
88            job.Run();
89        } catch (const Error &e) {
90            lock.lock();
91            errors_.push_back(e);
92            lock.unlock();
93        }
94
95        lock.lock();
96    }
97
98    activeWorkers_--;
99}
100
101void WorkerQueue::Wait()
102{
103    std::unique_lock<std::mutex> lock(m_);
104    jobsFinished_.wait(lock, [this]() { return activeWorkers_ == 0 && jobsCount_ == 0; });
105    for (auto it = jobs_.begin(); it != jobs_.end(); it++) {
106        if (*it != nullptr) {
107            delete *it;
108            *it = nullptr;
109        }
110    }
111    jobs_.clear();
112
113    if (!errors_.empty()) {
114        // NOLINTNEXTLINE
115        throw errors_.front();
116    }
117}
118}  // namespace panda::es2panda::util
119