1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H
18 
19 #include <mutex>
20 
21 #if defined(ENABLE_TASKPOOL_FFRT)
22 #include "cpp/task.h"
23 #endif
24 #include "helper/concurrent_helper.h"
25 #include "helper/error_helper.h"
26 #include "helper/napi_helper.h"
27 #include "helper/object_helper.h"
28 #include "message_queue.h"
29 #include "napi/native_api.h"
30 #include "napi/native_node_api.h"
31 #include "native_engine/native_engine.h"
32 #include "qos_helper.h"
33 #include "task.h"
34 #include "task_runner.h"
35 #include "tools/log.h"
36 
37 namespace Commonlibrary::Concurrent::TaskPoolModule {
// Bring the shared concurrent-module, helper and platform namespaces into this
// module's namespace so the declarations below can use their names unqualified.
using namespace Commonlibrary::Concurrent::Common;
using namespace Commonlibrary::Concurrent::Common::Helper;
using namespace Commonlibrary::Platform;
// Queue of TaskResultInfo pointers; Worker keeps one such queue per napi_env
// (see Worker::msgQueueMap_).
using MsgQueue = MessageQueue<TaskResultInfo*>;
42 
// Execution state of a Worker, stored in Worker::state_.
enum class WorkerState {
    IDLE,    // initial value of Worker::state_
    RUNNING, // assigned by Worker::NotifyTaskRunning()
    BLOCKED, // not assigned anywhere in this header; presumably set by the task manager - confirm
};
44 
45 #if defined(ENABLE_TASKPOOL_FFRT)
46 static const std::map<Priority, int> WORKERPRIORITY_FFRTQOS_MAP = {
47     {Priority::IDLE, ffrt::qos_background},
48     {Priority::LOW, ffrt::qos_utility},
49     {Priority::MEDIUM, ffrt::qos_default},
50     {Priority::HIGH, ffrt::qos_user_initiated},
51 };
52 #endif
53 
54 class Worker {
55 public:
56     using DebuggerPostTask = std::function<void()>;
57 
58     static Worker* WorkerConstructor(napi_env env);
59 
60     void NotifyExecuteTask();
61 
Enqueue(napi_env env, TaskResultInfo* resultInfo)62     void Enqueue(napi_env env, TaskResultInfo* resultInfo)
63     {
64         std::lock_guard<std::mutex> lock(queueMutex_);
65         msgQueueMap_[env].EnQueue(resultInfo);
66     }
67 
Dequeue(napi_env env, MsgQueue*& queue)68     void Dequeue(napi_env env, MsgQueue*& queue)
69     {
70         std::lock_guard<std::mutex> lock(queueMutex_);
71         auto item = msgQueueMap_.find(env);
72         if (item != msgQueueMap_.end()) {
73             queue = &(item->second);
74         }
75     }
76 
77     void NotifyTaskBegin();
78     // the function will only be called when the task is finished or
79     // exits abnormally, so we can not put it in the scope directly
80     void NotifyTaskFinished();
81     static void NotifyTaskResult(napi_env env, Task* task, napi_value result);
82     static void NotifyHandleTaskResult(Task* task);
83 
84 #if defined(ENABLE_TASKPOOL_FFRT)
85     bool IsLoopActive();
86     uint64_t GetWaitTime();
87 #endif
88 
89 private:
Worker(napi_env env)90     explicit Worker(napi_env env) : hostEnv_(env) {};
91 
92     ~Worker() = default;
93 
94     Worker(const Worker &) = delete;
95     Worker& operator=(const Worker &) = delete;
96     Worker(Worker &&) = delete;
97     Worker& operator=(Worker &&) = delete;
98 
99     void NotifyIdle();
100     void NotifyWorkerCreated();
NotifyTaskRunning()101     void NotifyTaskRunning()
102     {
103         state_ = WorkerState::RUNNING;
104         startTime_ = ConcurrentHelper::GetMilliseconds();
105         runningCount_++;
106     }
107 
HasRunningTasks() const108     bool HasRunningTasks() const
109     {
110         return runningCount_ != 0;
111     }
112 
113 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
114     static void HandleDebuggerTask(const uv_async_t* req);
115     void DebuggerOnPostTask(std::function<void()>&& task);
116 #endif
117 
GetWorkerLoop() const118     uv_loop_t* GetWorkerLoop() const
119     {
120         if (workerEnv_ != nullptr) {
121             return NapiHelper::GetLibUV(workerEnv_);
122         }
123         return nullptr;
124     }
125 
RunLoop() const126     void RunLoop() const
127     {
128         uv_loop_t* loop = GetWorkerLoop();
129         if (loop != nullptr) {
130             uv_run(loop, UV_RUN_DEFAULT);
131         } else {
132             HILOG_ERROR("taskpool:: Worker loop is nullptr when start worker loop");
133             return;
134         }
135     }
136 
137     // we will use the scope to manage resources automatically,
138     // including the HandleScope and NotifyRunning/NotifyIdle
139     class RunningScope {
140     public:
RunningScope(Worker* worker)141         explicit RunningScope(Worker* worker) : worker_(worker)
142         {
143             napi_open_handle_scope(worker_->workerEnv_, &scope_);
144             worker_->idleState_ = false;
145             worker->isExecutingLongTask_ = false;
146             worker_->NotifyTaskRunning();
147         }
148 
149         ~RunningScope();
150 
151     private:
152         Worker* worker_ = nullptr;
153         napi_handle_scope scope_ = nullptr;
154     };
155 
156     // use PriorityScope to manage the priority setting of workers
157     // reset qos_user_initiated when exit PriorityScope
158     class PriorityScope {
159     public:
160         PriorityScope(Worker* worker, Priority taskPriority);
~PriorityScope()161         ~PriorityScope()
162         {
163             worker_->ResetWorkerPriority();
164         }
165 
166     private:
167         Worker* worker_ = nullptr;
168     };
169 
170     void StartExecuteInThread();
171     static void ExecuteInThread(const void* data);
172     bool PrepareForWorkerInstance();
173     void ReleaseWorkerThreadContent();
174     void ResetWorkerPriority();
175     bool CheckFreeConditions();
176     bool UpdateWorkerState(WorkerState expect, WorkerState desired);
177     void StoreTaskId(uint64_t taskId);
178     bool InitTaskPoolFunc(napi_env env, napi_value func, Task* task);
179     void UpdateExecutedInfo();
180     void UpdateLongTaskInfo(Task* task);
181     bool IsExecutingLongTask();
182     bool HasLongTask();
183     void TerminateTask(uint64_t taskId);
184     void CloseHandles();
185     void PostReleaseSignal();
186 
187     static void HandleFunctionException(napi_env env, Task* task);
188     static void PerformTask(const uv_async_t* req);
189     static void TaskResultCallback(napi_env env, napi_value result, bool success, void* data);
190     static void ReleaseWorkerHandles(const uv_async_t* req);
191     static void TriggerGCCheck(const uv_async_t* req);
192 
193 #if defined(ENABLE_TASKPOOL_FFRT)
194     void InitFfrtInfo();
195     void InitLoopHandleNum();
196 #endif
197 
198     napi_env hostEnv_ {nullptr};
199     napi_env workerEnv_ {nullptr};
200     uv_async_t* performTaskSignal_ {nullptr};
201     uv_async_t* clearWorkerSignal_ {nullptr};
202     uv_async_t* triggerGCCheckSignal_ {nullptr};
203 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
204     uv_async_t* debuggerOnPostTaskSignal_ {nullptr};
205     std::mutex debuggerMutex_;
206     std::queue<DebuggerPostTask> debuggerQueue_ {};
207 #endif
208     std::unique_ptr<TaskRunner> runner_ {nullptr};
209 
210     std::atomic<int32_t> runningCount_ = 0;
211     std::atomic<bool> idleState_ = true; // true means the worker is idle
212     std::atomic<uint64_t> idlePoint_ = ConcurrentHelper::GetMilliseconds();
213     std::atomic<uint64_t> startTime_ = ConcurrentHelper::GetMilliseconds();
214     std::atomic<WorkerState> state_ {WorkerState::IDLE};
215     std::atomic<bool> hasExecuted_ = false; // false means this worker hasn't execute any tasks
216     Priority priority_ {Priority::DEFAULT};
217     pid_t tid_ = 0;
218     std::vector<uint64_t> currentTaskId_ {};
219     std::mutex currentTaskIdMutex_;
220     MessageQueue<TaskResultInfo*> hostMessageQueue_ {};
221     uint64_t lastCpuTime_ = 0;
222     uint32_t idleCount_ = 0;
223     std::atomic<bool> hasLongTask_ = false;
224     std::atomic<bool> isExecutingLongTask_ = false;
225     std::mutex longMutex_;
226     std::unordered_set<uint64_t> longTasksSet_ {};
227     std::mutex queueMutex_; // for sendData
228     std::unordered_map<napi_env, MsgQueue> msgQueueMap_ {};
229     friend class TaskManager;
230     friend class NativeEngineTest;
231 
232 #if defined(ENABLE_TASKPOOL_FFRT)
233     void* ffrtTaskHandle_ = nullptr;
234     uint32_t initActiveHandleNum_ = 0;
235 #endif
236 };
237 } // namespace Commonlibrary::Concurrent::TaskPoolModule
238 #endif // JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H