1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H
18 
19 #include <array>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <shared_mutex>
25 #include <unordered_map>
26 #include <unordered_set>
27 #include <vector>
28 
29 #include "napi/native_api.h"
30 #include "sequence_runner.h"
31 #include "task.h"
32 #include "task_queue.h"
33 #include "task_group.h"
34 #include "worker.h"
35 
36 namespace Commonlibrary::Concurrent::TaskPoolModule {
37 using namespace Commonlibrary::Concurrent::Common;
38 
39 static constexpr char ARGUMENTS_STR[] = "arguments";
40 static constexpr char NAME[] = "name";
41 static constexpr char FUNCTION_STR[] = "function";
42 static constexpr char GROUP_ID_STR[] = "groupId";
43 static constexpr char TASKID_STR[] = "taskId";
44 static constexpr char TASKINFO_STR[] = "taskInfo";
45 static constexpr char TRANSFERLIST_STR[] = "transferList";
46 static constexpr char CLONE_LIST_STR[] = "cloneList";
47 static constexpr char ADD_DEPENDENCY_STR[] = "addDependency";
48 static constexpr char REMOVE_DEPENDENCY_STR[] = "removeDependency";
49 static constexpr char TASK_CPU_TIME[] = "cpuDuration";
50 static constexpr char TASK_IO_TIME[] = "ioDuration";
51 static constexpr char TASK_TOTAL_TIME[] = "totalDuration";
52 static constexpr char DEFAULT_TRANSFER_STR[] = "defaultTransfer";
53 static constexpr char DEFAULT_CLONE_SENDABLE_STR[] = "defaultCloneSendable";
54 
55 class TaskGroup;
56 
57 class TaskManager {
58 public:
59     static TaskManager& GetInstance();
60 
61     void StoreTask(uint64_t taskId, Task* task);
62     void RemoveTask(uint64_t taskId);
63     Task* GetTask(uint64_t taskId);
64     void EnqueueTaskId(uint64_t taskId, Priority priority = Priority::DEFAULT);
65     void EraseWaitingTaskId(uint64_t taskId, Priority priority);
66     std::pair<uint64_t, Priority> DequeueTaskId();
67     void CancelTask(napi_env env, uint64_t taskId);
68     void CancelSeqRunnerTask(napi_env env, Task* task);
69     void ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask = true);
70 
71     // for worker state
72     void NotifyWorkerIdle(Worker* worker);
73     void NotifyWorkerCreated(Worker* worker);
74     void NotifyWorkerRunning(Worker* worker);
75     void RemoveWorker(Worker* worker);
76     void RestoreWorker(Worker* worker);
77 
78     // for load balance
79     void InitTaskManager(napi_env env);
80     void UpdateExecutedInfo(uint64_t duration);
81     void TryTriggerExpand();
82 
83     // for taskpool state
84     uint32_t GetTaskNum();
85     uint32_t GetIdleWorkers();
86     uint32_t GetThreadNum();
87     uint32_t GetRunningWorkers();
88     uint32_t GetTimeoutWorkers();
89     void GetIdleWorkersList(uint32_t step);
90     bool ReadThreadInfo(pid_t tid, char* buf, uint32_t size);
91 
92     // for get thread info
93     napi_value GetThreadInfos(napi_env env);
94 
95     // for get task info
96     napi_value GetTaskInfos(napi_env env);
97 
98     // for countTrace for worker
99     void CountTraceForWorker();
100 
101     std::shared_ptr<CallbackInfo> GetCallbackInfo(uint64_t taskId);
102     void RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo);
103     void IncreaseRefCount(uint64_t taskId);
104     void DecreaseRefCount(napi_env env, uint64_t taskId);
105     napi_value NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task);
106     MsgQueue* GetMessageQueue(const uv_async_t* req);
107     MsgQueue* GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo);
108 
109     // for task dependency
110     bool IsDependendByTaskId(uint64_t taskId);
111     bool IsDependentByTaskId(uint64_t dependentTaskId);
112     void NotifyDependencyTaskInfo(uint64_t taskId);
113     void RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId);
114     bool StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet);
115     bool RemoveTaskDependency(uint64_t taskId, uint64_t dependentId);
116     bool CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId);
117     void EnqueuePendingTaskInfo(uint64_t taskId, Priority priority);
118     std::pair<uint64_t, Priority> DequeuePendingTaskInfo(uint64_t taskId);
119     void RemovePendingTaskInfo(uint64_t taskId);
120     void StoreDependentTaskInfo(std::set<uint64_t> dependTaskIdSet, uint64_t taskId);
121     void RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId);
122     std::string GetTaskDependInfoToString(uint64_t taskId);
123 
124     bool PostTask(std::function<void()> task, const char* taskName, Priority priority = Priority::DEFAULT);
125 
126     // for duration
127     void StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration);
128     uint64_t GetTaskDuration(uint64_t taskId, std::string durationType);
129     void RemoveTaskDuration(uint64_t taskId);
130     void StoreLongTaskInfo(uint64_t taskId, Worker* worker);
131     void RemoveLongTaskInfo(uint64_t taskId);
132     void TerminateTask(uint64_t taskId);
133     Worker* GetLongTaskInfo(uint64_t taskId);
134 
135     // for callback
136     void ReleaseCallBackInfo(Task* task);
137 
138     void UpdateSystemAppFlag();
IsSystemApp() const139     bool IsSystemApp() const
140     {
141         return isSystemApp_;
142     }
EnableFfrt() const143     bool EnableFfrt() const
144     {
145         return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_);
146     }
147 
148     bool CheckTask(uint64_t taskId);
149 
150 private:
151     TaskManager();
152     ~TaskManager();
153     TaskManager(const TaskManager &) = delete;
154     TaskManager& operator=(const TaskManager &) = delete;
155     TaskManager(TaskManager &&) = delete;
156     TaskManager& operator=(TaskManager &&) = delete;
157 
158     void CreateWorkers(napi_env env, uint32_t num = 1);
159     void NotifyExecuteTask();
160     void NotifyWorkerAdded(Worker* worker);
161 
162     // for load balance
163     void RunTaskManager();
164     void CheckForBlockedWorkers();
165     void TryExpand();
166     void NotifyShrink(uint32_t targetNum);
167     void TriggerShrink(uint32_t step);
168     uint32_t ComputeSuitableThreadNum();
169     uint32_t ComputeSuitableIdleNum();
170     static void NotifyExpand(const uv_async_t* req);
171     static void TriggerLoadBalance(const uv_timer_t* req = nullptr);
172 
173     bool IsChooseIdle();
174     uint32_t GetNonIdleTaskNum();
175     std::pair<uint64_t, Priority> GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, Priority priority);
176     void IncreaseNumIfNoIdle(Priority priority);
177     void DecreaseNumIfNoIdle(Priority priority);
178 
179     // <taskId, Task>
180     std::unordered_map<uint64_t, Task*> tasks_ {};
181     RECURSIVE_MUTEX tasksMutex_;
182 
183     // <taskId, <dependent taskId1, dependent taskId2, ...>>, update when removeDependency or executeTask
184     std::unordered_map<uint64_t, std::set<uint64_t>> dependTaskInfos_ {};
185     std::shared_mutex dependTaskInfosMutex_;
186 
187     // <dependent taskId, <taskId1, taskId2, ...>>, update when removeDependency or executeTask
188     std::unordered_map<uint64_t, std::set<uint64_t>> dependentTaskInfos_ {};
189     std::shared_mutex dependentTaskInfosMutex_;
190 
191     // <<pendingTaskId1, priority>, <pendingTaskId2, priority>, ...>
192     std::unordered_map<uint64_t, Priority> pendingTaskInfos_ {};
193     std::shared_mutex pendingTaskInfosMutex_;
194 
195     // <<taskId1, <totalDuration1, cpuDuration1>>, <taskId2, <totalDuration2, cpuDuration2>>, ...>
196     std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>> taskDurationInfos_ {};
197     std::shared_mutex taskDurationInfosMutex_;
198 
199     // record the longTasks and workers for efficiency
200     std::unordered_map<uint64_t, Worker*> longTasksMap_ {};
201     std::shared_mutex longTasksMutex_{};
202 
203     std::unordered_set<Worker*> workers_ {};
204     std::unordered_set<Worker*> idleWorkers_ {};
205     std::unordered_set<Worker*> timeoutWorkers_ {};
206     RECURSIVE_MUTEX workersMutex_;
207 
208     // for load balance
209     napi_env hostEnv_ = nullptr;
210     uv_loop_t* loop_ = nullptr;
211     uv_timer_t* timer_ = nullptr;
212     uv_async_t* expandHandle_ = nullptr;
213     std::atomic<bool> suspend_ = false;
214     std::atomic<uint32_t> retryCount_ = 0;
215     std::atomic<uint32_t> nonIdleTaskNum_ = 0;
216     std::atomic<uint32_t> totalExecCount_ = 0;
217     std::atomic<uint64_t> totalExecTime_ = 0;
218     std::atomic<bool> needChecking_ = false;
219     std::atomic<bool> isHandleInited_ = false;
220 
221     // for task priority
222     uint32_t highPrioExecuteCount_ = 0;
223     uint32_t mediumPrioExecuteCount_ = 0;
224     std::array<std::unique_ptr<ExecuteQueue>, Priority::NUMBER> taskQueues_ {};
225     FFRT_MUTEX taskQueuesMutex_;
226 
227     std::atomic<bool> isInitialized_ = false;
228     std::atomic<bool> isSystemApp_ = false;
229     int disableFfrtFlag_ = 0; // 0 means enable ffrt
230     int globalEnableFfrtFlag_ = 0; // 0 means not global enable ffrt
231 
232     std::mutex callbackMutex_;
233     std::map<uint32_t, std::shared_ptr<CallbackInfo>> callbackTable_ {};
234     std::vector<Worker*> freeList_ {};
235 
236 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
237     std::shared_ptr<OHOS::AppExecFwk::EventHandler> mainThreadHandler_ {};
238 #endif
239 
240     friend class TaskGroupManager;
241     friend class NativeEngineTest;
242 };
243 
244 class TaskGroupManager {
245 public:
246     TaskGroupManager() = default;
247     ~TaskGroupManager() = default;
248 
249     static TaskGroupManager &GetInstance();
250 
251     void AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId);
252     void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup);
253     void RemoveTaskGroup(uint64_t groupId);
254     TaskGroup* GetTaskGroup(uint64_t groupId);
255     void CancelGroup(napi_env env, uint64_t groupId);
256     void CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group);
257     void ReleaseTaskGroupData(napi_env env, TaskGroup* group);
258     bool UpdateGroupState(uint64_t groupId);
259 
260     void AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task);
261     bool TriggerSeqRunner(napi_env env, Task* lastTask);
262     void DisposeCanceledTask(napi_env env, Task* task);
263     void StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner);
264     void RemoveSequenceRunner(uint64_t seqRunnerId);
265     SequenceRunner* GetSeqRunner(uint64_t seqRunnerId);
266 
267 private:
268     TaskGroupManager(const TaskGroupManager &) = delete;
269     TaskGroupManager& operator=(const TaskGroupManager &) = delete;
270     TaskGroupManager(TaskGroupManager &&) = delete;
271     TaskGroupManager& operator=(TaskGroupManager &&) = delete;
272 
273     // <groupId, TaskGroup>
274     std::unordered_map<uint64_t, TaskGroup*> taskGroups_ {};
275     std::mutex taskGroupsMutex_;
276 
277     // <seqRunnerId, SequenceRunner>
278     std::unordered_map<uint64_t, SequenceRunner*> seqRunners_ {};
279     std::mutex seqRunnersMutex_;
280     friend class NativeEngineTest;
281 };
282 
283 class SequenceRunnerManager {
284 public:
285     SequenceRunnerManager() = default;
286     ~SequenceRunnerManager() = default;
287 
288     static SequenceRunnerManager &GetInstance();
289     SequenceRunner* CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
290                                             const std::string &name, uint32_t priority);
291     uint64_t DecreaseSeqCount(SequenceRunner* seqRunner);
292     void RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner);
293     void RemoveSequenceRunner(const std::string &name);
294     bool TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner);
295     void GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner);
296 
297 private:
298     SequenceRunnerManager(const SequenceRunnerManager &) = delete;
299     SequenceRunnerManager& operator=(const SequenceRunnerManager &) = delete;
300     SequenceRunnerManager(SequenceRunnerManager &&) = delete;
301     SequenceRunnerManager& operator=(SequenceRunnerManager &&) = delete;
302 
303     // <<name1, seqRunner>, <name2, seqRunner>, ...>
304     std::unordered_map<std::string, SequenceRunner*> globalSeqRunner_ {};
305     std::mutex globalSeqRunnerMutex_;
306 };
307 } // namespace Commonlibrary::Concurrent::TaskPoolModule
308 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_MANAGER_H