1/*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "taskpool.h"
17
18#include <cinttypes>
19
20#include "helper/error_helper.h"
21#include "helper/hitrace_helper.h"
22#include "helper/napi_helper.h"
23#include "helper/object_helper.h"
24#include "message_queue.h"
25#include "task_manager.h"
26#include "tools/log.h"
27#include "worker.h"
28
29namespace Commonlibrary::Concurrent::TaskPoolModule {
30using namespace Commonlibrary::Concurrent::Common::Helper;
31
32napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
33{
34    HILOG_INFO("taskpool:: Import taskpool");
35    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
36    napi_value taskClass = nullptr;
37    napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor, nullptr, 0, nullptr, &taskClass);
38    napi_value longTaskClass = nullptr;
39    napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
40                      nullptr, 0, nullptr, &longTaskClass);
41    napi_value genericsTaskClass = nullptr;
42    napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
43                      nullptr, 0, nullptr, &genericsTaskClass);
44    napi_value isCanceledFunc = nullptr;
45    napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc);
46    napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
47    napi_value sendDataFunc = nullptr;
48    napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc);
49    napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
50    napi_value taskGroupClass = nullptr;
51    napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor, nullptr, 0, nullptr,
52                      &taskGroupClass);
53    napi_value seqRunnerClass = nullptr;
54    napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
55                      nullptr, 0, nullptr, &seqRunnerClass);
56
57    // define priority
58    napi_value priorityObj = NapiHelper::CreateObject(env);
59    napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
60    napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
61    napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
62    napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
63    napi_property_descriptor exportPriority[] = {
64        DECLARE_NAPI_PROPERTY("HIGH", highPriority),
65        DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
66        DECLARE_NAPI_PROPERTY("LOW", lowPriority),
67        DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
68    };
69    napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
70
71    // define State
72    napi_value stateObj = NapiHelper::CreateObject(env);
73    napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
74    napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
75    napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
76    napi_property_descriptor exportState[] = {
77        DECLARE_NAPI_PROPERTY("WAITING", waitingState),
78        DECLARE_NAPI_PROPERTY("RUNNING", runningState),
79        DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
80    };
81    napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
82
83    napi_property_descriptor properties[] = {
84        DECLARE_NAPI_PROPERTY("Task", taskClass),
85        DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
86        DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
87        DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
88        DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
89        DECLARE_NAPI_PROPERTY("Priority", priorityObj),
90        DECLARE_NAPI_PROPERTY("State", stateObj),
91        DECLARE_NAPI_FUNCTION("execute", Execute),
92        DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
93        DECLARE_NAPI_FUNCTION("cancel", Cancel),
94        DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
95        DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
96        DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
97        DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
98    };
99    napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
100
101    TaskManager::GetInstance().InitTaskManager(env);
102    return exports;
103}
104
105// ---------------------------------- SendData ---------------------------------------
106void TaskPool::ExecuteCallback(const uv_async_t* req)
107{
108    auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req);
109    if (msgQueue == nullptr) {
110        HILOG_ERROR("taskpool:: msgQueue is nullptr");
111        return;
112    }
113    ExecuteCallbackInner(*msgQueue);
114}
115
116void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo)
117{
118    auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo);
119    if (msgQueue == nullptr) {
120        HILOG_ERROR("taskpool:: msgQueue is nullptr");
121        return;
122    }
123    ExecuteCallbackInner(*msgQueue);
124}
125
126void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue)
127{
128    while (!msgQueue.IsEmpty()) {
129        auto resultInfo = msgQueue.DeQueue();
130        if (resultInfo == nullptr) {
131            HILOG_DEBUG("taskpool:: resultInfo is nullptr");
132            continue;
133        }
134        ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
135        napi_status status = napi_ok;
136        CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status);
137        if (status != napi_ok) {
138            HILOG_ERROR("napi_open_handle_scope failed");
139            return;
140        }
141        auto env = resultInfo->hostEnv;
142        auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId);
143        if (callbackInfo == nullptr) {
144            HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
145            ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
146            continue;
147        }
148        auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
149        napi_value args;
150        napi_value result;
151        status = napi_deserialize(env, resultInfo->serializationArgs, &args);
152        napi_delete_serialization_data(env, resultInfo->serializationArgs);
153        if (status != napi_ok || args == nullptr) {
154            std::string errMessage = "taskpool:: failed to serialize function";
155            HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
156            ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
157            continue;
158        }
159        uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
160        napi_value argsArray[argsNum];
161        for (size_t i = 0; i < argsNum; i++) {
162            argsArray[i] = NapiHelper::GetElement(env, args, i);
163        }
164        napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
165        if (NapiHelper::IsExceptionPending(env)) {
166            napi_value exception = nullptr;
167            napi_get_and_clear_last_exception(env, &exception);
168            HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
169        }
170    }
171}
172// ---------------------------------- SendData ---------------------------------------
173
174napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
175{
176    napi_value result = nullptr;
177    napi_create_object(env, &result);
178    napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
179    napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
180    napi_set_named_property(env, result, "threadInfos", threadInfos);
181    napi_set_named_property(env, result, "taskInfos", taskInfos);
182    return result;
183}
184
185napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
186{
187    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
188    size_t argc = 1; // 1: long task
189    napi_value args[1];
190    napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
191    if (argc < 1) {
192        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one.");
193        return nullptr;
194    }
195    if (!NapiHelper::IsObject(env, args[0])) {
196        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
197        return nullptr;
198    }
199    napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
200    uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
201    auto task = TaskManager::GetInstance().GetTask(taskId);
202    if (task == nullptr || !task->IsLongTask()) {
203        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task.");
204        return nullptr;
205    }
206    TaskManager::GetInstance().TerminateTask(taskId);
207    return nullptr;
208}
209
210napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
211{
212    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
213    size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
214    if (argc < 1) {
215        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
216        return nullptr;
217    }
218    napi_value* args = new napi_value[argc];
219    ObjectScope<napi_value> scope(args, true);
220    napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
221    napi_valuetype type = napi_undefined;
222    napi_typeof(env, args[0], &type);
223    if (type == napi_object) {
224        uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
225        if (argc > 1) {
226            if (!NapiHelper::IsNumber(env, args[1])) {
227                ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number.");
228                return nullptr;
229            }
230            priority = NapiHelper::GetUint32Value(env, args[1]);
231            if (priority >= Priority::NUMBER) {
232                ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
233                return nullptr;
234            }
235        }
236        if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
237            return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
238        }
239        Task* task = nullptr;
240        napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
241        if (task == nullptr) {
242            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
243            return nullptr;
244        }
245        if (!task->CanExecute(env)) {
246            return nullptr;
247        }
248        napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
249                                                      static_cast<Priority>(priority));
250        if (promise == nullptr) {
251            return nullptr;
252        }
253        ExecuteTask(env, task, static_cast<Priority>(priority));
254        return promise;
255    }
256    if (type != napi_function) {
257        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
258            "the type of the first param must be object or function.");
259        return nullptr;
260    }
261    Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
262    if (task == nullptr) {
263        HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
264        return nullptr;
265    }
266    TaskManager::GetInstance().StoreTask(task->taskId_, task);
267    napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
268    ExecuteTask(env, task);
269    return promise;
270}
271
272void TaskPool::DelayTask(uv_timer_t* handle)
273{
274    TaskMessage *taskMessage = static_cast<TaskMessage *>(handle->data);
275    auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
276    if (task == nullptr) {
277        HILOG_DEBUG("taskpool:: task is nullptr");
278    } else if (task->taskState_ == ExecuteState::CANCELED) {
279        HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
280        napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
281        napi_reject_deferred(task->env_, taskMessage->deferred, error);
282    } else {
283        HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
284        TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId);
285        task->IncreaseRefCount();
286        napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
287        TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
288        if (taskInfo != nullptr) {
289            taskInfo->deferred = taskMessage->deferred;
290            if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
291                task->taskState_ = ExecuteState::WAITING;
292                TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
293            }
294        } else {
295            napi_value execption = nullptr;
296            napi_get_and_clear_last_exception(task->env_, &execption);
297            if (execption != nullptr) {
298                napi_reject_deferred(task->env_, taskMessage->deferred, execption);
299            }
300        }
301    }
302    if (task != nullptr) {
303        std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
304        task->delayedTimers_.erase(handle);
305    }
306    uv_timer_stop(handle);
307    uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
308        delete reinterpret_cast<uv_timer_t*>(handle);
309        handle = nullptr;
310    });
311    delete taskMessage;
312    taskMessage = nullptr;
313}
314
315napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
316{
317    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
318    uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
319    int32_t delayTime = 0;
320    Task* task = nullptr;
321    if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
322        return nullptr;
323    }
324
325    if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED ||
326        task->taskState_ == ExecuteState::FINISHED) {
327        task->taskState_ = ExecuteState::DELAYED;
328    }
329    task->UpdateTaskType(TaskType::COMMON_TASK);
330
331    uv_loop_t* loop = NapiHelper::GetLibUV(env);
332    uv_update_time(loop);
333    uv_timer_t* timer = new uv_timer_t;
334    uv_timer_init(loop, timer);
335    TaskMessage *taskMessage = new TaskMessage();
336    taskMessage->priority = static_cast<Priority>(priority);
337    taskMessage->taskId = task->taskId_;
338    napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
339    timer->data = taskMessage;
340
341    std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
342    strTrace += ", priority: " + std::to_string(priority);
343    strTrace += ", delayTime " + std::to_string(delayTime);
344    HITRACE_HELPER_METER_NAME(strTrace);
345    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
346
347    uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
348    {
349        std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
350        task->delayedTimers_.insert(timer);
351    }
352    NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
353    if (engine->IsMainThread()) {
354        uv_async_send(&loop->wq_async);
355    } else {
356        uv_work_t *work = new uv_work_t;
357        uv_queue_work_with_qos(loop, work, [](uv_work_t *) {},
358                               [](uv_work_t *work, int32_t) {delete work; }, uv_qos_user_initiated);
359    }
360    return promise;
361}
362
363napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
364{
365    napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
366    uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
367    HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
368    auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
369    napi_reference_ref(env, taskGroup->groupRef_, nullptr);
370    if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
371        taskGroup->groupState_ == ExecuteState::CANCELED) {
372        taskGroup->groupState_ = ExecuteState::WAITING;
373    }
374    GroupInfo* groupInfo = new GroupInfo();
375    groupInfo->priority = priority;
376    napi_value resArr;
377    napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
378    napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
379    groupInfo->resArr = arrRef;
380    napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
381    {
382        std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
383        if (taskGroup->currentGroupInfo_ == nullptr) {
384            taskGroup->currentGroupInfo_ = groupInfo;
385            for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
386                napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
387                Task* task = nullptr;
388                napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
389                if (task == nullptr) {
390                    HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
391                    return nullptr;
392                }
393                napi_reference_ref(env, task->taskRef_, nullptr);
394                if (task->IsGroupCommonTask()) {
395                    task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
396                }
397                ExecuteTask(env, task, static_cast<Priority>(priority));
398            }
399        } else {
400            taskGroup->pendingGroupInfos_.push_back(groupInfo);
401        }
402    }
403    return promise;
404}
405
406void TaskPool::HandleTaskResult(const uv_async_t* req)
407{
408    HILOG_DEBUG("taskpool:: HandleTaskResult task");
409    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
410    auto task = static_cast<Task*>(req->data);
411    if (task == nullptr) { // LCOV_EXCL_BR_LINE
412        HILOG_FATAL("taskpool:: HandleTaskResult task is null");
413        return;
414    }
415    if (!task->IsMainThreadTask()) {
416        if (task->ShouldDeleteTask(false)) {
417            delete task;
418            return;
419        }
420        if (task->IsFunctionTask()) {
421            napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
422        }
423    }
424    task->DecreaseTaskRefCount();
425    HandleTaskResultCallback(task);
426}
427
428void TaskPool::HandleTaskResultCallback(Task* task)
429{
430    napi_handle_scope scope = nullptr;
431    NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
432    napi_value napiTaskResult = nullptr;
433    napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
434    napi_delete_serialization_data(task->env_, task->result_);
435
436    // tag for trace parse: Task PerformTask End
437    std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
438    if (task->taskState_ == ExecuteState::CANCELED) {
439        strTrace += ", performResult : IsCanceled";
440        napiTaskResult = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
441    } else if (status != napi_ok) {
442        HILOG_ERROR("taskpool: failed to deserialize result");
443        strTrace += ", performResult : DeserializeFailed";
444    } else if (task->success_) {
445        strTrace += ", performResult : Successful";
446    } else {
447        strTrace += ", performResult : Unsuccessful";
448    }
449    HITRACE_HELPER_METER_NAME(strTrace);
450    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
451    if (napiTaskResult == nullptr) {
452        napi_get_undefined(task->env_, &napiTaskResult);
453    }
454    reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
455    bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
456    task->taskState_ = ExecuteState::ENDING;
457    if (task->IsGroupTask()) {
458        UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
459    } else if (!task->IsPeriodicTask()) {
460        if (success) {
461            napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
462            if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
463                task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
464            }
465        } else {
466            napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
467            if (task->onExecutionFailedCallBackInfo_ != nullptr) {
468                task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
469                task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
470            }
471        }
472    }
473    NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
474    TriggerTask(task);
475}
476
477void TaskPool::TriggerTask(Task* task)
478{
479    HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
480    if (task->IsGroupTask()) {
481        return;
482    }
483    TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
484    task->taskState_ = ExecuteState::FINISHED;
485    // seqRunnerTask will trigger the next
486    if (task->IsSeqRunnerTask()) {
487        if (!TaskGroupManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
488            HILOG_ERROR("seqRunner:: task %{public}s trigger in seqRunner %{public}s failed",
489                        std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
490        }
491    } else if (task->IsCommonTask()) {
492        task->NotifyPendingTask();
493    }
494    if (task->IsPeriodicTask()) {
495        return;
496    }
497    if (!task->IsFunctionTask()) {
498        napi_reference_unref(task->env_, task->taskRef_, nullptr);
499        return;
500    }
501    TaskManager::GetInstance().RemoveTask(task->taskId_);
502    delete task;
503}
504
505void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
506{
507    HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
508    TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
509    task->taskState_ = ExecuteState::FINISHED;
510    napi_reference_unref(env, task->taskRef_, nullptr);
511    if (task->IsGroupCommonTask()) {
512        delete task->currentTaskInfo_;
513        task->currentTaskInfo_ = nullptr;
514    }
515    TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
516    if (taskGroup == nullptr) {
517        HILOG_DEBUG("taskpool:: taskGroup has been released");
518        return;
519    }
520    if (taskGroup->currentGroupInfo_ == nullptr) {
521        HILOG_DEBUG("taskpool:: taskGroup has been canceled");
522        return;
523    }
524    uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
525    auto groupInfo = taskGroup->currentGroupInfo_;
526    if (success) {
527        // Update res at resArr
528        napi_ref arrRef = groupInfo->resArr;
529        napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
530        napi_set_element(env, resArr, index, res);
531
532        groupInfo->finishedTask++;
533        if (groupInfo->finishedTask < taskGroup->taskNum_) {
534            return;
535        }
536        HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
537        napi_resolve_deferred(env, groupInfo->deferred, resArr);
538        for (uint64_t taskId : taskGroup->taskIds_) {
539            auto task = TaskManager::GetInstance().GetTask(taskId);
540            if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
541                task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
542            }
543        }
544    } else {
545        napi_reject_deferred(env, groupInfo->deferred, res);
546        if (task->onExecutionFailedCallBackInfo_ != nullptr) {
547            task->onExecutionFailedCallBackInfo_->taskError_ = res;
548            task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
549        }
550    }
551    taskGroup->groupState_ = ExecuteState::FINISHED;
552    napi_delete_reference(env, groupInfo->resArr);
553    napi_reference_unref(env, taskGroup->groupRef_, nullptr);
554    delete groupInfo;
555    taskGroup->currentGroupInfo_ = nullptr;
556    taskGroup->NotifyGroupTask(env);
557}
558
559void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
560{
561    // tag for trace parse: Task Allocation
562    std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
563        + ", priority : " + std::to_string(priority)
564        + ", executeState : " + std::to_string(ExecuteState::WAITING);
565    HITRACE_HELPER_METER_NAME(strTrace);
566    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
567    task->IncreaseRefCount();
568    TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
569    if (task->IsFunctionTask() || (task->taskState_ != ExecuteState::WAITING &&
570        task->taskState_ != ExecuteState::RUNNING && task->taskState_ != ExecuteState::ENDING)) {
571        task->taskState_ = ExecuteState::WAITING;
572        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
573    }
574}
575
576napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
577{
578    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
579    size_t argc = 1;
580    napi_value args[1];
581    napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
582    if (argc < 1) {
583        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
584        return nullptr;
585    }
586
587    if (!NapiHelper::IsObject(env, args[0])) {
588        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
589        return nullptr;
590    }
591
592    if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
593        napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
594        if (napiTaskId == nullptr) {
595            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
596            return nullptr;
597        }
598        uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
599        TaskManager::GetInstance().CancelTask(env, taskId);
600    } else {
601        napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
602        if (napiGroupId == nullptr) {
603            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup.");
604            return nullptr;
605        }
606        uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
607        TaskGroupManager::GetInstance().CancelGroup(env, groupId);
608    }
609    return nullptr;
610}
611
612napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
613{
614    size_t argc = 1;
615    napi_value args[1];
616    napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
617    if (argc != 1) {
618        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
619        return nullptr;
620    }
621
622    if (!NapiHelper::IsFunction(env, args[0])) {
623        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function.");
624        return nullptr;
625    }
626
627    bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
628    return NapiHelper::CreateBooleanValue(env, isConcurrent);
629}
630
631void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
632{
633    Task* task = reinterpret_cast<Task*>(handle->data);
634    if (task == nullptr) {
635        HILOG_DEBUG("taskpool:: the task is nullptr");
636        return;
637    } else if (!task->IsPeriodicTask()) {
638        HILOG_DEBUG("taskpool:: the current task is not a periodic task");
639        return;
640    } else if (task->taskState_ == ExecuteState::CANCELED) {
641        HILOG_DEBUG("taskpool:: the periodic task has been canceled");
642        return;
643    }
644    TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
645
646    if (!task->isFirstTaskInfo_) {
647        napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
648        TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
649        if (taskInfo == nullptr) {
650            HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
651            return;
652        }
653    }
654    task->isFirstTaskInfo_ = false;
655
656    task->IncreaseRefCount();
657    HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
658    if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) {
659        task->taskState_ = ExecuteState::WAITING;
660        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
661    }
662}
663
664napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
665{
666    int32_t period = 0;
667    uint32_t priority = Priority::DEFAULT;
668    Task* periodicTask = nullptr;
669    if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
670        return nullptr;
671    }
672
673    if (!periodicTask->CanExecutePeriodically(env)) {
674        return nullptr;
675    }
676    periodicTask->UpdatePeriodicTask();
677
678    periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
679    napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
680    TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_);
681    if (taskInfo == nullptr) {
682        return nullptr;
683    }
684
685    periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo
686
687    TriggerTimer(env, periodicTask, period);
688    return nullptr;
689}
690
691void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
692{
693    HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
694    uv_loop_t* loop = NapiHelper::GetLibUV(env);
695    task->timer_ = new uv_timer_t;
696    uv_timer_init(loop, task->timer_);
697    task->timer_->data = task;
698    uv_update_time(loop);
699    uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
700    NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
701    if (engine->IsMainThread()) {
702        uv_async_send(&loop->wq_async);
703    } else {
704        uv_work_t* work = new uv_work_t;
705        uv_queue_work_with_qos(loop, work, [](uv_work_t*) {},
706                               [](uv_work_t* work, int32_t) { delete work; }, uv_qos_user_initiated);
707    }
708}
709
710bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
711                                  Task* &task)
712{
713    size_t argc = 3; // 3: delayTime, task and priority
714    napi_value args[3]; // 3: delayTime, task and priority
715    napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
716    if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
717        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
718        return false;
719    }
720
721    if (!NapiHelper::IsNumber(env, args[0])) {
722        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
723        return false;
724    }
725
726    if (!NapiHelper::IsObject(env, args[1])) {
727        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object.");
728        return false;
729    }
730
731    delayTime = NapiHelper::GetInt32Value(env, args[0]);
732    if (delayTime < 0) {
733        ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
734        return false;
735    }
736
737    if (argc > 2) { // 2: the params might have priority
738        if (!NapiHelper::IsNumber(env, args[2])) {
739            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number.");
740            return false;
741        }
742        priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
743        if (priority >= Priority::NUMBER) {
744            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
745            return false;
746        }
747    }
748
749    napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
750    if (task == nullptr) {
751        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task");
752        return false;
753    }
754    if (!task->CanExecuteDelayed(env)) {
755        return false;
756    }
757    return true;
758}
759
760bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
761                                       uint32_t &priority, Task* &periodicTask)
762{
763    size_t argc = 3; // 3 : period, task, priority
764    napi_value args[3]; // 3 : period, task, priority
765    napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
766    if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
767        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
768        return false;
769    }
770    if (!NapiHelper::IsNumber(env, args[0])) {
771        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
772        return false;
773    }
774    period = NapiHelper::GetInt32Value(env, args[0]);
775    if (period < 0) {
776        ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
777        return false;
778    }
779    if (!NapiHelper::IsObject(env, args[1])) {
780        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
781        return false;
782    }
783
784    if (argc >= 3) { // 3 : third param maybe priority
785        if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
786            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
787            return false;
788        }
789        priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
790        if (priority >= Priority::NUMBER) {
791            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
792            return false;
793        }
794    }
795
796    napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
797    if (periodicTask == nullptr) {
798        ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
799        return false;
800    }
801
802    return true;
803}
804} // namespace Commonlibrary::Concurrent::TaskPoolModule
805