1/* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "worker.h" 17 18#if defined(ENABLE_TASKPOOL_FFRT) 19#include "c/executor_task.h" 20#include "ffrt_inner.h" 21#endif 22#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h" 23#include "helper/hitrace_helper.h" 24#include "process_helper.h" 25#include "task_group.h" 26#include "task_manager.h" 27#include "taskpool.h" 28#include "tools/log.h" 29 30namespace Commonlibrary::Concurrent::TaskPoolModule { 31using namespace OHOS::JsSysModule; 32using namespace Commonlibrary::Platform; 33 34Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker) 35{ 36 if (taskPriority != worker->priority_) { 37 HILOG_DEBUG("taskpool:: reset worker priority to match task priority"); 38 if (TaskManager::GetInstance().EnableFfrt()) { 39#if defined(ENABLE_TASKPOOL_FFRT) 40 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) { 41 SetWorkerPriority(taskPriority); 42 } 43#endif 44 } else { 45 SetWorkerPriority(taskPriority); 46 } 47 worker->priority_ = taskPriority; 48 } 49} 50 51Worker::RunningScope::~RunningScope() 52{ 53 HILOG_DEBUG("taskpool:: RunningScope destruction"); 54 if (scope_ != nullptr) { 55 napi_close_handle_scope(worker_->workerEnv_, scope_); 56 } 57 worker_->NotifyIdle(); 58 worker_->idleState_ = true; 59} 60 61Worker* Worker::WorkerConstructor(napi_env env) 62{ 63 HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]"); 64 Worker* worker = new Worker(env); 65 worker->StartExecuteInThread(); 66 return worker; 67} 68 69void Worker::CloseHandles() 70{ 71 // set all handles to nullptr so that they can not be used even when the loop is re-running 72 ConcurrentHelper::UvHandleClose(performTaskSignal_); 73 performTaskSignal_ = nullptr; 74#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 75 ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_); 76 debuggerOnPostTaskSignal_ = nullptr; 77#endif 78 ConcurrentHelper::UvHandleClose(clearWorkerSignal_); 79 clearWorkerSignal_ = nullptr; 80 ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_); 81 triggerGCCheckSignal_ = nullptr; 82} 83 84void Worker::ReleaseWorkerHandles(const uv_async_t* req) 85{ 86 auto worker = static_cast<Worker*>(req->data); 87 HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_); 88 if (!worker->CheckFreeConditions()) { 89 return; 90 } 91 92 TaskManager::GetInstance().RemoveWorker(worker); 93 HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]"); 94 HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now", 95 TaskManager::GetInstance().GetThreadNum()); 96 // when there is no active handle, worker loop will stop automatically. 97 worker->CloseHandles(); 98 99 uv_loop_t* loop = worker->GetWorkerLoop(); 100 if (loop != nullptr) { 101 uv_stop(loop); 102 } 103} 104 105bool Worker::CheckFreeConditions() 106{ 107 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 108 // only when all conditions are met can the worker be freed 109 if (HasRunningTasks()) { 110 HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit"); 111 } else if (workerEngine->HasListeningCounter()) { 112 HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit"); 113 } else if (Timer::HasTimer(workerEnv_)) { 114 HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit"); 115 } else if (workerEngine->HasWaitingRequest()) { 116 HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit"); 117 } else if (workerEngine->HasSubEnv()) { 118 HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit"); 119 } else if (workerEngine->HasPendingJob()) { 120 HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit"); 121 } else if (workerEngine->IsProfiling()) { 122 HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling"); 123 } else { 124 return true; 125 } 126 HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_); 127 TaskManager& taskManager = TaskManager::GetInstance(); 128 taskManager.RestoreWorker(this); 129 taskManager.CountTraceForWorker(); 130 return false; 131} 132 133void Worker::StartExecuteInThread() 134{ 135 if (!runner_) { 136 runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this)); 137 } 138 if (runner_) { 139 runner_->Execute(); // start a new thread 140 } else { 141 HILOG_ERROR("taskpool:: runner_ is nullptr"); 142 } 143} 144 145#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 146void Worker::HandleDebuggerTask(const uv_async_t* req) 147{ 148 Worker* worker = reinterpret_cast<Worker*>(req->data); 149 if (worker == nullptr) { 150 HILOG_ERROR("taskpool:: worker is null"); 151 return; 152 } 153 worker->debuggerMutex_.lock(); 154 auto task = std::move(worker->debuggerQueue_.front()); 155 worker->debuggerQueue_.pop(); 156 worker->debuggerMutex_.unlock(); 157 task(); 158} 159 160void Worker::DebuggerOnPostTask(std::function<void()>&& task) 161{ 162 if (debuggerOnPostTaskSignal_ != nullptr && !uv_is_closing( 163 reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) { 164 std::lock_guard<std::mutex> lock(debuggerMutex_); 165 debuggerQueue_.push(std::move(task)); 166 uv_async_send(debuggerOnPostTaskSignal_); 167 } 168} 169#endif 170 171#if defined(ENABLE_TASKPOOL_FFRT) 172void Worker::InitFfrtInfo() 173{ 174 if (TaskManager::GetInstance().EnableFfrt()) { 175 static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = { 176 {ffrt::qos_background, Priority::IDLE}, 177 {ffrt::qos_utility, Priority::LOW}, 178 {ffrt::qos_default, Priority::DEFAULT}, 179 {ffrt::qos_user_initiated, Priority::HIGH}, 180 }; 181 ffrt_qos_t qos = ffrt_this_task_get_qos(); 182 priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos); 183 ffrtTaskHandle_ = ffrt_get_cur_task(); 184 } 185} 186 187void Worker::InitLoopHandleNum() 188{ 189 if (ffrtTaskHandle_ == nullptr) { 190 return; 191 } 192 193 uv_loop_t* loop = GetWorkerLoop(); 194 if (loop != nullptr) { 195 initActiveHandleNum_ = loop->active_handles; 196 } else { 197 HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num."); 198 } 199} 200 201bool Worker::IsLoopActive() 202{ 203 uv_loop_t* loop = GetWorkerLoop(); 204 if (loop != nullptr) { 205 return uv_loop_alive_taskpool(loop, initActiveHandleNum_); 206 } else { 207 HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive."); 208 return false; 209 } 210} 211 212uint64_t Worker::GetWaitTime() 213{ 214 return ffrt_epoll_get_wait_time(ffrtTaskHandle_); 215} 216#endif 217 218void Worker::ExecuteInThread(const void* data) 219{ 220 HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__); 221 auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data)); 222 { 223 napi_create_runtime(worker->hostEnv_, &worker->workerEnv_); 224 if (worker->workerEnv_ == nullptr) { 225 HILOG_ERROR("taskpool:: worker create runtime failed"); 226 return; 227 } 228 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_); 229 // mark worker env is taskpoolThread 230 workerEngine->MarkTaskPoolThread(); 231 workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback); 232 } 233 uv_loop_t* loop = worker->GetWorkerLoop(); 234 if (loop == nullptr) { 235 HILOG_ERROR("taskpool:: loop is nullptr"); 236 return; 237 } 238 // save the worker tid 239 worker->tid_ = GetThreadId(); 240 241 // Init worker task execute signal 242 ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker); 243 ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker); 244 ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker); 245 246 HITRACE_HELPER_FINISH_TRACE; 247#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 248 // Init debugger task post signal 249 ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker); 250#endif 251 if (worker->PrepareForWorkerInstance()) { 252 // Call after uv_async_init 253 worker->NotifyWorkerCreated(); 254#if defined(ENABLE_TASKPOOL_FFRT) 255 worker->InitFfrtInfo(); 256 worker->InitLoopHandleNum(); 257#endif 258 worker->RunLoop(); 259 } else { 260 HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail"); 261 } 262 TaskManager::GetInstance().RemoveWorker(worker); 263 TaskManager::GetInstance().CountTraceForWorker(); 264 worker->ReleaseWorkerThreadContent(); 265 delete worker; 266 worker = nullptr; 267} 268 269bool Worker::PrepareForWorkerInstance() 270{ 271 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 272 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 273#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 274 workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) { 275 this->DebuggerOnPostTask(std::move(task)); 276 }); 277#endif 278 if (!workerEngine->CallInitWorkerFunc(workerEngine)) { 279 HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail"); 280 return false; 281 } 282 // register timer interface 283 Timer::RegisterTime(workerEnv_); 284 285 // Check exception after worker construction 286 if (NapiHelper::IsExceptionPending(workerEnv_)) { 287 HILOG_ERROR("taskpool:: Worker construction occur exception"); 288 return false; 289 } 290 return true; 291} 292 293void Worker::ReleaseWorkerThreadContent() 294{ 295 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 296 auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_); 297 if (workerEngine == nullptr) { 298 HILOG_ERROR("taskpool:: workerEngine is nullptr"); 299 return; 300 } 301 if (hostEngine != nullptr) { 302 if (!hostEngine->DeleteWorker(workerEngine)) { 303 HILOG_ERROR("taskpool:: DeleteWorker fail"); 304 } 305 } 306 if (state_ == WorkerState::BLOCKED) { 307 HITRACE_HELPER_METER_NAME("Thread Timeout Exit"); 308 } else { 309 HITRACE_HELPER_METER_NAME("Thread Exit"); 310 } 311 312 Timer::ClearEnvironmentTimer(workerEnv_); 313 // 2. delete NativeEngine created in worker thread 314 if (!workerEngine->CallOffWorkerFunc(workerEngine)) { 315 HILOG_ERROR("worker:: CallOffWorkerFunc error"); 316 } 317 delete workerEngine; 318 workerEnv_ = nullptr; 319} 320 321void Worker::NotifyExecuteTask() 322{ 323 if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) { 324 uv_async_send(performTaskSignal_); 325 } 326} 327 328void Worker::NotifyIdle() 329{ 330 TaskManager::GetInstance().NotifyWorkerIdle(this); 331} 332 333void Worker::NotifyWorkerCreated() 334{ 335 TaskManager::GetInstance().NotifyWorkerCreated(this); 336} 337 338void Worker::NotifyTaskBegin() 339{ 340 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 341 workerEngine->NotifyTaskBegin(); 342} 343 344void Worker::TriggerGCCheck(const uv_async_t* req) 345{ 346 if (req == nullptr || req->data == nullptr) { 347 HILOG_ERROR("taskpool:: req handle is invalid"); 348 return; 349 } 350 auto worker = reinterpret_cast<Worker*>(req->data); 351 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_); 352 workerEngine->NotifyTaskFinished(); 353} 354 355void Worker::NotifyTaskFinished() 356{ 357 // trigger gc check by uv and return immediately if the handle is invalid 358 if (UNLIKELY(triggerGCCheckSignal_ == nullptr || uv_is_closing( 359 reinterpret_cast<uv_handle_t*>(triggerGCCheckSignal_)))) { 360 HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed"); 361 return; 362 } else { 363 uv_async_send(triggerGCCheckSignal_); 364 } 365 366 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 367 if (--runningCount_ != 0 || workerEngine->HasPendingJob()) { 368 // the worker state is still RUNNING and the start time will be updated 369 startTime_ = ConcurrentHelper::GetMilliseconds(); 370 } else { 371 UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE); 372 } 373 idlePoint_ = ConcurrentHelper::GetMilliseconds(); 374} 375 376bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired) 377{ 378 return state_.compare_exchange_strong(expect, desired); 379} 380 381void Worker::PerformTask(const uv_async_t* req) 382{ 383 uint64_t startTime = ConcurrentHelper::GetMilliseconds(); 384 auto worker = static_cast<Worker*>(req->data); 385 napi_env env = worker->workerEnv_; 386 TaskManager::GetInstance().NotifyWorkerRunning(worker); 387 auto taskInfo = TaskManager::GetInstance().DequeueTaskId(); 388 if (taskInfo.first == 0) { 389 worker->NotifyIdle(); 390 return; 391 } 392 RunningScope runningScope(worker); 393 PriorityScope priorityScope(worker, taskInfo.second); 394 Task* task = TaskManager::GetInstance().GetTask(taskInfo.first); 395 if (task == nullptr) { 396 HILOG_DEBUG("taskpool:: task has been released"); 397 return; 398 } else if (!task->IsValid() && task->ShouldDeleteTask(false)) { 399 HILOG_WARN("taskpool:: task is invalid"); 400 delete task; 401 return; 402 } 403 // try to record the memory data for gc 404 worker->NotifyTaskBegin(); 405 406 if (!task->UpdateTask(startTime, worker)) { 407 worker->NotifyTaskFinished(); 408 return; 409 } 410 if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) { 411 return; 412 } 413 if (task->IsLongTask()) { 414 worker->UpdateLongTaskInfo(task); 415 } 416 worker->StoreTaskId(task->taskId_); 417 // tag for trace parse: Task Perform 418 std::string strTrace = "Task Perform: name : " + task->name_ + ", taskId : " + std::to_string(task->taskId_) 419 + ", priority : " + std::to_string(taskInfo.second); 420 HITRACE_HELPER_METER_NAME(strTrace); 421 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 422 423 napi_value func = nullptr; 424 napi_value args = nullptr; 425 napi_value errorInfo = task->DeserializeValue(env, &func, &args); 426 if (UNLIKELY(func == nullptr || args == nullptr)) { 427 if (errorInfo != nullptr) { 428 worker->NotifyTaskResult(env, task, errorInfo); 429 } 430 return; 431 } 432 if (!worker->InitTaskPoolFunc(env, func, task)) { 433 return; 434 } 435 worker->hasExecuted_ = true; 436 uint32_t argsNum = NapiHelper::GetArrayLength(env, args); 437 napi_value argsArray[argsNum]; 438 for (size_t i = 0; i < argsNum; i++) { 439 argsArray[i] = NapiHelper::GetElement(env, args, i); 440 } 441 442 if (!task->CheckStartExecution(taskInfo.second)) { 443 if (task->ShouldDeleteTask()) { 444 delete task; 445 } 446 return; 447 } 448 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr); 449 auto workerEngine = reinterpret_cast<NativeEngine*>(env); 450 workerEngine->ClearCurrentTaskInfo(); 451 task->DecreaseRefCount(); 452 task->StoreTaskDuration(); 453 worker->UpdateExecutedInfo(); 454 HandleFunctionException(env, task); 455} 456 457void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result) 458{ 459 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 460 HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str()); 461 void* resultData = nullptr; 462 napi_value undefined = NapiHelper::GetUndefinedValue(env); 463 bool defaultTransfer = true; 464 bool defaultCloneSendable = false; 465 napi_status status = napi_serialize_inner(env, result, undefined, undefined, 466 defaultTransfer, defaultCloneSendable, &resultData); 467 if ((status != napi_ok || resultData == nullptr) && task->success_) { 468 task->success_ = false; 469 std::string errMessage = "taskpool: failed to serialize result."; 470 HILOG_ERROR("%{public}s", errMessage.c_str()); 471 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); 472 NotifyTaskResult(env, task, err); 473 return; 474 } 475 task->result_ = resultData; 476 NotifyHandleTaskResult(task); 477} 478 479void Worker::NotifyHandleTaskResult(Task* task) 480{ 481 if (!task->IsReadyToHandle()) { 482 return; 483 } 484 Worker* worker = reinterpret_cast<Worker*>(task->worker_); 485 if (worker != nullptr) { 486 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_); 487 auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_); 488 if (iter != worker->currentTaskId_.end()) { 489 worker->currentTaskId_.erase(iter); 490 } 491 } else { 492 HILOG_FATAL("taskpool:: worker is nullptr"); 493 return; 494 } 495 if (!task->VerifyAndPostResult(worker->priority_)) { 496 if (task->ShouldDeleteTask()) { 497 delete task; 498 } 499 } 500 worker->NotifyTaskFinished(); 501} 502 503void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data) 504{ 505 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 506 if (env == nullptr) { // LCOV_EXCL_BR_LINE 507 HILOG_FATAL("taskpool:: TaskResultCallback engine is null"); 508 return; 509 } 510 if (data == nullptr) { // LCOV_EXCL_BR_LINE 511 HILOG_FATAL("taskpool:: data is nullptr"); 512 return; 513 } 514 Task* task = static_cast<Task*>(data); 515 auto taskId = reinterpret_cast<uint64_t>(task); 516 if (TaskManager::GetInstance().GetTask(taskId) == nullptr) { 517 HILOG_FATAL("taskpool:: task is nullptr"); 518 return; 519 } 520 auto worker = static_cast<Worker*>(task->worker_); 521 worker->isExecutingLongTask_ = task->IsLongTask(); 522 task->DecreaseRefCount(); 523 task->ioTime_ = ConcurrentHelper::GetMilliseconds(); 524 if (task->cpuTime_ != 0) { 525 uint64_t ioDuration = task->ioTime_ - task->startTime_; 526 uint64_t cpuDuration = task->cpuTime_ - task->startTime_; 527 TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration); 528 } 529 task->success_ = success; 530 NotifyTaskResult(env, task, result); 531} 532 533// reset qos_user_initiated after perform task 534void Worker::ResetWorkerPriority() 535{ 536 if (priority_ != Priority::HIGH) { 537 if (TaskManager::GetInstance().EnableFfrt()) { 538#if defined(ENABLE_TASKPOOL_FFRT) 539 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) { 540 SetWorkerPriority(Priority::HIGH); 541 } 542#endif 543 } else { 544 SetWorkerPriority(Priority::HIGH); 545 } 546 priority_ = Priority::HIGH; 547 } 548} 549 550void Worker::StoreTaskId(uint64_t taskId) 551{ 552 std::lock_guard<std::mutex> lock(currentTaskIdMutex_); 553 currentTaskId_.emplace_back(taskId); 554} 555 556bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task) 557{ 558 auto workerEngine = reinterpret_cast<NativeEngine*>(env); 559 bool success = workerEngine->InitTaskPoolFunc(env, func, task); 560 napi_value exception; 561 napi_get_and_clear_last_exception(env, &exception); 562 if (exception != nullptr) { 563 HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception"); 564 task->success_ = false; 565 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception); 566 NotifyTaskResult(env, task, errorEvent); 567 return false; 568 } 569 if (!success) { 570 HILOG_ERROR("taskpool:: InitTaskPoolFunc fail"); 571 napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR, 572 "taskpool:: function may not be concurrent."); 573 task->success_ = false; 574 NotifyTaskResult(env, task, err); 575 return false; 576 } 577 return true; 578} 579 580void Worker::UpdateExecutedInfo() 581{ 582 // if the worker is blocked, just skip 583 if (LIKELY(state_ != WorkerState::BLOCKED)) { 584 uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_; 585 TaskManager::GetInstance().UpdateExecutedInfo(duration); 586 } 587} 588 589// Only when the worker has no longTask can it be released. 590void Worker::TerminateTask(uint64_t taskId) 591{ 592 HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str()); 593 std::lock_guard<std::mutex> lock(longMutex_); 594 longTasksSet_.erase(taskId); 595 if (longTasksSet_.empty()) { 596 hasLongTask_ = false; 597 } 598} 599 600// to store longTasks' state 601void Worker::UpdateLongTaskInfo(Task* task) 602{ 603 HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str()); 604 TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this); 605 std::lock_guard<std::mutex> lock(longMutex_); 606 hasLongTask_ = true; 607 isExecutingLongTask_ = true; 608 longTasksSet_.emplace(task->taskId_); 609} 610 611bool Worker::IsExecutingLongTask() 612{ 613 return isExecutingLongTask_; 614} 615 616bool Worker::HasLongTask() 617{ 618 return hasLongTask_; 619} 620 621void Worker::HandleFunctionException(napi_env env, Task* task) 622{ 623 napi_value exception; 624 napi_get_and_clear_last_exception(env, &exception); 625 if (exception != nullptr) { 626 HILOG_ERROR("taskpool::PerformTask occur exception"); 627 task->DecreaseRefCount(); 628 task->success_ = false; 629 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception); 630 NotifyTaskResult(env, task, errorEvent); 631 return; 632 } 633 NotifyHandleTaskResult(task); 634} 635 636void Worker::PostReleaseSignal() 637{ 638 if (UNLIKELY(clearWorkerSignal_ == nullptr || uv_is_closing( 639 reinterpret_cast<uv_handle_t*>(clearWorkerSignal_)))) { 640 HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed"); 641 return; 642 } 643 uv_async_send(clearWorkerSignal_); 644} 645} // namespace Commonlibrary::Concurrent::TaskPoolModule