1/* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "taskpool.h" 17 18#include <cinttypes> 19 20#include "helper/error_helper.h" 21#include "helper/hitrace_helper.h" 22#include "helper/napi_helper.h" 23#include "helper/object_helper.h" 24#include "message_queue.h" 25#include "task_manager.h" 26#include "tools/log.h" 27#include "worker.h" 28 29namespace Commonlibrary::Concurrent::TaskPoolModule { 30using namespace Commonlibrary::Concurrent::Common::Helper; 31 32napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports) 33{ 34 HILOG_INFO("taskpool:: Import taskpool"); 35 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 36 napi_value taskClass = nullptr; 37 napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor, nullptr, 0, nullptr, &taskClass); 38 napi_value longTaskClass = nullptr; 39 napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor, 40 nullptr, 0, nullptr, &longTaskClass); 41 napi_value genericsTaskClass = nullptr; 42 napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor, 43 nullptr, 0, nullptr, &genericsTaskClass); 44 napi_value isCanceledFunc = nullptr; 45 napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc); 46 napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc); 47 napi_value sendDataFunc = nullptr; 48 napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc); 49 napi_set_named_property(env, taskClass, "sendData", sendDataFunc); 50 napi_value taskGroupClass = nullptr; 51 napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor, nullptr, 0, nullptr, 52 &taskGroupClass); 53 napi_value seqRunnerClass = nullptr; 54 napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor, 55 nullptr, 0, nullptr, &seqRunnerClass); 56 57 // define priority 58 napi_value priorityObj = NapiHelper::CreateObject(env); 59 napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH); 60 napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM); 61 napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW); 62 napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE); 63 napi_property_descriptor exportPriority[] = { 64 DECLARE_NAPI_PROPERTY("HIGH", highPriority), 65 DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority), 66 DECLARE_NAPI_PROPERTY("LOW", lowPriority), 67 DECLARE_NAPI_PROPERTY("IDLE", idlePriority), 68 }; 69 napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority); 70 71 // define State 72 napi_value stateObj = NapiHelper::CreateObject(env); 73 napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING); 74 napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING); 75 napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED); 76 napi_property_descriptor exportState[] = { 77 DECLARE_NAPI_PROPERTY("WAITING", waitingState), 78 DECLARE_NAPI_PROPERTY("RUNNING", runningState), 79 DECLARE_NAPI_PROPERTY("CANCELED", canceledState), 80 }; 81 napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState); 82 83 napi_property_descriptor properties[] = { 84 DECLARE_NAPI_PROPERTY("Task", taskClass), 85 DECLARE_NAPI_PROPERTY("LongTask", longTaskClass), 86 DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass), 87 DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass), 88 DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass), 89 DECLARE_NAPI_PROPERTY("Priority", priorityObj), 90 DECLARE_NAPI_PROPERTY("State", stateObj), 91 DECLARE_NAPI_FUNCTION("execute", Execute), 92 DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed), 93 DECLARE_NAPI_FUNCTION("cancel", Cancel), 94 DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo), 95 DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask), 96 DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent), 97 DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically), 98 }; 99 napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties); 100 101 TaskManager::GetInstance().InitTaskManager(env); 102 return exports; 103} 104 105// ---------------------------------- SendData --------------------------------------- 106void TaskPool::ExecuteCallback(const uv_async_t* req) 107{ 108 auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req); 109 if (msgQueue == nullptr) { 110 HILOG_ERROR("taskpool:: msgQueue is nullptr"); 111 return; 112 } 113 ExecuteCallbackInner(*msgQueue); 114} 115 116void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo) 117{ 118 auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo); 119 if (msgQueue == nullptr) { 120 HILOG_ERROR("taskpool:: msgQueue is nullptr"); 121 return; 122 } 123 ExecuteCallbackInner(*msgQueue); 124} 125 126void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue) 127{ 128 while (!msgQueue.IsEmpty()) { 129 auto resultInfo = msgQueue.DeQueue(); 130 if (resultInfo == nullptr) { 131 HILOG_DEBUG("taskpool:: resultInfo is nullptr"); 132 continue; 133 } 134 ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false); 135 napi_status status = napi_ok; 136 CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status); 137 if (status != napi_ok) { 138 HILOG_ERROR("napi_open_handle_scope failed"); 139 return; 140 } 141 auto env = resultInfo->hostEnv; 142 auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId); 143 if (callbackInfo == nullptr) { 144 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side"); 145 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED); 146 continue; 147 } 148 auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef); 149 napi_value args; 150 napi_value result; 151 status = napi_deserialize(env, resultInfo->serializationArgs, &args); 152 napi_delete_serialization_data(env, resultInfo->serializationArgs); 153 if (status != napi_ok || args == nullptr) { 154 std::string errMessage = "taskpool:: failed to serialize function"; 155 HILOG_ERROR("%{public}s in SendData", errMessage.c_str()); 156 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); 157 continue; 158 } 159 uint32_t argsNum = NapiHelper::GetArrayLength(env, args); 160 napi_value argsArray[argsNum]; 161 for (size_t i = 0; i < argsNum; i++) { 162 argsArray[i] = NapiHelper::GetElement(env, args, i); 163 } 164 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result); 165 if (NapiHelper::IsExceptionPending(env)) { 166 napi_value exception = nullptr; 167 napi_get_and_clear_last_exception(env, &exception); 168 HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function"); 169 } 170 } 171} 172// ---------------------------------- SendData --------------------------------------- 173 174napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo) 175{ 176 napi_value result = nullptr; 177 napi_create_object(env, &result); 178 napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env); 179 napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env); 180 napi_set_named_property(env, result, "threadInfos", threadInfos); 181 napi_set_named_property(env, result, "taskInfos", taskInfos); 182 return result; 183} 184 185napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo) 186{ 187 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 188 size_t argc = 1; // 1: long task 189 napi_value args[1]; 190 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr); 191 if (argc < 1) { 192 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one."); 193 return nullptr; 194 } 195 if (!NapiHelper::IsObject(env, args[0])) { 196 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object."); 197 return nullptr; 198 } 199 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR); 200 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId); 201 auto task = TaskManager::GetInstance().GetTask(taskId); 202 if (task == nullptr || !task->IsLongTask()) { 203 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task."); 204 return nullptr; 205 } 206 TaskManager::GetInstance().TerminateTask(taskId); 207 return nullptr; 208} 209 210napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo) 211{ 212 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 213 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo); 214 if (argc < 1) { 215 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one."); 216 return nullptr; 217 } 218 napi_value* args = new napi_value[argc]; 219 ObjectScope<napi_value> scope(args, true); 220 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr); 221 napi_valuetype type = napi_undefined; 222 napi_typeof(env, args[0], &type); 223 if (type == napi_object) { 224 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM 225 if (argc > 1) { 226 if (!NapiHelper::IsNumber(env, args[1])) { 227 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number."); 228 return nullptr; 229 } 230 priority = NapiHelper::GetUint32Value(env, args[1]); 231 if (priority >= Priority::NUMBER) { 232 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error"); 233 return nullptr; 234 } 235 } 236 if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) { 237 return ExecuteGroup(env, args[0], static_cast<Priority>(priority)); 238 } 239 Task* task = nullptr; 240 napi_unwrap(env, args[0], reinterpret_cast<void**>(&task)); 241 if (task == nullptr) { 242 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task."); 243 return nullptr; 244 } 245 if (!task->CanExecute(env)) { 246 return nullptr; 247 } 248 napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK, 249 static_cast<Priority>(priority)); 250 if (promise == nullptr) { 251 return nullptr; 252 } 253 ExecuteTask(env, task, static_cast<Priority>(priority)); 254 return promise; 255 } 256 if (type != napi_function) { 257 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, 258 "the type of the first param must be object or function."); 259 return nullptr; 260 } 261 Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK); 262 if (task == nullptr) { 263 HILOG_ERROR("taskpool:: GenerateFunctionTask failed"); 264 return nullptr; 265 } 266 TaskManager::GetInstance().StoreTask(task->taskId_, task); 267 napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred); 268 ExecuteTask(env, task); 269 return promise; 270} 271 272void TaskPool::DelayTask(uv_timer_t* handle) 273{ 274 TaskMessage *taskMessage = static_cast<TaskMessage *>(handle->data); 275 auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId); 276 if (task == nullptr) { 277 HILOG_DEBUG("taskpool:: task is nullptr"); 278 } else if (task->taskState_ == ExecuteState::CANCELED) { 279 HILOG_DEBUG("taskpool:: DelayTask task has been canceled"); 280 napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled"); 281 napi_reject_deferred(task->env_, taskMessage->deferred, error); 282 } else { 283 HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str()); 284 TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId); 285 task->IncreaseRefCount(); 286 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_); 287 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority); 288 if (taskInfo != nullptr) { 289 taskInfo->deferred = taskMessage->deferred; 290 if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) { 291 task->taskState_ = ExecuteState::WAITING; 292 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority)); 293 } 294 } else { 295 napi_value execption = nullptr; 296 napi_get_and_clear_last_exception(task->env_, &execption); 297 if (execption != nullptr) { 298 napi_reject_deferred(task->env_, taskMessage->deferred, execption); 299 } 300 } 301 } 302 if (task != nullptr) { 303 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 304 task->delayedTimers_.erase(handle); 305 } 306 uv_timer_stop(handle); 307 uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) { 308 delete reinterpret_cast<uv_timer_t*>(handle); 309 handle = nullptr; 310 }); 311 delete taskMessage; 312 taskMessage = nullptr; 313} 314 315napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo) 316{ 317 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 318 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM 319 int32_t delayTime = 0; 320 Task* task = nullptr; 321 if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) { 322 return nullptr; 323 } 324 325 if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED || 326 task->taskState_ == ExecuteState::FINISHED) { 327 task->taskState_ = ExecuteState::DELAYED; 328 } 329 task->UpdateTaskType(TaskType::COMMON_TASK); 330 331 uv_loop_t* loop = NapiHelper::GetLibUV(env); 332 uv_update_time(loop); 333 uv_timer_t* timer = new uv_timer_t; 334 uv_timer_init(loop, timer); 335 TaskMessage *taskMessage = new TaskMessage(); 336 taskMessage->priority = static_cast<Priority>(priority); 337 taskMessage->taskId = task->taskId_; 338 napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred); 339 timer->data = taskMessage; 340 341 std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_); 342 strTrace += ", priority: " + std::to_string(priority); 343 strTrace += ", delayTime " + std::to_string(delayTime); 344 HITRACE_HELPER_METER_NAME(strTrace); 345 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 346 347 uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0); 348 { 349 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 350 task->delayedTimers_.insert(timer); 351 } 352 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env); 353 if (engine->IsMainThread()) { 354 uv_async_send(&loop->wq_async); 355 } else { 356 uv_work_t *work = new uv_work_t; 357 uv_queue_work_with_qos(loop, work, [](uv_work_t *) {}, 358 [](uv_work_t *work, int32_t) {delete work; }, uv_qos_user_initiated); 359 } 360 return promise; 361} 362 363napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority) 364{ 365 napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR); 366 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId); 367 HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str()); 368 auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId); 369 napi_reference_ref(env, taskGroup->groupRef_, nullptr); 370 if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED || 371 taskGroup->groupState_ == ExecuteState::CANCELED) { 372 taskGroup->groupState_ = ExecuteState::WAITING; 373 } 374 GroupInfo* groupInfo = new GroupInfo(); 375 groupInfo->priority = priority; 376 napi_value resArr; 377 napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr); 378 napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1); 379 groupInfo->resArr = arrRef; 380 napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred); 381 { 382 std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_); 383 if (taskGroup->currentGroupInfo_ == nullptr) { 384 taskGroup->currentGroupInfo_ = groupInfo; 385 for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) { 386 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter); 387 Task* task = nullptr; 388 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task)); 389 if (task == nullptr) { 390 HILOG_ERROR("taskpool::ExecuteGroup task is nullptr"); 391 return nullptr; 392 } 393 napi_reference_ref(env, task->taskRef_, nullptr); 394 if (task->IsGroupCommonTask()) { 395 task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority)); 396 } 397 ExecuteTask(env, task, static_cast<Priority>(priority)); 398 } 399 } else { 400 taskGroup->pendingGroupInfos_.push_back(groupInfo); 401 } 402 } 403 return promise; 404} 405 406void TaskPool::HandleTaskResult(const uv_async_t* req) 407{ 408 HILOG_DEBUG("taskpool:: HandleTaskResult task"); 409 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 410 auto task = static_cast<Task*>(req->data); 411 if (task == nullptr) { // LCOV_EXCL_BR_LINE 412 HILOG_FATAL("taskpool:: HandleTaskResult task is null"); 413 return; 414 } 415 if (!task->IsMainThreadTask()) { 416 if (task->ShouldDeleteTask(false)) { 417 delete task; 418 return; 419 } 420 if (task->IsFunctionTask()) { 421 napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task); 422 } 423 } 424 task->DecreaseTaskRefCount(); 425 HandleTaskResultCallback(task); 426} 427 428void TaskPool::HandleTaskResultCallback(Task* task) 429{ 430 napi_handle_scope scope = nullptr; 431 NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope)); 432 napi_value napiTaskResult = nullptr; 433 napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult); 434 napi_delete_serialization_data(task->env_, task->result_); 435 436 // tag for trace parse: Task PerformTask End 437 std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_); 438 if (task->taskState_ == ExecuteState::CANCELED) { 439 strTrace += ", performResult : IsCanceled"; 440 napiTaskResult = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled"); 441 } else if (status != napi_ok) { 442 HILOG_ERROR("taskpool: failed to deserialize result"); 443 strTrace += ", performResult : DeserializeFailed"; 444 } else if (task->success_) { 445 strTrace += ", performResult : Successful"; 446 } else { 447 strTrace += ", performResult : Unsuccessful"; 448 } 449 HITRACE_HELPER_METER_NAME(strTrace); 450 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 451 if (napiTaskResult == nullptr) { 452 napi_get_undefined(task->env_, &napiTaskResult); 453 } 454 reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter(); 455 bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_); 456 task->taskState_ = ExecuteState::ENDING; 457 if (task->IsGroupTask()) { 458 UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success); 459 } else if (!task->IsPeriodicTask()) { 460 if (success) { 461 napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult); 462 if (task->onExecutionSucceededCallBackInfo_ != nullptr) { 463 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_); 464 } 465 } else { 466 napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult); 467 if (task->onExecutionFailedCallBackInfo_ != nullptr) { 468 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult; 469 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_); 470 } 471 } 472 } 473 NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope)); 474 TriggerTask(task); 475} 476 477void TaskPool::TriggerTask(Task* task) 478{ 479 HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str()); 480 if (task->IsGroupTask()) { 481 return; 482 } 483 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_); 484 task->taskState_ = ExecuteState::FINISHED; 485 // seqRunnerTask will trigger the next 486 if (task->IsSeqRunnerTask()) { 487 if (!TaskGroupManager::GetInstance().TriggerSeqRunner(task->env_, task)) { 488 HILOG_ERROR("seqRunner:: task %{public}s trigger in seqRunner %{public}s failed", 489 std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str()); 490 } 491 } else if (task->IsCommonTask()) { 492 task->NotifyPendingTask(); 493 } 494 if (task->IsPeriodicTask()) { 495 return; 496 } 497 if (!task->IsFunctionTask()) { 498 napi_reference_unref(task->env_, task->taskRef_, nullptr); 499 return; 500 } 501 TaskManager::GetInstance().RemoveTask(task->taskId_); 502 delete task; 503} 504 505void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success) 506{ 507 HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str()); 508 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_); 509 task->taskState_ = ExecuteState::FINISHED; 510 napi_reference_unref(env, task->taskRef_, nullptr); 511 if (task->IsGroupCommonTask()) { 512 delete task->currentTaskInfo_; 513 task->currentTaskInfo_ = nullptr; 514 } 515 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_); 516 if (taskGroup == nullptr) { 517 HILOG_DEBUG("taskpool:: taskGroup has been released"); 518 return; 519 } 520 if (taskGroup->currentGroupInfo_ == nullptr) { 521 HILOG_DEBUG("taskpool:: taskGroup has been canceled"); 522 return; 523 } 524 uint32_t index = taskGroup->GetTaskIndex(task->taskId_); 525 auto groupInfo = taskGroup->currentGroupInfo_; 526 if (success) { 527 // Update res at resArr 528 napi_ref arrRef = groupInfo->resArr; 529 napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef); 530 napi_set_element(env, resArr, index, res); 531 532 groupInfo->finishedTask++; 533 if (groupInfo->finishedTask < taskGroup->taskNum_) { 534 return; 535 } 536 HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str()); 537 napi_resolve_deferred(env, groupInfo->deferred, resArr); 538 for (uint64_t taskId : taskGroup->taskIds_) { 539 auto task = TaskManager::GetInstance().GetTask(taskId); 540 if (task->onExecutionSucceededCallBackInfo_ != nullptr) { 541 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_); 542 } 543 } 544 } else { 545 napi_reject_deferred(env, groupInfo->deferred, res); 546 if (task->onExecutionFailedCallBackInfo_ != nullptr) { 547 task->onExecutionFailedCallBackInfo_->taskError_ = res; 548 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_); 549 } 550 } 551 taskGroup->groupState_ = ExecuteState::FINISHED; 552 napi_delete_reference(env, groupInfo->resArr); 553 napi_reference_unref(env, taskGroup->groupRef_, nullptr); 554 delete groupInfo; 555 taskGroup->currentGroupInfo_ = nullptr; 556 taskGroup->NotifyGroupTask(env); 557} 558 559void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority) 560{ 561 // tag for trace parse: Task Allocation 562 std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_) 563 + ", priority : " + std::to_string(priority) 564 + ", executeState : " + std::to_string(ExecuteState::WAITING); 565 HITRACE_HELPER_METER_NAME(strTrace); 566 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 567 task->IncreaseRefCount(); 568 TaskManager::GetInstance().IncreaseRefCount(task->taskId_); 569 if (task->IsFunctionTask() || (task->taskState_ != ExecuteState::WAITING && 570 task->taskState_ != ExecuteState::RUNNING && task->taskState_ != ExecuteState::ENDING)) { 571 task->taskState_ = ExecuteState::WAITING; 572 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority); 573 } 574} 575 576napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo) 577{ 578 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 579 size_t argc = 1; 580 napi_value args[1]; 581 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr); 582 if (argc < 1) { 583 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1."); 584 return nullptr; 585 } 586 587 if (!NapiHelper::IsObject(env, args[0])) { 588 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object."); 589 return nullptr; 590 } 591 592 if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) { 593 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR); 594 if (napiTaskId == nullptr) { 595 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task."); 596 return nullptr; 597 } 598 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId); 599 TaskManager::GetInstance().CancelTask(env, taskId); 600 } else { 601 napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR); 602 if (napiGroupId == nullptr) { 603 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup."); 604 return nullptr; 605 } 606 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId); 607 TaskGroupManager::GetInstance().CancelGroup(env, groupId); 608 } 609 return nullptr; 610} 611 612napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo) 613{ 614 size_t argc = 1; 615 napi_value args[1]; 616 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr); 617 if (argc != 1) { 618 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1."); 619 return nullptr; 620 } 621 622 if (!NapiHelper::IsFunction(env, args[0])) { 623 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function."); 624 return nullptr; 625 } 626 627 bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]); 628 return NapiHelper::CreateBooleanValue(env, isConcurrent); 629} 630 631void TaskPool::PeriodicTaskCallback(uv_timer_t* handle) 632{ 633 Task* task = reinterpret_cast<Task*>(handle->data); 634 if (task == nullptr) { 635 HILOG_DEBUG("taskpool:: the task is nullptr"); 636 return; 637 } else if (!task->IsPeriodicTask()) { 638 HILOG_DEBUG("taskpool:: the current task is not a periodic task"); 639 return; 640 } else if (task->taskState_ == ExecuteState::CANCELED) { 641 HILOG_DEBUG("taskpool:: the periodic task has been canceled"); 642 return; 643 } 644 TaskManager::GetInstance().IncreaseRefCount(task->taskId_); 645 646 if (!task->isFirstTaskInfo_) { 647 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_); 648 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_); 649 if (taskInfo == nullptr) { 650 HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr"); 651 return; 652 } 653 } 654 task->isFirstTaskInfo_ = false; 655 656 task->IncreaseRefCount(); 657 HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str()); 658 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) { 659 task->taskState_ = ExecuteState::WAITING; 660 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_); 661 } 662} 663 664napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo) 665{ 666 int32_t period = 0; 667 uint32_t priority = Priority::DEFAULT; 668 Task* periodicTask = nullptr; 669 if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) { 670 return nullptr; 671 } 672 673 if (!periodicTask->CanExecutePeriodically(env)) { 674 return nullptr; 675 } 676 periodicTask->UpdatePeriodicTask(); 677 678 periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority); 679 napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_); 680 TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_); 681 if (taskInfo == nullptr) { 682 return nullptr; 683 } 684 685 periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo 686 687 TriggerTimer(env, periodicTask, period); 688 return nullptr; 689} 690 691void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period) 692{ 693 HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str()); 694 uv_loop_t* loop = NapiHelper::GetLibUV(env); 695 task->timer_ = new uv_timer_t; 696 uv_timer_init(loop, task->timer_); 697 task->timer_->data = task; 698 uv_update_time(loop); 699 uv_timer_start(task->timer_, PeriodicTaskCallback, period, period); 700 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env); 701 if (engine->IsMainThread()) { 702 uv_async_send(&loop->wq_async); 703 } else { 704 uv_work_t* work = new uv_work_t; 705 uv_queue_work_with_qos(loop, work, [](uv_work_t*) {}, 706 [](uv_work_t* work, int32_t) { delete work; }, uv_qos_user_initiated); 707 } 708} 709 710bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime, 711 Task* &task) 712{ 713 size_t argc = 3; // 3: delayTime, task and priority 714 napi_value args[3]; // 3: delayTime, task and priority 715 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr); 716 if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority 717 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three."); 718 return false; 719 } 720 721 if (!NapiHelper::IsNumber(env, args[0])) { 722 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number."); 723 return false; 724 } 725 726 if (!NapiHelper::IsObject(env, args[1])) { 727 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object."); 728 return false; 729 } 730 731 delayTime = NapiHelper::GetInt32Value(env, args[0]); 732 if (delayTime < 0) { 733 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero"); 734 return false; 735 } 736 737 if (argc > 2) { // 2: the params might have priority 738 if (!NapiHelper::IsNumber(env, args[2])) { 739 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number."); 740 return false; 741 } 742 priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority 743 if (priority >= Priority::NUMBER) { 744 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error."); 745 return false; 746 } 747 } 748 749 napi_unwrap(env, args[1], reinterpret_cast<void**>(&task)); 750 if (task == nullptr) { 751 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task"); 752 return false; 753 } 754 if (!task->CanExecuteDelayed(env)) { 755 return false; 756 } 757 return true; 758} 759 760bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period, 761 uint32_t &priority, Task* &periodicTask) 762{ 763 size_t argc = 3; // 3 : period, task, priority 764 napi_value args[3]; // 3 : period, task, priority 765 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr); 766 if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority 767 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three."); 768 return false; 769 } 770 if (!NapiHelper::IsNumber(env, args[0])) { 771 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number."); 772 return false; 773 } 774 period = NapiHelper::GetInt32Value(env, args[0]); 775 if (period < 0) { 776 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero."); 777 return false; 778 } 779 if (!NapiHelper::IsObject(env, args[1])) { 780 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task."); 781 return false; 782 } 783 784 if (argc >= 3) { // 3 : third param maybe priority 785 if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority 786 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority."); 787 return false; 788 } 789 priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority 790 if (priority >= Priority::NUMBER) { 791 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid."); 792 return false; 793 } 794 } 795 796 napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask)); 797 if (periodicTask == nullptr) { 798 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task."); 799 return false; 800 } 801 802 return true; 803} 804} // namespace Commonlibrary::Concurrent::TaskPoolModule 805