1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H 18 19 #include <mutex> 20 21 #if defined(ENABLE_TASKPOOL_FFRT) 22 #include "cpp/task.h" 23 #endif 24 #include "helper/concurrent_helper.h" 25 #include "helper/error_helper.h" 26 #include "helper/napi_helper.h" 27 #include "helper/object_helper.h" 28 #include "message_queue.h" 29 #include "napi/native_api.h" 30 #include "napi/native_node_api.h" 31 #include "native_engine/native_engine.h" 32 #include "qos_helper.h" 33 #include "task.h" 34 #include "task_runner.h" 35 #include "tools/log.h" 36 37 namespace Commonlibrary::Concurrent::TaskPoolModule { 38 using namespace Commonlibrary::Concurrent::Common; 39 using namespace Commonlibrary::Concurrent::Common::Helper; 40 using namespace Commonlibrary::Platform; 41 using MsgQueue = MessageQueue<TaskResultInfo*>; 42 43 enum class WorkerState { IDLE, RUNNING, BLOCKED }; 44 45 #if defined(ENABLE_TASKPOOL_FFRT) 46 static const std::map<Priority, int> WORKERPRIORITY_FFRTQOS_MAP = { 47 {Priority::IDLE, ffrt::qos_background}, 48 {Priority::LOW, ffrt::qos_utility}, 49 {Priority::MEDIUM, ffrt::qos_default}, 50 {Priority::HIGH, ffrt::qos_user_initiated}, 51 }; 52 #endif 53 54 class Worker { 55 public: 56 using DebuggerPostTask = std::function<void()>; 57 58 static Worker* WorkerConstructor(napi_env env); 59 60 void NotifyExecuteTask(); 61 Enqueue(napi_env env, TaskResultInfo* resultInfo)62 void Enqueue(napi_env env, TaskResultInfo* resultInfo) 63 { 64 std::lock_guard<std::mutex> lock(queueMutex_); 65 msgQueueMap_[env].EnQueue(resultInfo); 66 } 67 Dequeue(napi_env env, MsgQueue*& queue)68 void Dequeue(napi_env env, MsgQueue*& queue) 69 { 70 std::lock_guard<std::mutex> lock(queueMutex_); 71 auto item = msgQueueMap_.find(env); 72 if (item != msgQueueMap_.end()) { 73 queue = &(item->second); 74 } 75 } 76 77 void NotifyTaskBegin(); 78 // the function will only be called when the task is finished or 79 // exits abnormally, so we can not put it in the scope directly 80 void NotifyTaskFinished(); 81 static void NotifyTaskResult(napi_env env, Task* task, napi_value result); 82 static void NotifyHandleTaskResult(Task* task); 83 84 #if defined(ENABLE_TASKPOOL_FFRT) 85 bool IsLoopActive(); 86 uint64_t GetWaitTime(); 87 #endif 88 89 private: Worker(napi_env env)90 explicit Worker(napi_env env) : hostEnv_(env) {}; 91 92 ~Worker() = default; 93 94 Worker(const Worker &) = delete; 95 Worker& operator=(const Worker &) = delete; 96 Worker(Worker &&) = delete; 97 Worker& operator=(Worker &&) = delete; 98 99 void NotifyIdle(); 100 void NotifyWorkerCreated(); NotifyTaskRunning()101 void NotifyTaskRunning() 102 { 103 state_ = WorkerState::RUNNING; 104 startTime_ = ConcurrentHelper::GetMilliseconds(); 105 runningCount_++; 106 } 107 HasRunningTasks() const108 bool HasRunningTasks() const 109 { 110 return runningCount_ != 0; 111 } 112 113 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 114 static void HandleDebuggerTask(const uv_async_t* req); 115 void DebuggerOnPostTask(std::function<void()>&& task); 116 #endif 117 GetWorkerLoop() const118 uv_loop_t* GetWorkerLoop() const 119 { 120 if (workerEnv_ != nullptr) { 121 return NapiHelper::GetLibUV(workerEnv_); 122 } 123 return nullptr; 124 } 125 RunLoop() const126 void RunLoop() const 127 { 128 uv_loop_t* loop = GetWorkerLoop(); 129 if (loop != nullptr) { 130 uv_run(loop, UV_RUN_DEFAULT); 131 } else { 132 HILOG_ERROR("taskpool:: Worker loop is nullptr when start worker loop"); 133 return; 134 } 135 } 136 137 // we will use the scope to manage resources automatically, 138 // including the HandleScope and NotifyRunning/NotifyIdle 139 class RunningScope { 140 public: RunningScope(Worker* worker)141 explicit RunningScope(Worker* worker) : worker_(worker) 142 { 143 napi_open_handle_scope(worker_->workerEnv_, &scope_); 144 worker_->idleState_ = false; 145 worker->isExecutingLongTask_ = false; 146 worker_->NotifyTaskRunning(); 147 } 148 149 ~RunningScope(); 150 151 private: 152 Worker* worker_ = nullptr; 153 napi_handle_scope scope_ = nullptr; 154 }; 155 156 // use PriorityScope to manage the priority setting of workers 157 // reset qos_user_initiated when exit PriorityScope 158 class PriorityScope { 159 public: 160 PriorityScope(Worker* worker, Priority taskPriority); ~PriorityScope()161 ~PriorityScope() 162 { 163 worker_->ResetWorkerPriority(); 164 } 165 166 private: 167 Worker* worker_ = nullptr; 168 }; 169 170 void StartExecuteInThread(); 171 static void ExecuteInThread(const void* data); 172 bool PrepareForWorkerInstance(); 173 void ReleaseWorkerThreadContent(); 174 void ResetWorkerPriority(); 175 bool CheckFreeConditions(); 176 bool UpdateWorkerState(WorkerState expect, WorkerState desired); 177 void StoreTaskId(uint64_t taskId); 178 bool InitTaskPoolFunc(napi_env env, napi_value func, Task* task); 179 void UpdateExecutedInfo(); 180 void UpdateLongTaskInfo(Task* task); 181 bool IsExecutingLongTask(); 182 bool HasLongTask(); 183 void TerminateTask(uint64_t taskId); 184 void CloseHandles(); 185 void PostReleaseSignal(); 186 187 static void HandleFunctionException(napi_env env, Task* task); 188 static void PerformTask(const uv_async_t* req); 189 static void TaskResultCallback(napi_env env, napi_value result, bool success, void* data); 190 static void ReleaseWorkerHandles(const uv_async_t* req); 191 static void TriggerGCCheck(const uv_async_t* req); 192 193 #if defined(ENABLE_TASKPOOL_FFRT) 194 void InitFfrtInfo(); 195 void InitLoopHandleNum(); 196 #endif 197 198 napi_env hostEnv_ {nullptr}; 199 napi_env workerEnv_ {nullptr}; 200 uv_async_t* performTaskSignal_ {nullptr}; 201 uv_async_t* clearWorkerSignal_ {nullptr}; 202 uv_async_t* triggerGCCheckSignal_ {nullptr}; 203 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 204 uv_async_t* debuggerOnPostTaskSignal_ {nullptr}; 205 std::mutex debuggerMutex_; 206 std::queue<DebuggerPostTask> debuggerQueue_ {}; 207 #endif 208 std::unique_ptr<TaskRunner> runner_ {nullptr}; 209 210 std::atomic<int32_t> runningCount_ = 0; 211 std::atomic<bool> idleState_ = true; // true means the worker is idle 212 std::atomic<uint64_t> idlePoint_ = ConcurrentHelper::GetMilliseconds(); 213 std::atomic<uint64_t> startTime_ = ConcurrentHelper::GetMilliseconds(); 214 std::atomic<WorkerState> state_ {WorkerState::IDLE}; 215 std::atomic<bool> hasExecuted_ = false; // false means this worker hasn't execute any tasks 216 Priority priority_ {Priority::DEFAULT}; 217 pid_t tid_ = 0; 218 std::vector<uint64_t> currentTaskId_ {}; 219 std::mutex currentTaskIdMutex_; 220 MessageQueue<TaskResultInfo*> hostMessageQueue_ {}; 221 uint64_t lastCpuTime_ = 0; 222 uint32_t idleCount_ = 0; 223 std::atomic<bool> hasLongTask_ = false; 224 std::atomic<bool> isExecutingLongTask_ = false; 225 std::mutex longMutex_; 226 std::unordered_set<uint64_t> longTasksSet_ {}; 227 std::mutex queueMutex_; // for sendData 228 std::unordered_map<napi_env, MsgQueue> msgQueueMap_ {}; 229 friend class TaskManager; 230 friend class NativeEngineTest; 231 232 #if defined(ENABLE_TASKPOOL_FFRT) 233 void* ffrtTaskHandle_ = nullptr; 234 uint32_t initActiveHandleNum_ = 0; 235 #endif 236 }; 237 } // namespace Commonlibrary::Concurrent::TaskPoolModule 238 #endif // JS_CONCURRENT_MODULE_TASKPOOL_WORKER_H