1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "taskpool.h"

#include <cinttypes>
#include <vector>

#include "helper/error_helper.h"
#include "helper/hitrace_helper.h"
#include "helper/napi_helper.h"
#include "helper/object_helper.h"
#include "message_queue.h"
#include "task_manager.h"
#include "tools/log.h"
#include "worker.h"
28
29 namespace Commonlibrary::Concurrent::TaskPoolModule {
30 using namespace Commonlibrary::Concurrent::Common::Helper;
31
InitTaskPool(napi_env env, napi_value exports)32 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
33 {
34 HILOG_INFO("taskpool:: Import taskpool");
35 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
36 napi_value taskClass = nullptr;
37 napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor, nullptr, 0, nullptr, &taskClass);
38 napi_value longTaskClass = nullptr;
39 napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
40 nullptr, 0, nullptr, &longTaskClass);
41 napi_value genericsTaskClass = nullptr;
42 napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
43 nullptr, 0, nullptr, &genericsTaskClass);
44 napi_value isCanceledFunc = nullptr;
45 napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc);
46 napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
47 napi_value sendDataFunc = nullptr;
48 napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc);
49 napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
50 napi_value taskGroupClass = nullptr;
51 napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor, nullptr, 0, nullptr,
52 &taskGroupClass);
53 napi_value seqRunnerClass = nullptr;
54 napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
55 nullptr, 0, nullptr, &seqRunnerClass);
56
57 // define priority
58 napi_value priorityObj = NapiHelper::CreateObject(env);
59 napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
60 napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
61 napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
62 napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
63 napi_property_descriptor exportPriority[] = {
64 DECLARE_NAPI_PROPERTY("HIGH", highPriority),
65 DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
66 DECLARE_NAPI_PROPERTY("LOW", lowPriority),
67 DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
68 };
69 napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
70
71 // define State
72 napi_value stateObj = NapiHelper::CreateObject(env);
73 napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
74 napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
75 napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
76 napi_property_descriptor exportState[] = {
77 DECLARE_NAPI_PROPERTY("WAITING", waitingState),
78 DECLARE_NAPI_PROPERTY("RUNNING", runningState),
79 DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
80 };
81 napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
82
83 napi_property_descriptor properties[] = {
84 DECLARE_NAPI_PROPERTY("Task", taskClass),
85 DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
86 DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
87 DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
88 DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
89 DECLARE_NAPI_PROPERTY("Priority", priorityObj),
90 DECLARE_NAPI_PROPERTY("State", stateObj),
91 DECLARE_NAPI_FUNCTION("execute", Execute),
92 DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
93 DECLARE_NAPI_FUNCTION("cancel", Cancel),
94 DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
95 DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
96 DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
97 DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
98 };
99 napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
100
101 TaskManager::GetInstance().InitTaskManager(env);
102 return exports;
103 }
104
105 // ---------------------------------- SendData ---------------------------------------
ExecuteCallback(const uv_async_t* req)106 void TaskPool::ExecuteCallback(const uv_async_t* req)
107 {
108 auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req);
109 if (msgQueue == nullptr) {
110 HILOG_ERROR("taskpool:: msgQueue is nullptr");
111 return;
112 }
113 ExecuteCallbackInner(*msgQueue);
114 }
115
ExecuteCallbackTask(CallbackInfo* callbackInfo)116 void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo)
117 {
118 auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo);
119 if (msgQueue == nullptr) {
120 HILOG_ERROR("taskpool:: msgQueue is nullptr");
121 return;
122 }
123 ExecuteCallbackInner(*msgQueue);
124 }
125
ExecuteCallbackInner(MsgQueue& msgQueue)126 void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue)
127 {
128 while (!msgQueue.IsEmpty()) {
129 auto resultInfo = msgQueue.DeQueue();
130 if (resultInfo == nullptr) {
131 HILOG_DEBUG("taskpool:: resultInfo is nullptr");
132 continue;
133 }
134 ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
135 napi_status status = napi_ok;
136 CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status);
137 if (status != napi_ok) {
138 HILOG_ERROR("napi_open_handle_scope failed");
139 return;
140 }
141 auto env = resultInfo->hostEnv;
142 auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId);
143 if (callbackInfo == nullptr) {
144 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
145 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
146 continue;
147 }
148 auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
149 napi_value args;
150 napi_value result;
151 status = napi_deserialize(env, resultInfo->serializationArgs, &args);
152 napi_delete_serialization_data(env, resultInfo->serializationArgs);
153 if (status != napi_ok || args == nullptr) {
154 std::string errMessage = "taskpool:: failed to serialize function";
155 HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
156 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
157 continue;
158 }
159 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
160 napi_value argsArray[argsNum];
161 for (size_t i = 0; i < argsNum; i++) {
162 argsArray[i] = NapiHelper::GetElement(env, args, i);
163 }
164 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
165 if (NapiHelper::IsExceptionPending(env)) {
166 napi_value exception = nullptr;
167 napi_get_and_clear_last_exception(env, &exception);
168 HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
169 }
170 }
171 }
172 // ---------------------------------- SendData ---------------------------------------
173
GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)174 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
175 {
176 napi_value result = nullptr;
177 napi_create_object(env, &result);
178 napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
179 napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
180 napi_set_named_property(env, result, "threadInfos", threadInfos);
181 napi_set_named_property(env, result, "taskInfos", taskInfos);
182 return result;
183 }
184
TerminateTask(napi_env env, napi_callback_info cbinfo)185 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
186 {
187 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
188 size_t argc = 1; // 1: long task
189 napi_value args[1];
190 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
191 if (argc < 1) {
192 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one.");
193 return nullptr;
194 }
195 if (!NapiHelper::IsObject(env, args[0])) {
196 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
197 return nullptr;
198 }
199 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
200 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
201 auto task = TaskManager::GetInstance().GetTask(taskId);
202 if (task == nullptr || !task->IsLongTask()) {
203 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task.");
204 return nullptr;
205 }
206 TaskManager::GetInstance().TerminateTask(taskId);
207 return nullptr;
208 }
209
Execute(napi_env env, napi_callback_info cbinfo)210 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
211 {
212 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
213 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
214 if (argc < 1) {
215 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
216 return nullptr;
217 }
218 napi_value* args = new napi_value[argc];
219 ObjectScope<napi_value> scope(args, true);
220 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
221 napi_valuetype type = napi_undefined;
222 napi_typeof(env, args[0], &type);
223 if (type == napi_object) {
224 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
225 if (argc > 1) {
226 if (!NapiHelper::IsNumber(env, args[1])) {
227 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number.");
228 return nullptr;
229 }
230 priority = NapiHelper::GetUint32Value(env, args[1]);
231 if (priority >= Priority::NUMBER) {
232 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
233 return nullptr;
234 }
235 }
236 if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
237 return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
238 }
239 Task* task = nullptr;
240 napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
241 if (task == nullptr) {
242 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
243 return nullptr;
244 }
245 if (!task->CanExecute(env)) {
246 return nullptr;
247 }
248 napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
249 static_cast<Priority>(priority));
250 if (promise == nullptr) {
251 return nullptr;
252 }
253 ExecuteTask(env, task, static_cast<Priority>(priority));
254 return promise;
255 }
256 if (type != napi_function) {
257 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
258 "the type of the first param must be object or function.");
259 return nullptr;
260 }
261 Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
262 if (task == nullptr) {
263 HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
264 return nullptr;
265 }
266 TaskManager::GetInstance().StoreTask(task->taskId_, task);
267 napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
268 ExecuteTask(env, task);
269 return promise;
270 }
271
DelayTask(uv_timer_t* handle)272 void TaskPool::DelayTask(uv_timer_t* handle)
273 {
274 TaskMessage *taskMessage = static_cast<TaskMessage *>(handle->data);
275 auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
276 if (task == nullptr) {
277 HILOG_DEBUG("taskpool:: task is nullptr");
278 } else if (task->taskState_ == ExecuteState::CANCELED) {
279 HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
280 napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
281 napi_reject_deferred(task->env_, taskMessage->deferred, error);
282 } else {
283 HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
284 TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId);
285 task->IncreaseRefCount();
286 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
287 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
288 if (taskInfo != nullptr) {
289 taskInfo->deferred = taskMessage->deferred;
290 if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
291 task->taskState_ = ExecuteState::WAITING;
292 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
293 }
294 } else {
295 napi_value execption = nullptr;
296 napi_get_and_clear_last_exception(task->env_, &execption);
297 if (execption != nullptr) {
298 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
299 }
300 }
301 }
302 if (task != nullptr) {
303 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
304 task->delayedTimers_.erase(handle);
305 }
306 uv_timer_stop(handle);
307 uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
308 delete reinterpret_cast<uv_timer_t*>(handle);
309 handle = nullptr;
310 });
311 delete taskMessage;
312 taskMessage = nullptr;
313 }
314
ExecuteDelayed(napi_env env, napi_callback_info cbinfo)315 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
316 {
317 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
318 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
319 int32_t delayTime = 0;
320 Task* task = nullptr;
321 if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
322 return nullptr;
323 }
324
325 if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED ||
326 task->taskState_ == ExecuteState::FINISHED) {
327 task->taskState_ = ExecuteState::DELAYED;
328 }
329 task->UpdateTaskType(TaskType::COMMON_TASK);
330
331 uv_loop_t* loop = NapiHelper::GetLibUV(env);
332 uv_update_time(loop);
333 uv_timer_t* timer = new uv_timer_t;
334 uv_timer_init(loop, timer);
335 TaskMessage *taskMessage = new TaskMessage();
336 taskMessage->priority = static_cast<Priority>(priority);
337 taskMessage->taskId = task->taskId_;
338 napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
339 timer->data = taskMessage;
340
341 std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
342 strTrace += ", priority: " + std::to_string(priority);
343 strTrace += ", delayTime " + std::to_string(delayTime);
344 HITRACE_HELPER_METER_NAME(strTrace);
345 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
346
347 uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
348 {
349 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
350 task->delayedTimers_.insert(timer);
351 }
352 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
353 if (engine->IsMainThread()) {
354 uv_async_send(&loop->wq_async);
355 } else {
356 uv_work_t *work = new uv_work_t;
357 uv_queue_work_with_qos(loop, work, [](uv_work_t *) {},
358 [](uv_work_t *work, int32_t) {delete work; }, uv_qos_user_initiated);
359 }
360 return promise;
361 }
362
ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)363 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
364 {
365 napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
366 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
367 HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
368 auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
369 napi_reference_ref(env, taskGroup->groupRef_, nullptr);
370 if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
371 taskGroup->groupState_ == ExecuteState::CANCELED) {
372 taskGroup->groupState_ = ExecuteState::WAITING;
373 }
374 GroupInfo* groupInfo = new GroupInfo();
375 groupInfo->priority = priority;
376 napi_value resArr;
377 napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
378 napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
379 groupInfo->resArr = arrRef;
380 napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
381 {
382 std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
383 if (taskGroup->currentGroupInfo_ == nullptr) {
384 taskGroup->currentGroupInfo_ = groupInfo;
385 for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
386 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
387 Task* task = nullptr;
388 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
389 if (task == nullptr) {
390 HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
391 return nullptr;
392 }
393 napi_reference_ref(env, task->taskRef_, nullptr);
394 if (task->IsGroupCommonTask()) {
395 task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
396 }
397 ExecuteTask(env, task, static_cast<Priority>(priority));
398 }
399 } else {
400 taskGroup->pendingGroupInfos_.push_back(groupInfo);
401 }
402 }
403 return promise;
404 }
405
HandleTaskResult(const uv_async_t* req)406 void TaskPool::HandleTaskResult(const uv_async_t* req)
407 {
408 HILOG_DEBUG("taskpool:: HandleTaskResult task");
409 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
410 auto task = static_cast<Task*>(req->data);
411 if (task == nullptr) { // LCOV_EXCL_BR_LINE
412 HILOG_FATAL("taskpool:: HandleTaskResult task is null");
413 return;
414 }
415 if (!task->IsMainThreadTask()) {
416 if (task->ShouldDeleteTask(false)) {
417 delete task;
418 return;
419 }
420 if (task->IsFunctionTask()) {
421 napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
422 }
423 }
424 task->DecreaseTaskRefCount();
425 HandleTaskResultCallback(task);
426 }
427
HandleTaskResultCallback(Task* task)428 void TaskPool::HandleTaskResultCallback(Task* task)
429 {
430 napi_handle_scope scope = nullptr;
431 NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
432 napi_value napiTaskResult = nullptr;
433 napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
434 napi_delete_serialization_data(task->env_, task->result_);
435
436 // tag for trace parse: Task PerformTask End
437 std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
438 if (task->taskState_ == ExecuteState::CANCELED) {
439 strTrace += ", performResult : IsCanceled";
440 napiTaskResult = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
441 } else if (status != napi_ok) {
442 HILOG_ERROR("taskpool: failed to deserialize result");
443 strTrace += ", performResult : DeserializeFailed";
444 } else if (task->success_) {
445 strTrace += ", performResult : Successful";
446 } else {
447 strTrace += ", performResult : Unsuccessful";
448 }
449 HITRACE_HELPER_METER_NAME(strTrace);
450 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
451 if (napiTaskResult == nullptr) {
452 napi_get_undefined(task->env_, &napiTaskResult);
453 }
454 reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
455 bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
456 task->taskState_ = ExecuteState::ENDING;
457 if (task->IsGroupTask()) {
458 UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
459 } else if (!task->IsPeriodicTask()) {
460 if (success) {
461 napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
462 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
463 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
464 }
465 } else {
466 napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
467 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
468 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
469 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
470 }
471 }
472 }
473 NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
474 TriggerTask(task);
475 }
476
TriggerTask(Task* task)477 void TaskPool::TriggerTask(Task* task)
478 {
479 HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
480 if (task->IsGroupTask()) {
481 return;
482 }
483 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
484 task->taskState_ = ExecuteState::FINISHED;
485 // seqRunnerTask will trigger the next
486 if (task->IsSeqRunnerTask()) {
487 if (!TaskGroupManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
488 HILOG_ERROR("seqRunner:: task %{public}s trigger in seqRunner %{public}s failed",
489 std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
490 }
491 } else if (task->IsCommonTask()) {
492 task->NotifyPendingTask();
493 }
494 if (task->IsPeriodicTask()) {
495 return;
496 }
497 if (!task->IsFunctionTask()) {
498 napi_reference_unref(task->env_, task->taskRef_, nullptr);
499 return;
500 }
501 TaskManager::GetInstance().RemoveTask(task->taskId_);
502 delete task;
503 }
504
UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)505 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
506 {
507 HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
508 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
509 task->taskState_ = ExecuteState::FINISHED;
510 napi_reference_unref(env, task->taskRef_, nullptr);
511 if (task->IsGroupCommonTask()) {
512 delete task->currentTaskInfo_;
513 task->currentTaskInfo_ = nullptr;
514 }
515 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
516 if (taskGroup == nullptr) {
517 HILOG_DEBUG("taskpool:: taskGroup has been released");
518 return;
519 }
520 if (taskGroup->currentGroupInfo_ == nullptr) {
521 HILOG_DEBUG("taskpool:: taskGroup has been canceled");
522 return;
523 }
524 uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
525 auto groupInfo = taskGroup->currentGroupInfo_;
526 if (success) {
527 // Update res at resArr
528 napi_ref arrRef = groupInfo->resArr;
529 napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
530 napi_set_element(env, resArr, index, res);
531
532 groupInfo->finishedTask++;
533 if (groupInfo->finishedTask < taskGroup->taskNum_) {
534 return;
535 }
536 HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
537 napi_resolve_deferred(env, groupInfo->deferred, resArr);
538 for (uint64_t taskId : taskGroup->taskIds_) {
539 auto task = TaskManager::GetInstance().GetTask(taskId);
540 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
541 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
542 }
543 }
544 } else {
545 napi_reject_deferred(env, groupInfo->deferred, res);
546 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
547 task->onExecutionFailedCallBackInfo_->taskError_ = res;
548 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
549 }
550 }
551 taskGroup->groupState_ = ExecuteState::FINISHED;
552 napi_delete_reference(env, groupInfo->resArr);
553 napi_reference_unref(env, taskGroup->groupRef_, nullptr);
554 delete groupInfo;
555 taskGroup->currentGroupInfo_ = nullptr;
556 taskGroup->NotifyGroupTask(env);
557 }
558
ExecuteTask(napi_env env, Task* task, Priority priority)559 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
560 {
561 // tag for trace parse: Task Allocation
562 std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
563 + ", priority : " + std::to_string(priority)
564 + ", executeState : " + std::to_string(ExecuteState::WAITING);
565 HITRACE_HELPER_METER_NAME(strTrace);
566 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
567 task->IncreaseRefCount();
568 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
569 if (task->IsFunctionTask() || (task->taskState_ != ExecuteState::WAITING &&
570 task->taskState_ != ExecuteState::RUNNING && task->taskState_ != ExecuteState::ENDING)) {
571 task->taskState_ = ExecuteState::WAITING;
572 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
573 }
574 }
575
Cancel(napi_env env, napi_callback_info cbinfo)576 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
577 {
578 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
579 size_t argc = 1;
580 napi_value args[1];
581 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
582 if (argc < 1) {
583 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
584 return nullptr;
585 }
586
587 if (!NapiHelper::IsObject(env, args[0])) {
588 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
589 return nullptr;
590 }
591
592 if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
593 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
594 if (napiTaskId == nullptr) {
595 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
596 return nullptr;
597 }
598 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
599 TaskManager::GetInstance().CancelTask(env, taskId);
600 } else {
601 napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
602 if (napiGroupId == nullptr) {
603 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup.");
604 return nullptr;
605 }
606 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
607 TaskGroupManager::GetInstance().CancelGroup(env, groupId);
608 }
609 return nullptr;
610 }
611
IsConcurrent(napi_env env, napi_callback_info cbinfo)612 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
613 {
614 size_t argc = 1;
615 napi_value args[1];
616 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
617 if (argc != 1) {
618 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
619 return nullptr;
620 }
621
622 if (!NapiHelper::IsFunction(env, args[0])) {
623 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function.");
624 return nullptr;
625 }
626
627 bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
628 return NapiHelper::CreateBooleanValue(env, isConcurrent);
629 }
630
PeriodicTaskCallback(uv_timer_t* handle)631 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
632 {
633 Task* task = reinterpret_cast<Task*>(handle->data);
634 if (task == nullptr) {
635 HILOG_DEBUG("taskpool:: the task is nullptr");
636 return;
637 } else if (!task->IsPeriodicTask()) {
638 HILOG_DEBUG("taskpool:: the current task is not a periodic task");
639 return;
640 } else if (task->taskState_ == ExecuteState::CANCELED) {
641 HILOG_DEBUG("taskpool:: the periodic task has been canceled");
642 return;
643 }
644 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
645
646 if (!task->isFirstTaskInfo_) {
647 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
648 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
649 if (taskInfo == nullptr) {
650 HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
651 return;
652 }
653 }
654 task->isFirstTaskInfo_ = false;
655
656 task->IncreaseRefCount();
657 HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
658 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) {
659 task->taskState_ = ExecuteState::WAITING;
660 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
661 }
662 }
663
ExecutePeriodically(napi_env env, napi_callback_info cbinfo)664 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
665 {
666 int32_t period = 0;
667 uint32_t priority = Priority::DEFAULT;
668 Task* periodicTask = nullptr;
669 if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
670 return nullptr;
671 }
672
673 if (!periodicTask->CanExecutePeriodically(env)) {
674 return nullptr;
675 }
676 periodicTask->UpdatePeriodicTask();
677
678 periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
679 napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
680 TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_);
681 if (taskInfo == nullptr) {
682 return nullptr;
683 }
684
685 periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo
686
687 TriggerTimer(env, periodicTask, period);
688 return nullptr;
689 }
690
TriggerTimer(napi_env env, Task* task, int32_t period)691 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
692 {
693 HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
694 uv_loop_t* loop = NapiHelper::GetLibUV(env);
695 task->timer_ = new uv_timer_t;
696 uv_timer_init(loop, task->timer_);
697 task->timer_->data = task;
698 uv_update_time(loop);
699 uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
700 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
701 if (engine->IsMainThread()) {
702 uv_async_send(&loop->wq_async);
703 } else {
704 uv_work_t* work = new uv_work_t;
705 uv_queue_work_with_qos(loop, work, [](uv_work_t*) {},
706 [](uv_work_t* work, int32_t) { delete work; }, uv_qos_user_initiated);
707 }
708 }
709
CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime, Task* &task)710 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
711 Task* &task)
712 {
713 size_t argc = 3; // 3: delayTime, task and priority
714 napi_value args[3]; // 3: delayTime, task and priority
715 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
716 if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
717 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
718 return false;
719 }
720
721 if (!NapiHelper::IsNumber(env, args[0])) {
722 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
723 return false;
724 }
725
726 if (!NapiHelper::IsObject(env, args[1])) {
727 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object.");
728 return false;
729 }
730
731 delayTime = NapiHelper::GetInt32Value(env, args[0]);
732 if (delayTime < 0) {
733 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
734 return false;
735 }
736
737 if (argc > 2) { // 2: the params might have priority
738 if (!NapiHelper::IsNumber(env, args[2])) {
739 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number.");
740 return false;
741 }
742 priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
743 if (priority >= Priority::NUMBER) {
744 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
745 return false;
746 }
747 }
748
749 napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
750 if (task == nullptr) {
751 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task");
752 return false;
753 }
754 if (!task->CanExecuteDelayed(env)) {
755 return false;
756 }
757 return true;
758 }
759
CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period, uint32_t &priority, Task* &periodicTask)760 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
761 uint32_t &priority, Task* &periodicTask)
762 {
763 size_t argc = 3; // 3 : period, task, priority
764 napi_value args[3]; // 3 : period, task, priority
765 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
766 if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
767 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
768 return false;
769 }
770 if (!NapiHelper::IsNumber(env, args[0])) {
771 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
772 return false;
773 }
774 period = NapiHelper::GetInt32Value(env, args[0]);
775 if (period < 0) {
776 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
777 return false;
778 }
779 if (!NapiHelper::IsObject(env, args[1])) {
780 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
781 return false;
782 }
783
784 if (argc >= 3) { // 3 : third param maybe priority
785 if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
786 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
787 return false;
788 }
789 priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
790 if (priority >= Priority::NUMBER) {
791 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
792 return false;
793 }
794 }
795
796 napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
797 if (periodicTask == nullptr) {
798 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
799 return false;
800 }
801
802 return true;
803 }
804 } // namespace Commonlibrary::Concurrent::TaskPoolModule
805