1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15#include "thread_pool.h"
16#include <cstring>
17#include "script_utils.h"
18#include <unistd.h>
19
20namespace Uscript {
21static thread_local float g_scriptProportion = 1.0f;
22static ThreadPool* g_threadPool = nullptr;
23static std::mutex g_initMutex;
24
25void SetScriptProportion(float proportion)
26{
27    g_scriptProportion = proportion;
28}
29
30float GetScriptProportion()
31{
32    return g_scriptProportion;
33}
34
35ThreadPool* ThreadPool::CreateThreadPool(int number)
36{
37    std::lock_guard<std::mutex> lock(g_initMutex);
38    if (g_threadPool != nullptr) {
39        return g_threadPool;
40    }
41    g_threadPool = new ThreadPool();
42    g_threadPool->Init(number);
43    return g_threadPool;
44}
45
46void ThreadPool::Destroy()
47{
48    if (g_threadPool == nullptr) {
49        return;
50    }
51    std::lock_guard<std::mutex> lock(g_initMutex);
52    delete g_threadPool;
53    g_threadPool = nullptr;
54}
55
56void ThreadPool::Init(int32_t number)
57{
58    threadNumber_ = number;
59    taskQueue_.resize(threadPoolMaxTasks);
60    for (size_t t = 0; t < taskQueue_.size(); ++t) {
61        taskQueue_[t].available = true;
62        for (int32_t i = 0; i < threadNumber_; ++i) {
63            taskQueue_[t].subTaskFlag.emplace_back(new std::atomic_bool { false });
64        }
65    }
66    // Create workers
67    for (int32_t threadIndex = 0; threadIndex < threadNumber_; ++threadIndex) {
68        workers_.emplace_back(std::thread([this, threadIndex] {
69            ThreadExecute(this, threadIndex);
70            }));
71    }
72}
73
74void ThreadPool::ThreadRun(int32_t threadIndex)
75{
76    USCRIPT_LOGI("Create new thread successfully, tid: %d", gettid());
77    while (!stop_) {
78        for (int32_t k = 0; k < threadPoolMaxTasks; ++k) {
79            if (*taskQueue_[k].subTaskFlag[threadIndex]) {
80                taskQueue_[k].task.processor(threadIndex);
81                *taskQueue_[k].subTaskFlag[threadIndex] = false;
82            }
83        }
84        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
85    }
86}
87
88ThreadPool::~ThreadPool()
89{
90    {
91        std::lock_guard<std::mutex> lock(queueMutex_);
92        stop_ = true;
93    }
94    for (auto& worker : workers_) {
95        worker.join();
96    }
97    for (auto& task : taskQueue_) {
98        for (auto c : task.subTaskFlag) {
99            delete c;
100        }
101    }
102}
103
104void ThreadPool::AddTask(Task &&task)
105{
106    if (g_threadPool != nullptr) {
107        g_threadPool->AddNewTask(std::move(task));
108    }
109}
110
111void ThreadPool::AddNewTask(Task &&task)
112{
113    int32_t index = AcquireWorkIndex();
114    if (index < 0) {
115        USCRIPT_LOGI("ThreadPool::AddNewTask Failed");
116        return;
117    }
118
119    RunTask(std::move(task), index);
120    // Works done. make this task available
121    std::lock_guard<std::mutex> lock(queueMutex_);
122    taskQueue_[index].available = true;
123}
124
125int32_t ThreadPool::AcquireWorkIndex()
126{
127    std::lock_guard<std::mutex> lock(queueMutex_);
128    for (int32_t i = 0; i < threadPoolMaxTasks; ++i) {
129        if (taskQueue_[i].available) {
130            taskQueue_[i].available = false;
131            return i;
132        }
133    }
134    return -1;
135}
136
137void ThreadPool::RunTask(Task &&task, int32_t index)
138{
139    int32_t workSize = task.workSize;
140    taskQueue_[index].task = std::move(task);
141    // Mark each task should be executed
142    int32_t num = workSize > threadNumber_ ? threadNumber_ : workSize;
143    for (int32_t i = 0; i < num; ++i) {
144        *taskQueue_[index].subTaskFlag[i] = true;
145    }
146
147    bool complete = true;
148    do {
149        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
150        complete = true;
151        // 检查是否所有子任务执行结束
152        for (int32_t i = 0; i < num; ++i) {
153            if (*taskQueue_[index].subTaskFlag[i]) {
154                complete = false;
155                break;
156            }
157        }
158    } while (!complete);
159}
160} // namespace Uscript
161