1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "taskpool.h"
17 
18 #include <cinttypes>
19 
20 #include "helper/error_helper.h"
21 #include "helper/hitrace_helper.h"
22 #include "helper/napi_helper.h"
23 #include "helper/object_helper.h"
24 #include "message_queue.h"
25 #include "task_manager.h"
26 #include "tools/log.h"
27 #include "worker.h"
28 
29 namespace Commonlibrary::Concurrent::TaskPoolModule {
30 using namespace Commonlibrary::Concurrent::Common::Helper;
31 
InitTaskPool(napi_env env, napi_value exports)32 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
33 {
34     HILOG_INFO("taskpool:: Import taskpool");
35     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
36     napi_value taskClass = nullptr;
37     napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor, nullptr, 0, nullptr, &taskClass);
38     napi_value longTaskClass = nullptr;
39     napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
40                       nullptr, 0, nullptr, &longTaskClass);
41     napi_value genericsTaskClass = nullptr;
42     napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
43                       nullptr, 0, nullptr, &genericsTaskClass);
44     napi_value isCanceledFunc = nullptr;
45     napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc);
46     napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
47     napi_value sendDataFunc = nullptr;
48     napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc);
49     napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
50     napi_value taskGroupClass = nullptr;
51     napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor, nullptr, 0, nullptr,
52                       &taskGroupClass);
53     napi_value seqRunnerClass = nullptr;
54     napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
55                       nullptr, 0, nullptr, &seqRunnerClass);
56 
57     // define priority
58     napi_value priorityObj = NapiHelper::CreateObject(env);
59     napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
60     napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
61     napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
62     napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
63     napi_property_descriptor exportPriority[] = {
64         DECLARE_NAPI_PROPERTY("HIGH", highPriority),
65         DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
66         DECLARE_NAPI_PROPERTY("LOW", lowPriority),
67         DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
68     };
69     napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
70 
71     // define State
72     napi_value stateObj = NapiHelper::CreateObject(env);
73     napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
74     napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
75     napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
76     napi_property_descriptor exportState[] = {
77         DECLARE_NAPI_PROPERTY("WAITING", waitingState),
78         DECLARE_NAPI_PROPERTY("RUNNING", runningState),
79         DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
80     };
81     napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
82 
83     napi_property_descriptor properties[] = {
84         DECLARE_NAPI_PROPERTY("Task", taskClass),
85         DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
86         DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
87         DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
88         DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
89         DECLARE_NAPI_PROPERTY("Priority", priorityObj),
90         DECLARE_NAPI_PROPERTY("State", stateObj),
91         DECLARE_NAPI_FUNCTION("execute", Execute),
92         DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
93         DECLARE_NAPI_FUNCTION("cancel", Cancel),
94         DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
95         DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
96         DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
97         DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
98     };
99     napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
100 
101     TaskManager::GetInstance().InitTaskManager(env);
102     return exports;
103 }
104 
105 // ---------------------------------- SendData ---------------------------------------
ExecuteCallback(const uv_async_t* req)106 void TaskPool::ExecuteCallback(const uv_async_t* req)
107 {
108     auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req);
109     if (msgQueue == nullptr) {
110         HILOG_ERROR("taskpool:: msgQueue is nullptr");
111         return;
112     }
113     ExecuteCallbackInner(*msgQueue);
114 }
115 
ExecuteCallbackTask(CallbackInfo* callbackInfo)116 void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo)
117 {
118     auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo);
119     if (msgQueue == nullptr) {
120         HILOG_ERROR("taskpool:: msgQueue is nullptr");
121         return;
122     }
123     ExecuteCallbackInner(*msgQueue);
124 }
125 
ExecuteCallbackInner(MsgQueue& msgQueue)126 void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue)
127 {
128     while (!msgQueue.IsEmpty()) {
129         auto resultInfo = msgQueue.DeQueue();
130         if (resultInfo == nullptr) {
131             HILOG_DEBUG("taskpool:: resultInfo is nullptr");
132             continue;
133         }
134         ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
135         napi_status status = napi_ok;
136         CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status);
137         if (status != napi_ok) {
138             HILOG_ERROR("napi_open_handle_scope failed");
139             return;
140         }
141         auto env = resultInfo->hostEnv;
142         auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId);
143         if (callbackInfo == nullptr) {
144             HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
145             ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
146             continue;
147         }
148         auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
149         napi_value args;
150         napi_value result;
151         status = napi_deserialize(env, resultInfo->serializationArgs, &args);
152         napi_delete_serialization_data(env, resultInfo->serializationArgs);
153         if (status != napi_ok || args == nullptr) {
154             std::string errMessage = "taskpool:: failed to serialize function";
155             HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
156             ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
157             continue;
158         }
159         uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
160         napi_value argsArray[argsNum];
161         for (size_t i = 0; i < argsNum; i++) {
162             argsArray[i] = NapiHelper::GetElement(env, args, i);
163         }
164         napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
165         if (NapiHelper::IsExceptionPending(env)) {
166             napi_value exception = nullptr;
167             napi_get_and_clear_last_exception(env, &exception);
168             HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
169         }
170     }
171 }
172 // ---------------------------------- SendData ---------------------------------------
173 
GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)174 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
175 {
176     napi_value result = nullptr;
177     napi_create_object(env, &result);
178     napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
179     napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
180     napi_set_named_property(env, result, "threadInfos", threadInfos);
181     napi_set_named_property(env, result, "taskInfos", taskInfos);
182     return result;
183 }
184 
TerminateTask(napi_env env, napi_callback_info cbinfo)185 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
186 {
187     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
188     size_t argc = 1; // 1: long task
189     napi_value args[1];
190     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
191     if (argc < 1) {
192         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one.");
193         return nullptr;
194     }
195     if (!NapiHelper::IsObject(env, args[0])) {
196         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
197         return nullptr;
198     }
199     napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
200     uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
201     auto task = TaskManager::GetInstance().GetTask(taskId);
202     if (task == nullptr || !task->IsLongTask()) {
203         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task.");
204         return nullptr;
205     }
206     TaskManager::GetInstance().TerminateTask(taskId);
207     return nullptr;
208 }
209 
Execute(napi_env env, napi_callback_info cbinfo)210 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
211 {
212     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
213     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
214     if (argc < 1) {
215         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
216         return nullptr;
217     }
218     napi_value* args = new napi_value[argc];
219     ObjectScope<napi_value> scope(args, true);
220     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
221     napi_valuetype type = napi_undefined;
222     napi_typeof(env, args[0], &type);
223     if (type == napi_object) {
224         uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
225         if (argc > 1) {
226             if (!NapiHelper::IsNumber(env, args[1])) {
227                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number.");
228                 return nullptr;
229             }
230             priority = NapiHelper::GetUint32Value(env, args[1]);
231             if (priority >= Priority::NUMBER) {
232                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
233                 return nullptr;
234             }
235         }
236         if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
237             return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
238         }
239         Task* task = nullptr;
240         napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
241         if (task == nullptr) {
242             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
243             return nullptr;
244         }
245         if (!task->CanExecute(env)) {
246             return nullptr;
247         }
248         napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
249                                                       static_cast<Priority>(priority));
250         if (promise == nullptr) {
251             return nullptr;
252         }
253         ExecuteTask(env, task, static_cast<Priority>(priority));
254         return promise;
255     }
256     if (type != napi_function) {
257         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
258             "the type of the first param must be object or function.");
259         return nullptr;
260     }
261     Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
262     if (task == nullptr) {
263         HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
264         return nullptr;
265     }
266     TaskManager::GetInstance().StoreTask(task->taskId_, task);
267     napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
268     ExecuteTask(env, task);
269     return promise;
270 }
271 
DelayTask(uv_timer_t* handle)272 void TaskPool::DelayTask(uv_timer_t* handle)
273 {
274     TaskMessage *taskMessage = static_cast<TaskMessage *>(handle->data);
275     auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
276     if (task == nullptr) {
277         HILOG_DEBUG("taskpool:: task is nullptr");
278     } else if (task->taskState_ == ExecuteState::CANCELED) {
279         HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
280         napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
281         napi_reject_deferred(task->env_, taskMessage->deferred, error);
282     } else {
283         HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
284         TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId);
285         task->IncreaseRefCount();
286         napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
287         TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
288         if (taskInfo != nullptr) {
289             taskInfo->deferred = taskMessage->deferred;
290             if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
291                 task->taskState_ = ExecuteState::WAITING;
292                 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
293             }
294         } else {
295             napi_value execption = nullptr;
296             napi_get_and_clear_last_exception(task->env_, &execption);
297             if (execption != nullptr) {
298                 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
299             }
300         }
301     }
302     if (task != nullptr) {
303         std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
304         task->delayedTimers_.erase(handle);
305     }
306     uv_timer_stop(handle);
307     uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
308         delete reinterpret_cast<uv_timer_t*>(handle);
309         handle = nullptr;
310     });
311     delete taskMessage;
312     taskMessage = nullptr;
313 }
314 
ExecuteDelayed(napi_env env, napi_callback_info cbinfo)315 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
316 {
317     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
318     uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
319     int32_t delayTime = 0;
320     Task* task = nullptr;
321     if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
322         return nullptr;
323     }
324 
325     if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED ||
326         task->taskState_ == ExecuteState::FINISHED) {
327         task->taskState_ = ExecuteState::DELAYED;
328     }
329     task->UpdateTaskType(TaskType::COMMON_TASK);
330 
331     uv_loop_t* loop = NapiHelper::GetLibUV(env);
332     uv_update_time(loop);
333     uv_timer_t* timer = new uv_timer_t;
334     uv_timer_init(loop, timer);
335     TaskMessage *taskMessage = new TaskMessage();
336     taskMessage->priority = static_cast<Priority>(priority);
337     taskMessage->taskId = task->taskId_;
338     napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
339     timer->data = taskMessage;
340 
341     std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
342     strTrace += ", priority: " + std::to_string(priority);
343     strTrace += ", delayTime " + std::to_string(delayTime);
344     HITRACE_HELPER_METER_NAME(strTrace);
345     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
346 
347     uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
348     {
349         std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
350         task->delayedTimers_.insert(timer);
351     }
352     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
353     if (engine->IsMainThread()) {
354         uv_async_send(&loop->wq_async);
355     } else {
356         uv_work_t *work = new uv_work_t;
357         uv_queue_work_with_qos(loop, work, [](uv_work_t *) {},
358                                [](uv_work_t *work, int32_t) {delete work; }, uv_qos_user_initiated);
359     }
360     return promise;
361 }
362 
ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)363 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
364 {
365     napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
366     uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
367     HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
368     auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
369     napi_reference_ref(env, taskGroup->groupRef_, nullptr);
370     if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
371         taskGroup->groupState_ == ExecuteState::CANCELED) {
372         taskGroup->groupState_ = ExecuteState::WAITING;
373     }
374     GroupInfo* groupInfo = new GroupInfo();
375     groupInfo->priority = priority;
376     napi_value resArr;
377     napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
378     napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
379     groupInfo->resArr = arrRef;
380     napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
381     {
382         std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
383         if (taskGroup->currentGroupInfo_ == nullptr) {
384             taskGroup->currentGroupInfo_ = groupInfo;
385             for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
386                 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
387                 Task* task = nullptr;
388                 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
389                 if (task == nullptr) {
390                     HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
391                     return nullptr;
392                 }
393                 napi_reference_ref(env, task->taskRef_, nullptr);
394                 if (task->IsGroupCommonTask()) {
395                     task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
396                 }
397                 ExecuteTask(env, task, static_cast<Priority>(priority));
398             }
399         } else {
400             taskGroup->pendingGroupInfos_.push_back(groupInfo);
401         }
402     }
403     return promise;
404 }
405 
HandleTaskResult(const uv_async_t* req)406 void TaskPool::HandleTaskResult(const uv_async_t* req)
407 {
408     HILOG_DEBUG("taskpool:: HandleTaskResult task");
409     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
410     auto task = static_cast<Task*>(req->data);
411     if (task == nullptr) { // LCOV_EXCL_BR_LINE
412         HILOG_FATAL("taskpool:: HandleTaskResult task is null");
413         return;
414     }
415     if (!task->IsMainThreadTask()) {
416         if (task->ShouldDeleteTask(false)) {
417             delete task;
418             return;
419         }
420         if (task->IsFunctionTask()) {
421             napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
422         }
423     }
424     task->DecreaseTaskRefCount();
425     HandleTaskResultCallback(task);
426 }
427 
HandleTaskResultCallback(Task* task)428 void TaskPool::HandleTaskResultCallback(Task* task)
429 {
430     napi_handle_scope scope = nullptr;
431     NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
432     napi_value napiTaskResult = nullptr;
433     napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
434     napi_delete_serialization_data(task->env_, task->result_);
435 
436     // tag for trace parse: Task PerformTask End
437     std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
438     if (task->taskState_ == ExecuteState::CANCELED) {
439         strTrace += ", performResult : IsCanceled";
440         napiTaskResult = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
441     } else if (status != napi_ok) {
442         HILOG_ERROR("taskpool: failed to deserialize result");
443         strTrace += ", performResult : DeserializeFailed";
444     } else if (task->success_) {
445         strTrace += ", performResult : Successful";
446     } else {
447         strTrace += ", performResult : Unsuccessful";
448     }
449     HITRACE_HELPER_METER_NAME(strTrace);
450     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
451     if (napiTaskResult == nullptr) {
452         napi_get_undefined(task->env_, &napiTaskResult);
453     }
454     reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
455     bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
456     task->taskState_ = ExecuteState::ENDING;
457     if (task->IsGroupTask()) {
458         UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
459     } else if (!task->IsPeriodicTask()) {
460         if (success) {
461             napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
462             if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
463                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
464             }
465         } else {
466             napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
467             if (task->onExecutionFailedCallBackInfo_ != nullptr) {
468                 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
469                 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
470             }
471         }
472     }
473     NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
474     TriggerTask(task);
475 }
476 
TriggerTask(Task* task)477 void TaskPool::TriggerTask(Task* task)
478 {
479     HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
480     if (task->IsGroupTask()) {
481         return;
482     }
483     TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
484     task->taskState_ = ExecuteState::FINISHED;
485     // seqRunnerTask will trigger the next
486     if (task->IsSeqRunnerTask()) {
487         if (!TaskGroupManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
488             HILOG_ERROR("seqRunner:: task %{public}s trigger in seqRunner %{public}s failed",
489                         std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
490         }
491     } else if (task->IsCommonTask()) {
492         task->NotifyPendingTask();
493     }
494     if (task->IsPeriodicTask()) {
495         return;
496     }
497     if (!task->IsFunctionTask()) {
498         napi_reference_unref(task->env_, task->taskRef_, nullptr);
499         return;
500     }
501     TaskManager::GetInstance().RemoveTask(task->taskId_);
502     delete task;
503 }
504 
UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)505 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
506 {
507     HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
508     TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
509     task->taskState_ = ExecuteState::FINISHED;
510     napi_reference_unref(env, task->taskRef_, nullptr);
511     if (task->IsGroupCommonTask()) {
512         delete task->currentTaskInfo_;
513         task->currentTaskInfo_ = nullptr;
514     }
515     TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
516     if (taskGroup == nullptr) {
517         HILOG_DEBUG("taskpool:: taskGroup has been released");
518         return;
519     }
520     if (taskGroup->currentGroupInfo_ == nullptr) {
521         HILOG_DEBUG("taskpool:: taskGroup has been canceled");
522         return;
523     }
524     uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
525     auto groupInfo = taskGroup->currentGroupInfo_;
526     if (success) {
527         // Update res at resArr
528         napi_ref arrRef = groupInfo->resArr;
529         napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
530         napi_set_element(env, resArr, index, res);
531 
532         groupInfo->finishedTask++;
533         if (groupInfo->finishedTask < taskGroup->taskNum_) {
534             return;
535         }
536         HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
537         napi_resolve_deferred(env, groupInfo->deferred, resArr);
538         for (uint64_t taskId : taskGroup->taskIds_) {
539             auto task = TaskManager::GetInstance().GetTask(taskId);
540             if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
541                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
542             }
543         }
544     } else {
545         napi_reject_deferred(env, groupInfo->deferred, res);
546         if (task->onExecutionFailedCallBackInfo_ != nullptr) {
547             task->onExecutionFailedCallBackInfo_->taskError_ = res;
548             task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
549         }
550     }
551     taskGroup->groupState_ = ExecuteState::FINISHED;
552     napi_delete_reference(env, groupInfo->resArr);
553     napi_reference_unref(env, taskGroup->groupRef_, nullptr);
554     delete groupInfo;
555     taskGroup->currentGroupInfo_ = nullptr;
556     taskGroup->NotifyGroupTask(env);
557 }
558 
ExecuteTask(napi_env env, Task* task, Priority priority)559 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
560 {
561     // tag for trace parse: Task Allocation
562     std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
563         + ", priority : " + std::to_string(priority)
564         + ", executeState : " + std::to_string(ExecuteState::WAITING);
565     HITRACE_HELPER_METER_NAME(strTrace);
566     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
567     task->IncreaseRefCount();
568     TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
569     if (task->IsFunctionTask() || (task->taskState_ != ExecuteState::WAITING &&
570         task->taskState_ != ExecuteState::RUNNING && task->taskState_ != ExecuteState::ENDING)) {
571         task->taskState_ = ExecuteState::WAITING;
572         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
573     }
574 }
575 
Cancel(napi_env env, napi_callback_info cbinfo)576 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
577 {
578     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
579     size_t argc = 1;
580     napi_value args[1];
581     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
582     if (argc < 1) {
583         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
584         return nullptr;
585     }
586 
587     if (!NapiHelper::IsObject(env, args[0])) {
588         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
589         return nullptr;
590     }
591 
592     if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
593         napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
594         if (napiTaskId == nullptr) {
595             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
596             return nullptr;
597         }
598         uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
599         TaskManager::GetInstance().CancelTask(env, taskId);
600     } else {
601         napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
602         if (napiGroupId == nullptr) {
603             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup.");
604             return nullptr;
605         }
606         uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
607         TaskGroupManager::GetInstance().CancelGroup(env, groupId);
608     }
609     return nullptr;
610 }
611 
IsConcurrent(napi_env env, napi_callback_info cbinfo)612 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
613 {
614     size_t argc = 1;
615     napi_value args[1];
616     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
617     if (argc != 1) {
618         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
619         return nullptr;
620     }
621 
622     if (!NapiHelper::IsFunction(env, args[0])) {
623         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function.");
624         return nullptr;
625     }
626 
627     bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
628     return NapiHelper::CreateBooleanValue(env, isConcurrent);
629 }
630 
PeriodicTaskCallback(uv_timer_t* handle)631 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
632 {
633     Task* task = reinterpret_cast<Task*>(handle->data);
634     if (task == nullptr) {
635         HILOG_DEBUG("taskpool:: the task is nullptr");
636         return;
637     } else if (!task->IsPeriodicTask()) {
638         HILOG_DEBUG("taskpool:: the current task is not a periodic task");
639         return;
640     } else if (task->taskState_ == ExecuteState::CANCELED) {
641         HILOG_DEBUG("taskpool:: the periodic task has been canceled");
642         return;
643     }
644     TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
645 
646     if (!task->isFirstTaskInfo_) {
647         napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
648         TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
649         if (taskInfo == nullptr) {
650             HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
651             return;
652         }
653     }
654     task->isFirstTaskInfo_ = false;
655 
656     task->IncreaseRefCount();
657     HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
658     if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) {
659         task->taskState_ = ExecuteState::WAITING;
660         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
661     }
662 }
663 
ExecutePeriodically(napi_env env, napi_callback_info cbinfo)664 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
665 {
666     int32_t period = 0;
667     uint32_t priority = Priority::DEFAULT;
668     Task* periodicTask = nullptr;
669     if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
670         return nullptr;
671     }
672 
673     if (!periodicTask->CanExecutePeriodically(env)) {
674         return nullptr;
675     }
676     periodicTask->UpdatePeriodicTask();
677 
678     periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
679     napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
680     TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_);
681     if (taskInfo == nullptr) {
682         return nullptr;
683     }
684 
685     periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo
686 
687     TriggerTimer(env, periodicTask, period);
688     return nullptr;
689 }
690 
TriggerTimer(napi_env env, Task* task, int32_t period)691 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
692 {
693     HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
694     uv_loop_t* loop = NapiHelper::GetLibUV(env);
695     task->timer_ = new uv_timer_t;
696     uv_timer_init(loop, task->timer_);
697     task->timer_->data = task;
698     uv_update_time(loop);
699     uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
700     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
701     if (engine->IsMainThread()) {
702         uv_async_send(&loop->wq_async);
703     } else {
704         uv_work_t* work = new uv_work_t;
705         uv_queue_work_with_qos(loop, work, [](uv_work_t*) {},
706                                [](uv_work_t* work, int32_t) { delete work; }, uv_qos_user_initiated);
707     }
708 }
709 
CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime, Task* &task)710 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
711                                   Task* &task)
712 {
713     size_t argc = 3; // 3: delayTime, task and priority
714     napi_value args[3]; // 3: delayTime, task and priority
715     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
716     if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
717         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
718         return false;
719     }
720 
721     if (!NapiHelper::IsNumber(env, args[0])) {
722         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
723         return false;
724     }
725 
726     if (!NapiHelper::IsObject(env, args[1])) {
727         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object.");
728         return false;
729     }
730 
731     delayTime = NapiHelper::GetInt32Value(env, args[0]);
732     if (delayTime < 0) {
733         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
734         return false;
735     }
736 
737     if (argc > 2) { // 2: the params might have priority
738         if (!NapiHelper::IsNumber(env, args[2])) {
739             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number.");
740             return false;
741         }
742         priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
743         if (priority >= Priority::NUMBER) {
744             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
745             return false;
746         }
747     }
748 
749     napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
750     if (task == nullptr) {
751         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task");
752         return false;
753     }
754     if (!task->CanExecuteDelayed(env)) {
755         return false;
756     }
757     return true;
758 }
759 
CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period, uint32_t &priority, Task* &periodicTask)760 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
761                                        uint32_t &priority, Task* &periodicTask)
762 {
763     size_t argc = 3; // 3 : period, task, priority
764     napi_value args[3]; // 3 : period, task, priority
765     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
766     if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
767         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
768         return false;
769     }
770     if (!NapiHelper::IsNumber(env, args[0])) {
771         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
772         return false;
773     }
774     period = NapiHelper::GetInt32Value(env, args[0]);
775     if (period < 0) {
776         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
777         return false;
778     }
779     if (!NapiHelper::IsObject(env, args[1])) {
780         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
781         return false;
782     }
783 
784     if (argc >= 3) { // 3 : third param maybe priority
785         if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
786             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
787             return false;
788         }
789         priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
790         if (priority >= Priority::NUMBER) {
791             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
792             return false;
793         }
794     }
795 
796     napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
797     if (periodicTask == nullptr) {
798         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
799         return false;
800     }
801 
802     return true;
803 }
804 } // namespace Commonlibrary::Concurrent::TaskPoolModule
805