1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_H
17 #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_H
18 
19 #include <list>
20 #include <map>
21 #include <mutex>
22 #include <set>
23 #include <shared_mutex>
24 #include <string>
25 #include <uv.h>
26 
27 #include "helper/concurrent_helper.h"
28 #include "napi/native_api.h"
29 #include "utils.h"
30 #include "tools/log.h"
31 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
32 #include "event_handler.h"
33 #endif
34 
35 #if defined(ENABLE_TASKPOOL_FFRT)
36 #include "c/executor_task.h"
37 #include "ffrt_inner.h"
38 #endif
39 
40 namespace Commonlibrary::Concurrent::TaskPoolModule {
41 using namespace Commonlibrary::Platform;
42 
43 enum ExecuteState { NOT_FOUND, WAITING, RUNNING, CANCELED, FINISHED, DELAYED, ENDING};
44 enum TaskType { TASK, FUNCTION_TASK, SEQRUNNER_TASK, COMMON_TASK, GROUP_COMMON_TASK, GROUP_FUNCTION_TASK };
45 
46 struct GroupInfo;
47 class Worker;
48 struct TaskInfo {
49     napi_deferred deferred = nullptr;
50     Priority priority {Priority::DEFAULT};
51     void* serializationFunction = nullptr;
52     void* serializationArguments = nullptr;
53 };
54 
55 #if defined(ENABLE_TASKPOOL_FFRT)
56 #define RECURSIVE_MUTEX ffrt::recursive_mutex
57 #define FFRT_MUTEX ffrt::mutex
58 #define SHARED_MUTEX ffrt::shared_mutex
59 #else
60 #define RECURSIVE_MUTEX std::recursive_mutex
61 #define FFRT_MUTEX std::mutex
62 #define SHARED_MUTEX std::shared_mutex
63 #endif
64 
65 struct ListenerCallBackInfo {
ListenerCallBackInfoCommonlibrary::Concurrent::TaskPoolModule::ListenerCallBackInfo66     ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value taskError) : env_(env),
67         callbackRef_(callbackRef), taskError_(taskError) {}
~ListenerCallBackInfoCommonlibrary::Concurrent::TaskPoolModule::ListenerCallBackInfo68     ~ListenerCallBackInfo()
69     {
70         napi_delete_reference(env_, callbackRef_);
71     }
72     napi_env env_;
73     napi_ref callbackRef_;
74     napi_value taskError_;
75 };
76 
77 class Task {
78 public:
79     Task(napi_env env, TaskType taskType, std::string name);
80     Task() = default;
81     ~Task() = default;
82 
83     static napi_value TaskConstructor(napi_env env, napi_callback_info cbinfo);
84     static napi_value LongTaskConstructor(napi_env env, napi_callback_info cbinfo);
85     static napi_value SetTransferList(napi_env env, napi_callback_info cbinfo);
86     static napi_value SetCloneList(napi_env env, napi_callback_info cbinfo);
87     static napi_value IsCanceled(napi_env env, napi_callback_info cbinfo);
88     static napi_value OnReceiveData(napi_env env, napi_callback_info cbinfo);
89     static napi_value SendData(napi_env env, napi_callback_info cbinfo);
90     static napi_value AddDependency(napi_env env, napi_callback_info cbinfo);
91     static napi_value RemoveDependency(napi_env env, napi_callback_info cbinfo);
92     static napi_value OnEnqueued(napi_env env, napi_callback_info cbinfo);
93     static napi_value OnStartExecution(napi_env env, napi_callback_info cbinfo);
94     static napi_value OnExecutionFailed(napi_env env, napi_callback_info cbinfo);
95     static napi_value OnExecutionSucceeded(napi_env env, napi_callback_info cbinfo);
96     static napi_value IsDone(napi_env env, napi_callback_info cbinfo);
97     static napi_value GetTotalDuration(napi_env env, napi_callback_info info);
98     static napi_value GetCPUDuration(napi_env env, napi_callback_info info);
99     static napi_value GetIODuration(napi_env env, napi_callback_info info);
100     static napi_value GetTaskDuration(napi_env env, napi_callback_info& info, std::string durationType);
101     static napi_value GetName(napi_env env, [[maybe_unused]] napi_callback_info info);
102 
103     static Task* GenerateTask(napi_env env, napi_value task, napi_value func,
104                               napi_value name, napi_value* args, size_t argc);
105     static Task* GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type);
106     static TaskInfo* GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
107                                       napi_value transferList, napi_value cloneList, Priority priority,
108                                       bool defaultTransfer = true, bool defaultCloneSendable = false);
109     static void TaskDestructor(napi_env env, void* data, void* hint);
110 
111     static void ThrowNoDependencyError(napi_env env);
112     static void StartExecutionCallback(const uv_async_t* req);
113     static void StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo);
114     static void ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo);
115     static void CleanupHookFunc(void* arg);
116 
117     void StoreTaskId(uint64_t taskId);
118     napi_value GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType = TaskType::COMMON_TASK,
119                                   Priority priority = Priority::DEFAULT);
120     TaskInfo* GetTaskInfo(napi_env env, napi_value task, Priority priority);
121     void UpdateTaskType(TaskType taskType);
122     void UpdatePeriodicTask();
123     bool IsRepeatableTask() const;
124     bool IsGroupTask() const;
125     bool IsGroupCommonTask() const;
126     bool IsGroupFunctionTask() const;
127     bool IsCommonTask() const;
128     bool IsSeqRunnerTask() const;
129     bool IsFunctionTask() const;
130     bool IsLongTask() const;
131     bool IsPeriodicTask() const;
132     bool IsMainThreadTask() const;
133     bool IsExecuted() const;
134     void IncreaseRefCount();
135     void DecreaseRefCount();
136     bool IsReadyToHandle() const;
137     void NotifyPendingTask();
138     void CancelPendingTask(napi_env env);
139     bool UpdateTask(uint64_t startTime, void* worker);
140     napi_value DeserializeValue(napi_env env, napi_value* func, napi_value* args);
141     void StoreTaskDuration();
142     bool CanForSequenceRunner(napi_env env);
143     bool CanForTaskGroup(napi_env env);
144     bool CanExecute(napi_env env);
145     bool CanExecuteDelayed(napi_env env);
146     bool CanExecutePeriodically(napi_env env);
147     void SetHasDependency(bool hasDependency);
148     bool HasDependency() const;
149     void TryClearHasDependency();
150     void ClearDelayedTimers();
151     void IncreaseTaskRefCount();
152     void DecreaseTaskRefCount();
153     bool ShouldDeleteTask(bool needUnref = true);
154     bool VerifyAndPostResult(Priority priority);
155     bool CheckStartExecution(Priority priority);
156     bool IsValid();
157     void SetValid(bool isValid);
158 
159 private:
160     Task(const Task &) = delete;
161     Task& operator=(const Task &) = delete;
162     Task(Task &&) = delete;
163     Task& operator=(Task &&) = delete;
164 
165     void InitHandle(napi_env env);
166 
167 public:
168     napi_env env_ = nullptr;
169     TaskType taskType_ {TaskType::TASK};
170     std::string name_ {};
171     uint64_t taskId_ {};
172     std::atomic<ExecuteState> taskState_ {ExecuteState::NOT_FOUND};
173     uint64_t groupId_ {}; // 0 for task outside taskgroup
174     uint64_t seqRunnerId_ {}; // 0 for task without seqRunner
175     TaskInfo* currentTaskInfo_ {};
176     std::list<TaskInfo*> pendingTaskInfos_ {}; // for a common task executes multiple times
177     void* result_ = nullptr;
178     uv_async_t* onResultSignal_ = nullptr;
179     std::atomic<bool> success_ {true};
180     std::atomic<uint64_t> startTime_ {};
181     std::atomic<uint64_t> cpuTime_ {};
182     std::atomic<uint64_t> ioTime_ {};
183     void* worker_ {nullptr};
184     napi_ref taskRef_ {};
185     std::atomic<uint32_t> taskRefCount_ {};
186     RECURSIVE_MUTEX taskMutex_ {};
187     bool hasDependency_ {false};
188     bool isLongTask_ {false};
189     bool defaultTransfer_ {true};
190     bool defaultCloneSendable_ {false};
191     std::atomic<bool> isValid_ {true};
192     std::atomic<uint32_t> refCount_ {false}; // when refCount_ is 0, the task pointer can be deleted
193     uv_async_t* onStartExecutionSignal_ = nullptr;
194     ListenerCallBackInfo* onEnqueuedCallBackInfo_ = nullptr;
195     ListenerCallBackInfo* onStartExecutionCallBackInfo_ = nullptr;
196     ListenerCallBackInfo* onExecutionFailedCallBackInfo_ = nullptr;
197     ListenerCallBackInfo* onExecutionSucceededCallBackInfo_ = nullptr;
198 
199     // for periodic task
200     bool isPeriodicTask_ {false};
201     // periodic task first Generate TaskInfo
202     std::atomic<bool> isFirstTaskInfo_ {false};
203     uv_timer_t* timer_ {nullptr};
204     Priority periodicTaskPriority_ {Priority::DEFAULT};
205 
206     std::set<uv_timer_t*> delayedTimers_ {}; // task delayed timer
207 
208     bool isMainThreadTask_ {false};
209 };
210 
211 struct CallbackInfo {
CallbackInfoCommonlibrary::Concurrent::TaskPoolModule::CallbackInfo212     CallbackInfo(napi_env env, uint32_t count, napi_ref ref, Task* task)
213         : hostEnv(env), refCount(count), callbackRef(ref), task(task), onCallbackSignal(nullptr), worker(nullptr) {}
~CallbackInfoCommonlibrary::Concurrent::TaskPoolModule::CallbackInfo214     ~CallbackInfo()
215     {
216         napi_delete_reference(hostEnv, callbackRef);
217 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
218         if (task == nullptr) {
219             return;
220         }
221         if (!task->IsMainThreadTask() && onCallbackSignal != nullptr) {
222             Common::Helper::ConcurrentHelper::UvHandleClose(onCallbackSignal);
223         }
224 #else
225         if (onCallbackSignal != nullptr) {
226             Common::Helper::ConcurrentHelper::UvHandleClose(onCallbackSignal);
227         }
228 #endif
229     }
230 
231     napi_env hostEnv;
232     uint32_t refCount;
233     napi_ref callbackRef;
234     Task* task;
235     uv_async_t* onCallbackSignal;
236     Worker* worker;
237 };
238 
239 struct TaskResultInfo {
TaskResultInfoCommonlibrary::Concurrent::TaskPoolModule::TaskResultInfo240     TaskResultInfo(napi_env env, napi_env curEnv, uint64_t id, void* args) : hostEnv(env), workerEnv(curEnv),
241         taskId(id), serializationArgs(args) {}
242     ~TaskResultInfo() = default;
243 
244     napi_env hostEnv;
245     napi_env workerEnv;
246     uint64_t taskId;
247     void* serializationArgs;
248 };
249 } // namespace Commonlibrary::Concurrent::TaskPoolModule
250 #endif // JS_CONCURRENT_MODULE_TASKPOOL_TASK_H