1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H 18 19 #include <array> 20 #include <list> 21 #include <memory> 22 #include <mutex> 23 #include <set> 24 #include <shared_mutex> 25 #include <unordered_map> 26 #include <unordered_set> 27 #include <vector> 28 29 #include "napi/native_api.h" 30 #include "sequence_runner.h" 31 #include "task.h" 32 #include "task_queue.h" 33 #include "task_group.h" 34 #include "worker.h" 35 36 namespace Commonlibrary::Concurrent::TaskPoolModule { 37 using namespace Commonlibrary::Concurrent::Common; 38 39 static constexpr char ARGUMENTS_STR[] = "arguments"; 40 static constexpr char NAME[] = "name"; 41 static constexpr char FUNCTION_STR[] = "function"; 42 static constexpr char GROUP_ID_STR[] = "groupId"; 43 static constexpr char TASKID_STR[] = "taskId"; 44 static constexpr char TASKINFO_STR[] = "taskInfo"; 45 static constexpr char TRANSFERLIST_STR[] = "transferList"; 46 static constexpr char CLONE_LIST_STR[] = "cloneList"; 47 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency"; 48 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency"; 49 static constexpr char TASK_CPU_TIME[] = "cpuDuration"; 50 static constexpr char TASK_IO_TIME[] = "ioDuration"; 51 static constexpr char TASK_TOTAL_TIME[] = "totalDuration"; 52 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer"; 53 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable"; 54 55 class TaskGroup; 56 57 class TaskManager { 58 public: 59 static TaskManager& GetInstance(); 60 61 void StoreTask(uint64_t taskId, Task* task); 62 void RemoveTask(uint64_t taskId); 63 Task* GetTask(uint64_t taskId); 64 void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT); 65 void EraseWaitingTaskId(uint64_t taskId, Priority priority); 66 std::pair<uint64_t, Priority> DequeueTaskId(); 67 void CancelTask(napi_env env, uint64_t taskId); 68 void CancelSeqRunnerTask(napi_env env, Task* task); 69 void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true); 70 71 // for worker state 72 void NotifyWorkerIdle(Worker* worker); 73 void NotifyWorkerCreated(Worker* worker); 74 void NotifyWorkerRunning(Worker* worker); 75 void RemoveWorker(Worker* worker); 76 void RestoreWorker(Worker* worker); 77 78 // for load balance 79 void InitTaskManager(napi_env env); 80 void UpdateExecutedInfo(uint64_t duration); 81 void TryTriggerExpand(); 82 83 // for taskpool state 84 uint32_t GetTaskNum(); 85 uint32_t GetIdleWorkers(); 86 uint32_t GetThreadNum(); 87 uint32_t GetRunningWorkers(); 88 uint32_t GetTimeoutWorkers(); 89 void GetIdleWorkersList(uint32_t step); 90 bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size); 91 92 // for get thread info 93 napi_value GetThreadInfos(napi_env env); 94 95 // for get task info 96 napi_value GetTaskInfos(napi_env env); 97 98 // for countTrace for worker 99 void CountTraceForWorker(); 100 101 std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId); 102 void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo); 103 void IncreaseRefCount(uint64_t taskId); 104 void DecreaseRefCount(napi_env env, uint64_t taskId); 105 napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task); 106 MsgQueue* GetMessageQueue(const uv_async_t* req); 107 MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo); 108 109 // for task dependency 110 bool IsDependendByTaskId(uint64_t taskId); 111 bool IsDependentByTaskId(uint64_t dependentTaskId); 112 void NotifyDependencyTaskInfo(uint64_t taskId); 113 void RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId); 114 bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet); 115 bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId); 116 bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId); 117 void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority); 118 std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId); 119 void RemovePendingTaskInfo(uint64_t taskId); 120 void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId); 121 void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId); 122 std::string GetTaskDependInfoToString(uint64_t taskId); 123 124 bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT); 125 126 // for duration 127 void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration); 128 uint64_t GetTaskDuration(uint64_t taskId, std::string durationType); 129 void RemoveTaskDuration(uint64_t taskId); 130 void StoreLongTaskInfo(uint64_t taskId, Worker* worker); 131 void RemoveLongTaskInfo(uint64_t taskId); 132 void TerminateTask(uint64_t taskId); 133 Worker* GetLongTaskInfo(uint64_t taskId); 134 135 // for callback 136 void ReleaseCallBackInfo(Task* task); 137 138 void UpdateSystemAppFlag(); IsSystemApp() const139 bool IsSystemApp() const 140 { 141 return isSystemApp_; 142 } EnableFfrt() const143 bool EnableFfrt() const 144 { 145 return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_); 146 } 147 148 bool CheckTask(uint64_t taskId); 149 150 private: 151 TaskManager(); 152 ~TaskManager(); 153 TaskManager(const TaskManager &) = delete; 154 TaskManager& operator=(const TaskManager &) = delete; 155 TaskManager(TaskManager &&) = delete; 156 TaskManager& operator=(TaskManager &&) = delete; 157 158 void CreateWorkers(napi_env env, uint32_t num = 1); 159 void NotifyExecuteTask(); 160 void NotifyWorkerAdded(Worker* worker); 161 162 // for load balance 163 void RunTaskManager(); 164 void CheckForBlockedWorkers(); 165 void TryExpand(); 166 void NotifyShrink(uint32_t targetNum); 167 void TriggerShrink(uint32_t step); 168 uint32_t ComputeSuitableThreadNum(); 169 uint32_t ComputeSuitableIdleNum(); 170 static void NotifyExpand(const uv_async_t* req); 171 static void TriggerLoadBalance(const uv_timer_t* req = nullptr); 172 173 bool IsChooseIdle(); 174 uint32_t GetNonIdleTaskNum(); 175 std::pair<uint64_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority); 176 void IncreaseNumIfNoIdle(Priority priority); 177 void DecreaseNumIfNoIdle(Priority priority); 178 179 // <taskId, Task> 180 std::unordered_map<uint64_t, Task*> tasks_ {}; 181 RECURSIVE_MUTEX tasksMutex_; 182 183 // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask 184 std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {}; 185 std::shared_mutex dependTaskInfosMutex_; 186 187 // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask 188 std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {}; 189 std::shared_mutex dependentTaskInfosMutex_; 190 191 // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...> 192 std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {}; 193 std::shared_mutex pendingTaskInfosMutex_; 194 195 // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...> 196 std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {}; 197 std::shared_mutex taskDurationInfosMutex_; 198 199 // record the longTasks and workers for efficiency 200 std::unordered_map<uint64_t, Worker*> longTasksMap_ {}; 201 std::shared_mutex longTasksMutex_{}; 202 203 std::unordered_set<Worker*> workers_ {}; 204 std::unordered_set<Worker*> idleWorkers_ {}; 205 std::unordered_set<Worker*> timeoutWorkers_ {}; 206 RECURSIVE_MUTEX workersMutex_; 207 208 // for load balance 209 napi_env hostEnv_ = nullptr; 210 uv_loop_t* loop_ = nullptr; 211 uv_timer_t* timer_ = nullptr; 212 uv_async_t* expandHandle_ = nullptr; 213 std::atomic<bool> suspend_ = false; 214 std::atomic<uint32_t> retryCount_ = 0; 215 std::atomic<uint32_t> nonIdleTaskNum_ = 0; 216 std::atomic<uint32_t> totalExecCount_ = 0; 217 std::atomic<uint64_t> totalExecTime_ = 0; 218 std::atomic<bool> needChecking_ = false; 219 std::atomic<bool> isHandleInited_ = false; 220 221 // for task priority 222 uint32_t highPrioExecuteCount_ = 0; 223 uint32_t mediumPrioExecuteCount_ = 0; 224 std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {}; 225 FFRT_MUTEX taskQueuesMutex_; 226 227 std::atomic<bool> isInitialized_ = false; 228 std::atomic<bool> isSystemApp_ = false; 229 int disableFfrtFlag_ = 0; // 0 means enable ffrt 230 int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt 231 232 std::mutex callbackMutex_; 233 std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {}; 234 std::vector<Worker*> freeList_ {}; 235 236 #if defined(ENABLE_TASKPOOL_EVENTHANDLER) 237 std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {}; 238 #endif 239 240 friend class TaskGroupManager; 241 friend class NativeEngineTest; 242 }; 243 244 class TaskGroupManager { 245 public: 246 TaskGroupManager() = default; 247 ~TaskGroupManager() = default; 248 249 static TaskGroupManager &GetInstance(); 250 251 void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId); 252 void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup); 253 void RemoveTaskGroup(uint64_t groupId); 254 TaskGroup* GetTaskGroup(uint64_t groupId); 255 void CancelGroup(napi_env env, uint64_t groupId); 256 void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group); 257 void ReleaseTaskGroupData(napi_env env, TaskGroup* group); 258 bool UpdateGroupState(uint64_t groupId); 259 260 void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task); 261 bool TriggerSeqRunner(napi_env env, Task* lastTask); 262 void DisposeCanceledTask(napi_env env, Task* task); 263 void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner); 264 void RemoveSequenceRunner(uint64_t seqRunnerId); 265 SequenceRunner* GetSeqRunner(uint64_t seqRunnerId); 266 267 private: 268 TaskGroupManager(const TaskGroupManager &) = delete; 269 TaskGroupManager& operator=(const TaskGroupManager &) = delete; 270 TaskGroupManager(TaskGroupManager &&) = delete; 271 TaskGroupManager& operator=(TaskGroupManager &&) = delete; 272 273 // <groupId, TaskGroup> 274 std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {}; 275 std::mutex taskGroupsMutex_; 276 277 // <seqRunnerId, SequenceRunner> 278 std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {}; 279 std::mutex seqRunnersMutex_; 280 friend class NativeEngineTest; 281 }; 282 283 class SequenceRunnerManager { 284 public: 285 SequenceRunnerManager() = default; 286 ~SequenceRunnerManager() = default; 287 288 static SequenceRunnerManager &GetInstance(); 289 SequenceRunner* CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc, 290 const std::string &name, uint32_t priority); 291 uint64_t DecreaseSeqCount(SequenceRunner* seqRunner); 292 void RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner); 293 void RemoveSequenceRunner(const std::string &name); 294 bool TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner); 295 void GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner); 296 297 private: 298 SequenceRunnerManager(const SequenceRunnerManager &) = delete; 299 SequenceRunnerManager& operator=(const SequenceRunnerManager &) = delete; 300 SequenceRunnerManager(SequenceRunnerManager &&) = delete; 301 SequenceRunnerManager& operator=(SequenceRunnerManager &&) = delete; 302 303 // <<name1, seqRunner>, <name2, seqRunner>, ...> 304 std::unordered_map<std::string, SequenceRunner*> globalSeqRunner_ {}; 305 std::mutex globalSeqRunnerMutex_; 306 }; 307 } // namespace Commonlibrary::Concurrent::TaskPoolModule 308 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H