/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15fb299fa2Sopenharmony_ci#include "thread_pool.h"
16fb299fa2Sopenharmony_ci#include <cstring>
17fb299fa2Sopenharmony_ci#include "script_utils.h"
18fb299fa2Sopenharmony_ci#include <unistd.h>
19fb299fa2Sopenharmony_ci
20fb299fa2Sopenharmony_cinamespace Uscript {
21fb299fa2Sopenharmony_cistatic thread_local float g_scriptProportion = 1.0f;
22fb299fa2Sopenharmony_cistatic ThreadPool* g_threadPool = nullptr;
23fb299fa2Sopenharmony_cistatic std::mutex g_initMutex;
24fb299fa2Sopenharmony_ci
25fb299fa2Sopenharmony_civoid SetScriptProportion(float proportion)
26fb299fa2Sopenharmony_ci{
27fb299fa2Sopenharmony_ci    g_scriptProportion = proportion;
28fb299fa2Sopenharmony_ci}
29fb299fa2Sopenharmony_ci
30fb299fa2Sopenharmony_cifloat GetScriptProportion()
31fb299fa2Sopenharmony_ci{
32fb299fa2Sopenharmony_ci    return g_scriptProportion;
33fb299fa2Sopenharmony_ci}
34fb299fa2Sopenharmony_ci
35fb299fa2Sopenharmony_ciThreadPool* ThreadPool::CreateThreadPool(int number)
36fb299fa2Sopenharmony_ci{
37fb299fa2Sopenharmony_ci    std::lock_guard<std::mutex> lock(g_initMutex);
38fb299fa2Sopenharmony_ci    if (g_threadPool != nullptr) {
39fb299fa2Sopenharmony_ci        return g_threadPool;
40fb299fa2Sopenharmony_ci    }
41fb299fa2Sopenharmony_ci    g_threadPool = new ThreadPool();
42fb299fa2Sopenharmony_ci    g_threadPool->Init(number);
43fb299fa2Sopenharmony_ci    return g_threadPool;
44fb299fa2Sopenharmony_ci}
45fb299fa2Sopenharmony_ci
46fb299fa2Sopenharmony_civoid ThreadPool::Destroy()
47fb299fa2Sopenharmony_ci{
48fb299fa2Sopenharmony_ci    if (g_threadPool == nullptr) {
49fb299fa2Sopenharmony_ci        return;
50fb299fa2Sopenharmony_ci    }
51fb299fa2Sopenharmony_ci    std::lock_guard<std::mutex> lock(g_initMutex);
52fb299fa2Sopenharmony_ci    delete g_threadPool;
53fb299fa2Sopenharmony_ci    g_threadPool = nullptr;
54fb299fa2Sopenharmony_ci}
55fb299fa2Sopenharmony_ci
56fb299fa2Sopenharmony_civoid ThreadPool::Init(int32_t number)
57fb299fa2Sopenharmony_ci{
58fb299fa2Sopenharmony_ci    threadNumber_ = number;
59fb299fa2Sopenharmony_ci    taskQueue_.resize(threadPoolMaxTasks);
60fb299fa2Sopenharmony_ci    for (size_t t = 0; t < taskQueue_.size(); ++t) {
61fb299fa2Sopenharmony_ci        taskQueue_[t].available = true;
62fb299fa2Sopenharmony_ci        for (int32_t i = 0; i < threadNumber_; ++i) {
63fb299fa2Sopenharmony_ci            taskQueue_[t].subTaskFlag.emplace_back(new std::atomic_bool { false });
64fb299fa2Sopenharmony_ci        }
65fb299fa2Sopenharmony_ci    }
66fb299fa2Sopenharmony_ci    // Create workers
67fb299fa2Sopenharmony_ci    for (int32_t threadIndex = 0; threadIndex < threadNumber_; ++threadIndex) {
68fb299fa2Sopenharmony_ci        workers_.emplace_back(std::thread([this, threadIndex] {
69fb299fa2Sopenharmony_ci            ThreadExecute(this, threadIndex);
70fb299fa2Sopenharmony_ci            }));
71fb299fa2Sopenharmony_ci    }
72fb299fa2Sopenharmony_ci}
73fb299fa2Sopenharmony_ci
74fb299fa2Sopenharmony_civoid ThreadPool::ThreadRun(int32_t threadIndex)
75fb299fa2Sopenharmony_ci{
76fb299fa2Sopenharmony_ci    USCRIPT_LOGI("Create new thread successfully, tid: %d", gettid());
77fb299fa2Sopenharmony_ci    while (!stop_) {
78fb299fa2Sopenharmony_ci        for (int32_t k = 0; k < threadPoolMaxTasks; ++k) {
79fb299fa2Sopenharmony_ci            if (*taskQueue_[k].subTaskFlag[threadIndex]) {
80fb299fa2Sopenharmony_ci                taskQueue_[k].task.processor(threadIndex);
81fb299fa2Sopenharmony_ci                *taskQueue_[k].subTaskFlag[threadIndex] = false;
82fb299fa2Sopenharmony_ci            }
83fb299fa2Sopenharmony_ci        }
84fb299fa2Sopenharmony_ci        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
85fb299fa2Sopenharmony_ci    }
86fb299fa2Sopenharmony_ci}
87fb299fa2Sopenharmony_ci
88fb299fa2Sopenharmony_ciThreadPool::~ThreadPool()
89fb299fa2Sopenharmony_ci{
90fb299fa2Sopenharmony_ci    {
91fb299fa2Sopenharmony_ci        std::lock_guard<std::mutex> lock(queueMutex_);
92fb299fa2Sopenharmony_ci        stop_ = true;
93fb299fa2Sopenharmony_ci    }
94fb299fa2Sopenharmony_ci    for (auto& worker : workers_) {
95fb299fa2Sopenharmony_ci        worker.join();
96fb299fa2Sopenharmony_ci    }
97fb299fa2Sopenharmony_ci    for (auto& task : taskQueue_) {
98fb299fa2Sopenharmony_ci        for (auto c : task.subTaskFlag) {
99fb299fa2Sopenharmony_ci            delete c;
100fb299fa2Sopenharmony_ci        }
101fb299fa2Sopenharmony_ci    }
102fb299fa2Sopenharmony_ci}
103fb299fa2Sopenharmony_ci
104fb299fa2Sopenharmony_civoid ThreadPool::AddTask(Task &&task)
105fb299fa2Sopenharmony_ci{
106fb299fa2Sopenharmony_ci    if (g_threadPool != nullptr) {
107fb299fa2Sopenharmony_ci        g_threadPool->AddNewTask(std::move(task));
108fb299fa2Sopenharmony_ci    }
109fb299fa2Sopenharmony_ci}
110fb299fa2Sopenharmony_ci
111fb299fa2Sopenharmony_civoid ThreadPool::AddNewTask(Task &&task)
112fb299fa2Sopenharmony_ci{
113fb299fa2Sopenharmony_ci    int32_t index = AcquireWorkIndex();
114fb299fa2Sopenharmony_ci    if (index < 0) {
115fb299fa2Sopenharmony_ci        USCRIPT_LOGI("ThreadPool::AddNewTask Failed");
116fb299fa2Sopenharmony_ci        return;
117fb299fa2Sopenharmony_ci    }
118fb299fa2Sopenharmony_ci
119fb299fa2Sopenharmony_ci    RunTask(std::move(task), index);
120fb299fa2Sopenharmony_ci    // Works done. make this task available
121fb299fa2Sopenharmony_ci    std::lock_guard<std::mutex> lock(queueMutex_);
122fb299fa2Sopenharmony_ci    taskQueue_[index].available = true;
123fb299fa2Sopenharmony_ci}
124fb299fa2Sopenharmony_ci
125fb299fa2Sopenharmony_ciint32_t ThreadPool::AcquireWorkIndex()
126fb299fa2Sopenharmony_ci{
127fb299fa2Sopenharmony_ci    std::lock_guard<std::mutex> lock(queueMutex_);
128fb299fa2Sopenharmony_ci    for (int32_t i = 0; i < threadPoolMaxTasks; ++i) {
129fb299fa2Sopenharmony_ci        if (taskQueue_[i].available) {
130fb299fa2Sopenharmony_ci            taskQueue_[i].available = false;
131fb299fa2Sopenharmony_ci            return i;
132fb299fa2Sopenharmony_ci        }
133fb299fa2Sopenharmony_ci    }
134fb299fa2Sopenharmony_ci    return -1;
135fb299fa2Sopenharmony_ci}
136fb299fa2Sopenharmony_ci
137fb299fa2Sopenharmony_civoid ThreadPool::RunTask(Task &&task, int32_t index)
138fb299fa2Sopenharmony_ci{
139fb299fa2Sopenharmony_ci    int32_t workSize = task.workSize;
140fb299fa2Sopenharmony_ci    taskQueue_[index].task = std::move(task);
141fb299fa2Sopenharmony_ci    // Mark each task should be executed
142fb299fa2Sopenharmony_ci    int32_t num = workSize > threadNumber_ ? threadNumber_ : workSize;
143fb299fa2Sopenharmony_ci    for (int32_t i = 0; i < num; ++i) {
144fb299fa2Sopenharmony_ci        *taskQueue_[index].subTaskFlag[i] = true;
145fb299fa2Sopenharmony_ci    }
146fb299fa2Sopenharmony_ci
147fb299fa2Sopenharmony_ci    bool complete = true;
148fb299fa2Sopenharmony_ci    do {
149fb299fa2Sopenharmony_ci        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50ms
150fb299fa2Sopenharmony_ci        complete = true;
151fb299fa2Sopenharmony_ci        // 检查是否所有子任务执行结束
152fb299fa2Sopenharmony_ci        for (int32_t i = 0; i < num; ++i) {
153fb299fa2Sopenharmony_ci            if (*taskQueue_[index].subTaskFlag[i]) {
154fb299fa2Sopenharmony_ci                complete = false;
155fb299fa2Sopenharmony_ci                break;
156fb299fa2Sopenharmony_ci            }
157fb299fa2Sopenharmony_ci        }
158fb299fa2Sopenharmony_ci    } while (!complete);
159fb299fa2Sopenharmony_ci}
160fb299fa2Sopenharmony_ci} // namespace Uscript
161