14d6c458bSopenharmony_ci/* 24d6c458bSopenharmony_ci * Copyright (c) 2022 Huawei Device Co., Ltd. 34d6c458bSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 44d6c458bSopenharmony_ci * you may not use this file except in compliance with the License. 54d6c458bSopenharmony_ci * You may obtain a copy of the License at 64d6c458bSopenharmony_ci * 74d6c458bSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 84d6c458bSopenharmony_ci * 94d6c458bSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 104d6c458bSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 114d6c458bSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 124d6c458bSopenharmony_ci * See the License for the specific language governing permissions and 134d6c458bSopenharmony_ci * limitations under the License. 144d6c458bSopenharmony_ci */ 154d6c458bSopenharmony_ci 164d6c458bSopenharmony_ci#include "worker.h" 174d6c458bSopenharmony_ci 184d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 194d6c458bSopenharmony_ci#include "c/executor_task.h" 204d6c458bSopenharmony_ci#include "ffrt_inner.h" 214d6c458bSopenharmony_ci#endif 224d6c458bSopenharmony_ci#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h" 234d6c458bSopenharmony_ci#include "helper/hitrace_helper.h" 244d6c458bSopenharmony_ci#include "process_helper.h" 254d6c458bSopenharmony_ci#include "task_group.h" 264d6c458bSopenharmony_ci#include "task_manager.h" 274d6c458bSopenharmony_ci#include "taskpool.h" 284d6c458bSopenharmony_ci#include "tools/log.h" 294d6c458bSopenharmony_ci 304d6c458bSopenharmony_cinamespace Commonlibrary::Concurrent::TaskPoolModule { 314d6c458bSopenharmony_ciusing namespace OHOS::JsSysModule; 324d6c458bSopenharmony_ciusing namespace Commonlibrary::Platform; 334d6c458bSopenharmony_ci 344d6c458bSopenharmony_ciWorker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker) 354d6c458bSopenharmony_ci{ 364d6c458bSopenharmony_ci if (taskPriority != worker->priority_) { 374d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: reset worker priority to match task priority"); 384d6c458bSopenharmony_ci if (TaskManager::GetInstance().EnableFfrt()) { 394d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 404d6c458bSopenharmony_ci if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) { 414d6c458bSopenharmony_ci SetWorkerPriority(taskPriority); 424d6c458bSopenharmony_ci } 434d6c458bSopenharmony_ci#endif 444d6c458bSopenharmony_ci } else { 454d6c458bSopenharmony_ci SetWorkerPriority(taskPriority); 464d6c458bSopenharmony_ci } 474d6c458bSopenharmony_ci worker->priority_ = taskPriority; 484d6c458bSopenharmony_ci } 494d6c458bSopenharmony_ci} 504d6c458bSopenharmony_ci 514d6c458bSopenharmony_ciWorker::RunningScope::~RunningScope() 524d6c458bSopenharmony_ci{ 534d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: RunningScope destruction"); 544d6c458bSopenharmony_ci if (scope_ != nullptr) { 554d6c458bSopenharmony_ci napi_close_handle_scope(worker_->workerEnv_, scope_); 564d6c458bSopenharmony_ci } 574d6c458bSopenharmony_ci worker_->NotifyIdle(); 584d6c458bSopenharmony_ci worker_->idleState_ = true; 594d6c458bSopenharmony_ci} 604d6c458bSopenharmony_ci 614d6c458bSopenharmony_ciWorker* Worker::WorkerConstructor(napi_env env) 624d6c458bSopenharmony_ci{ 634d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]"); 644d6c458bSopenharmony_ci Worker* worker = new Worker(env); 654d6c458bSopenharmony_ci worker->StartExecuteInThread(); 664d6c458bSopenharmony_ci return worker; 674d6c458bSopenharmony_ci} 684d6c458bSopenharmony_ci 694d6c458bSopenharmony_civoid Worker::CloseHandles() 704d6c458bSopenharmony_ci{ 714d6c458bSopenharmony_ci // set all handles to nullptr so that they can not be used even when the loop is re-running 724d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(performTaskSignal_); 734d6c458bSopenharmony_ci performTaskSignal_ = nullptr; 744d6c458bSopenharmony_ci#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 754d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_); 764d6c458bSopenharmony_ci debuggerOnPostTaskSignal_ = nullptr; 774d6c458bSopenharmony_ci#endif 784d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(clearWorkerSignal_); 794d6c458bSopenharmony_ci clearWorkerSignal_ = nullptr; 804d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_); 814d6c458bSopenharmony_ci triggerGCCheckSignal_ = nullptr; 824d6c458bSopenharmony_ci} 834d6c458bSopenharmony_ci 844d6c458bSopenharmony_civoid Worker::ReleaseWorkerHandles(const uv_async_t* req) 854d6c458bSopenharmony_ci{ 864d6c458bSopenharmony_ci auto worker = static_cast<Worker*>(req->data); 874d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_); 884d6c458bSopenharmony_ci if (!worker->CheckFreeConditions()) { 894d6c458bSopenharmony_ci return; 904d6c458bSopenharmony_ci } 914d6c458bSopenharmony_ci 924d6c458bSopenharmony_ci TaskManager::GetInstance().RemoveWorker(worker); 934d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]"); 944d6c458bSopenharmony_ci HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now", 954d6c458bSopenharmony_ci TaskManager::GetInstance().GetThreadNum()); 964d6c458bSopenharmony_ci // when there is no active handle, worker loop will stop automatically. 974d6c458bSopenharmony_ci worker->CloseHandles(); 984d6c458bSopenharmony_ci 994d6c458bSopenharmony_ci uv_loop_t* loop = worker->GetWorkerLoop(); 1004d6c458bSopenharmony_ci if (loop != nullptr) { 1014d6c458bSopenharmony_ci uv_stop(loop); 1024d6c458bSopenharmony_ci } 1034d6c458bSopenharmony_ci} 1044d6c458bSopenharmony_ci 1054d6c458bSopenharmony_cibool Worker::CheckFreeConditions() 1064d6c458bSopenharmony_ci{ 1074d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 1084d6c458bSopenharmony_ci // only when all conditions are met can the worker be freed 1094d6c458bSopenharmony_ci if (HasRunningTasks()) { 1104d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit"); 1114d6c458bSopenharmony_ci } else if (workerEngine->HasListeningCounter()) { 1124d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit"); 1134d6c458bSopenharmony_ci } else if (Timer::HasTimer(workerEnv_)) { 1144d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit"); 1154d6c458bSopenharmony_ci } else if (workerEngine->HasWaitingRequest()) { 1164d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit"); 1174d6c458bSopenharmony_ci } else if (workerEngine->HasSubEnv()) { 1184d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit"); 1194d6c458bSopenharmony_ci } else if (workerEngine->HasPendingJob()) { 1204d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit"); 1214d6c458bSopenharmony_ci } else if (workerEngine->IsProfiling()) { 1224d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling"); 1234d6c458bSopenharmony_ci } else { 1244d6c458bSopenharmony_ci return true; 1254d6c458bSopenharmony_ci } 1264d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_); 1274d6c458bSopenharmony_ci TaskManager& taskManager = TaskManager::GetInstance(); 1284d6c458bSopenharmony_ci taskManager.RestoreWorker(this); 1294d6c458bSopenharmony_ci taskManager.CountTraceForWorker(); 1304d6c458bSopenharmony_ci return false; 1314d6c458bSopenharmony_ci} 1324d6c458bSopenharmony_ci 1334d6c458bSopenharmony_civoid Worker::StartExecuteInThread() 1344d6c458bSopenharmony_ci{ 1354d6c458bSopenharmony_ci if (!runner_) { 1364d6c458bSopenharmony_ci runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this)); 1374d6c458bSopenharmony_ci } 1384d6c458bSopenharmony_ci if (runner_) { 1394d6c458bSopenharmony_ci runner_->Execute(); // start a new thread 1404d6c458bSopenharmony_ci } else { 1414d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: runner_ is nullptr"); 1424d6c458bSopenharmony_ci } 1434d6c458bSopenharmony_ci} 1444d6c458bSopenharmony_ci 1454d6c458bSopenharmony_ci#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 1464d6c458bSopenharmony_civoid Worker::HandleDebuggerTask(const uv_async_t* req) 1474d6c458bSopenharmony_ci{ 1484d6c458bSopenharmony_ci Worker* worker = reinterpret_cast<Worker*>(req->data); 1494d6c458bSopenharmony_ci if (worker == nullptr) { 1504d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: worker is null"); 1514d6c458bSopenharmony_ci return; 1524d6c458bSopenharmony_ci } 1534d6c458bSopenharmony_ci worker->debuggerMutex_.lock(); 1544d6c458bSopenharmony_ci auto task = std::move(worker->debuggerQueue_.front()); 1554d6c458bSopenharmony_ci worker->debuggerQueue_.pop(); 1564d6c458bSopenharmony_ci worker->debuggerMutex_.unlock(); 1574d6c458bSopenharmony_ci task(); 1584d6c458bSopenharmony_ci} 1594d6c458bSopenharmony_ci 1604d6c458bSopenharmony_civoid Worker::DebuggerOnPostTask(std::function<void()>&& task) 1614d6c458bSopenharmony_ci{ 1624d6c458bSopenharmony_ci if (debuggerOnPostTaskSignal_ != nullptr && !uv_is_closing( 1634d6c458bSopenharmony_ci reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) { 1644d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(debuggerMutex_); 1654d6c458bSopenharmony_ci debuggerQueue_.push(std::move(task)); 1664d6c458bSopenharmony_ci uv_async_send(debuggerOnPostTaskSignal_); 1674d6c458bSopenharmony_ci } 1684d6c458bSopenharmony_ci} 1694d6c458bSopenharmony_ci#endif 1704d6c458bSopenharmony_ci 1714d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 1724d6c458bSopenharmony_civoid Worker::InitFfrtInfo() 1734d6c458bSopenharmony_ci{ 1744d6c458bSopenharmony_ci if (TaskManager::GetInstance().EnableFfrt()) { 1754d6c458bSopenharmony_ci static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = { 1764d6c458bSopenharmony_ci {ffrt::qos_background, Priority::IDLE}, 1774d6c458bSopenharmony_ci {ffrt::qos_utility, Priority::LOW}, 1784d6c458bSopenharmony_ci {ffrt::qos_default, Priority::DEFAULT}, 1794d6c458bSopenharmony_ci {ffrt::qos_user_initiated, Priority::HIGH}, 1804d6c458bSopenharmony_ci }; 1814d6c458bSopenharmony_ci ffrt_qos_t qos = ffrt_this_task_get_qos(); 1824d6c458bSopenharmony_ci priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos); 1834d6c458bSopenharmony_ci ffrtTaskHandle_ = ffrt_get_cur_task(); 1844d6c458bSopenharmony_ci } 1854d6c458bSopenharmony_ci} 1864d6c458bSopenharmony_ci 1874d6c458bSopenharmony_civoid Worker::InitLoopHandleNum() 1884d6c458bSopenharmony_ci{ 1894d6c458bSopenharmony_ci if (ffrtTaskHandle_ == nullptr) { 1904d6c458bSopenharmony_ci return; 1914d6c458bSopenharmony_ci } 1924d6c458bSopenharmony_ci 1934d6c458bSopenharmony_ci uv_loop_t* loop = GetWorkerLoop(); 1944d6c458bSopenharmony_ci if (loop != nullptr) { 1954d6c458bSopenharmony_ci initActiveHandleNum_ = loop->active_handles; 1964d6c458bSopenharmony_ci } else { 1974d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num."); 1984d6c458bSopenharmony_ci } 1994d6c458bSopenharmony_ci} 2004d6c458bSopenharmony_ci 2014d6c458bSopenharmony_cibool Worker::IsLoopActive() 2024d6c458bSopenharmony_ci{ 2034d6c458bSopenharmony_ci uv_loop_t* loop = GetWorkerLoop(); 2044d6c458bSopenharmony_ci if (loop != nullptr) { 2054d6c458bSopenharmony_ci return uv_loop_alive_taskpool(loop, initActiveHandleNum_); 2064d6c458bSopenharmony_ci } else { 2074d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive."); 2084d6c458bSopenharmony_ci return false; 2094d6c458bSopenharmony_ci } 2104d6c458bSopenharmony_ci} 2114d6c458bSopenharmony_ci 2124d6c458bSopenharmony_ciuint64_t Worker::GetWaitTime() 2134d6c458bSopenharmony_ci{ 2144d6c458bSopenharmony_ci return ffrt_epoll_get_wait_time(ffrtTaskHandle_); 2154d6c458bSopenharmony_ci} 2164d6c458bSopenharmony_ci#endif 2174d6c458bSopenharmony_ci 2184d6c458bSopenharmony_civoid Worker::ExecuteInThread(const void* data) 2194d6c458bSopenharmony_ci{ 2204d6c458bSopenharmony_ci HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__); 2214d6c458bSopenharmony_ci auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data)); 2224d6c458bSopenharmony_ci { 2234d6c458bSopenharmony_ci napi_create_runtime(worker->hostEnv_, &worker->workerEnv_); 2244d6c458bSopenharmony_ci if (worker->workerEnv_ == nullptr) { 2254d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: worker create runtime failed"); 2264d6c458bSopenharmony_ci return; 2274d6c458bSopenharmony_ci } 2284d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_); 2294d6c458bSopenharmony_ci // mark worker env is taskpoolThread 2304d6c458bSopenharmony_ci workerEngine->MarkTaskPoolThread(); 2314d6c458bSopenharmony_ci workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback); 2324d6c458bSopenharmony_ci } 2334d6c458bSopenharmony_ci uv_loop_t* loop = worker->GetWorkerLoop(); 2344d6c458bSopenharmony_ci if (loop == nullptr) { 2354d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: loop is nullptr"); 2364d6c458bSopenharmony_ci return; 2374d6c458bSopenharmony_ci } 2384d6c458bSopenharmony_ci // save the worker tid 2394d6c458bSopenharmony_ci worker->tid_ = GetThreadId(); 2404d6c458bSopenharmony_ci 2414d6c458bSopenharmony_ci // Init worker task execute signal 2424d6c458bSopenharmony_ci ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker); 2434d6c458bSopenharmony_ci ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker); 2444d6c458bSopenharmony_ci ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker); 2454d6c458bSopenharmony_ci 2464d6c458bSopenharmony_ci HITRACE_HELPER_FINISH_TRACE; 2474d6c458bSopenharmony_ci#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 2484d6c458bSopenharmony_ci // Init debugger task post signal 2494d6c458bSopenharmony_ci ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker); 2504d6c458bSopenharmony_ci#endif 2514d6c458bSopenharmony_ci if (worker->PrepareForWorkerInstance()) { 2524d6c458bSopenharmony_ci // Call after uv_async_init 2534d6c458bSopenharmony_ci worker->NotifyWorkerCreated(); 2544d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 2554d6c458bSopenharmony_ci worker->InitFfrtInfo(); 2564d6c458bSopenharmony_ci worker->InitLoopHandleNum(); 2574d6c458bSopenharmony_ci#endif 2584d6c458bSopenharmony_ci worker->RunLoop(); 2594d6c458bSopenharmony_ci } else { 2604d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail"); 2614d6c458bSopenharmony_ci } 2624d6c458bSopenharmony_ci TaskManager::GetInstance().RemoveWorker(worker); 2634d6c458bSopenharmony_ci TaskManager::GetInstance().CountTraceForWorker(); 2644d6c458bSopenharmony_ci worker->ReleaseWorkerThreadContent(); 2654d6c458bSopenharmony_ci delete worker; 2664d6c458bSopenharmony_ci worker = nullptr; 2674d6c458bSopenharmony_ci} 2684d6c458bSopenharmony_ci 2694d6c458bSopenharmony_cibool Worker::PrepareForWorkerInstance() 2704d6c458bSopenharmony_ci{ 2714d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 2724d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 2734d6c458bSopenharmony_ci#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) 2744d6c458bSopenharmony_ci workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) { 2754d6c458bSopenharmony_ci this->DebuggerOnPostTask(std::move(task)); 2764d6c458bSopenharmony_ci }); 2774d6c458bSopenharmony_ci#endif 2784d6c458bSopenharmony_ci if (!workerEngine->CallInitWorkerFunc(workerEngine)) { 2794d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail"); 2804d6c458bSopenharmony_ci return false; 2814d6c458bSopenharmony_ci } 2824d6c458bSopenharmony_ci // register timer interface 2834d6c458bSopenharmony_ci Timer::RegisterTime(workerEnv_); 2844d6c458bSopenharmony_ci 2854d6c458bSopenharmony_ci // Check exception after worker construction 2864d6c458bSopenharmony_ci if (NapiHelper::IsExceptionPending(workerEnv_)) { 2874d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: Worker construction occur exception"); 2884d6c458bSopenharmony_ci return false; 2894d6c458bSopenharmony_ci } 2904d6c458bSopenharmony_ci return true; 2914d6c458bSopenharmony_ci} 2924d6c458bSopenharmony_ci 2934d6c458bSopenharmony_civoid Worker::ReleaseWorkerThreadContent() 2944d6c458bSopenharmony_ci{ 2954d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 2964d6c458bSopenharmony_ci auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_); 2974d6c458bSopenharmony_ci if (workerEngine == nullptr) { 2984d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: workerEngine is nullptr"); 2994d6c458bSopenharmony_ci return; 3004d6c458bSopenharmony_ci } 3014d6c458bSopenharmony_ci if (hostEngine != nullptr) { 3024d6c458bSopenharmony_ci if (!hostEngine->DeleteWorker(workerEngine)) { 3034d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: DeleteWorker fail"); 3044d6c458bSopenharmony_ci } 3054d6c458bSopenharmony_ci } 3064d6c458bSopenharmony_ci if (state_ == WorkerState::BLOCKED) { 3074d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME("Thread Timeout Exit"); 3084d6c458bSopenharmony_ci } else { 3094d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME("Thread Exit"); 3104d6c458bSopenharmony_ci } 3114d6c458bSopenharmony_ci 3124d6c458bSopenharmony_ci Timer::ClearEnvironmentTimer(workerEnv_); 3134d6c458bSopenharmony_ci // 2. delete NativeEngine created in worker thread 3144d6c458bSopenharmony_ci if (!workerEngine->CallOffWorkerFunc(workerEngine)) { 3154d6c458bSopenharmony_ci HILOG_ERROR("worker:: CallOffWorkerFunc error"); 3164d6c458bSopenharmony_ci } 3174d6c458bSopenharmony_ci delete workerEngine; 3184d6c458bSopenharmony_ci workerEnv_ = nullptr; 3194d6c458bSopenharmony_ci} 3204d6c458bSopenharmony_ci 3214d6c458bSopenharmony_civoid Worker::NotifyExecuteTask() 3224d6c458bSopenharmony_ci{ 3234d6c458bSopenharmony_ci if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) { 3244d6c458bSopenharmony_ci uv_async_send(performTaskSignal_); 3254d6c458bSopenharmony_ci } 3264d6c458bSopenharmony_ci} 3274d6c458bSopenharmony_ci 3284d6c458bSopenharmony_civoid Worker::NotifyIdle() 3294d6c458bSopenharmony_ci{ 3304d6c458bSopenharmony_ci TaskManager::GetInstance().NotifyWorkerIdle(this); 3314d6c458bSopenharmony_ci} 3324d6c458bSopenharmony_ci 3334d6c458bSopenharmony_civoid Worker::NotifyWorkerCreated() 3344d6c458bSopenharmony_ci{ 3354d6c458bSopenharmony_ci TaskManager::GetInstance().NotifyWorkerCreated(this); 3364d6c458bSopenharmony_ci} 3374d6c458bSopenharmony_ci 3384d6c458bSopenharmony_civoid Worker::NotifyTaskBegin() 3394d6c458bSopenharmony_ci{ 3404d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 3414d6c458bSopenharmony_ci workerEngine->NotifyTaskBegin(); 3424d6c458bSopenharmony_ci} 3434d6c458bSopenharmony_ci 3444d6c458bSopenharmony_civoid Worker::TriggerGCCheck(const uv_async_t* req) 3454d6c458bSopenharmony_ci{ 3464d6c458bSopenharmony_ci if (req == nullptr || req->data == nullptr) { 3474d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: req handle is invalid"); 3484d6c458bSopenharmony_ci return; 3494d6c458bSopenharmony_ci } 3504d6c458bSopenharmony_ci auto worker = reinterpret_cast<Worker*>(req->data); 3514d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_); 3524d6c458bSopenharmony_ci workerEngine->NotifyTaskFinished(); 3534d6c458bSopenharmony_ci} 3544d6c458bSopenharmony_ci 3554d6c458bSopenharmony_civoid Worker::NotifyTaskFinished() 3564d6c458bSopenharmony_ci{ 3574d6c458bSopenharmony_ci // trigger gc check by uv and return immediately if the handle is invalid 3584d6c458bSopenharmony_ci if (UNLIKELY(triggerGCCheckSignal_ == nullptr || uv_is_closing( 3594d6c458bSopenharmony_ci reinterpret_cast<uv_handle_t*>(triggerGCCheckSignal_)))) { 3604d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed"); 3614d6c458bSopenharmony_ci return; 3624d6c458bSopenharmony_ci } else { 3634d6c458bSopenharmony_ci uv_async_send(triggerGCCheckSignal_); 3644d6c458bSopenharmony_ci } 3654d6c458bSopenharmony_ci 3664d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_); 3674d6c458bSopenharmony_ci if (--runningCount_ != 0 || workerEngine->HasPendingJob()) { 3684d6c458bSopenharmony_ci // the worker state is still RUNNING and the start time will be updated 3694d6c458bSopenharmony_ci startTime_ = ConcurrentHelper::GetMilliseconds(); 3704d6c458bSopenharmony_ci } else { 3714d6c458bSopenharmony_ci UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE); 3724d6c458bSopenharmony_ci } 3734d6c458bSopenharmony_ci idlePoint_ = ConcurrentHelper::GetMilliseconds(); 3744d6c458bSopenharmony_ci} 3754d6c458bSopenharmony_ci 3764d6c458bSopenharmony_cibool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired) 3774d6c458bSopenharmony_ci{ 3784d6c458bSopenharmony_ci return state_.compare_exchange_strong(expect, desired); 3794d6c458bSopenharmony_ci} 3804d6c458bSopenharmony_ci 3814d6c458bSopenharmony_civoid Worker::PerformTask(const uv_async_t* req) 3824d6c458bSopenharmony_ci{ 3834d6c458bSopenharmony_ci uint64_t startTime = ConcurrentHelper::GetMilliseconds(); 3844d6c458bSopenharmony_ci auto worker = static_cast<Worker*>(req->data); 3854d6c458bSopenharmony_ci napi_env env = worker->workerEnv_; 3864d6c458bSopenharmony_ci TaskManager::GetInstance().NotifyWorkerRunning(worker); 3874d6c458bSopenharmony_ci auto taskInfo = TaskManager::GetInstance().DequeueTaskId(); 3884d6c458bSopenharmony_ci if (taskInfo.first == 0) { 3894d6c458bSopenharmony_ci worker->NotifyIdle(); 3904d6c458bSopenharmony_ci return; 3914d6c458bSopenharmony_ci } 3924d6c458bSopenharmony_ci RunningScope runningScope(worker); 3934d6c458bSopenharmony_ci PriorityScope priorityScope(worker, taskInfo.second); 3944d6c458bSopenharmony_ci Task* task = TaskManager::GetInstance().GetTask(taskInfo.first); 3954d6c458bSopenharmony_ci if (task == nullptr) { 3964d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task has been released"); 3974d6c458bSopenharmony_ci return; 3984d6c458bSopenharmony_ci } else if (!task->IsValid() && task->ShouldDeleteTask(false)) { 3994d6c458bSopenharmony_ci HILOG_WARN("taskpool:: task is invalid"); 4004d6c458bSopenharmony_ci delete task; 4014d6c458bSopenharmony_ci return; 4024d6c458bSopenharmony_ci } 4034d6c458bSopenharmony_ci // try to record the memory data for gc 4044d6c458bSopenharmony_ci worker->NotifyTaskBegin(); 4054d6c458bSopenharmony_ci 4064d6c458bSopenharmony_ci if (!task->UpdateTask(startTime, worker)) { 4074d6c458bSopenharmony_ci worker->NotifyTaskFinished(); 4084d6c458bSopenharmony_ci return; 4094d6c458bSopenharmony_ci } 4104d6c458bSopenharmony_ci if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) { 4114d6c458bSopenharmony_ci return; 4124d6c458bSopenharmony_ci } 4134d6c458bSopenharmony_ci if (task->IsLongTask()) { 4144d6c458bSopenharmony_ci worker->UpdateLongTaskInfo(task); 4154d6c458bSopenharmony_ci } 4164d6c458bSopenharmony_ci worker->StoreTaskId(task->taskId_); 4174d6c458bSopenharmony_ci // tag for trace parse: Task Perform 4184d6c458bSopenharmony_ci std::string strTrace = "Task Perform: name : " + task->name_ + ", taskId : " + std::to_string(task->taskId_) 4194d6c458bSopenharmony_ci + ", priority : " + std::to_string(taskInfo.second); 4204d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME(strTrace); 4214d6c458bSopenharmony_ci HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 4224d6c458bSopenharmony_ci 4234d6c458bSopenharmony_ci napi_value func = nullptr; 4244d6c458bSopenharmony_ci napi_value args = nullptr; 4254d6c458bSopenharmony_ci napi_value errorInfo = task->DeserializeValue(env, &func, &args); 4264d6c458bSopenharmony_ci if (UNLIKELY(func == nullptr || args == nullptr)) { 4274d6c458bSopenharmony_ci if (errorInfo != nullptr) { 4284d6c458bSopenharmony_ci worker->NotifyTaskResult(env, task, errorInfo); 4294d6c458bSopenharmony_ci } 4304d6c458bSopenharmony_ci return; 4314d6c458bSopenharmony_ci } 4324d6c458bSopenharmony_ci if (!worker->InitTaskPoolFunc(env, func, task)) { 4334d6c458bSopenharmony_ci return; 4344d6c458bSopenharmony_ci } 4354d6c458bSopenharmony_ci worker->hasExecuted_ = true; 4364d6c458bSopenharmony_ci uint32_t argsNum = NapiHelper::GetArrayLength(env, args); 4374d6c458bSopenharmony_ci napi_value argsArray[argsNum]; 4384d6c458bSopenharmony_ci for (size_t i = 0; i < argsNum; i++) { 4394d6c458bSopenharmony_ci argsArray[i] = NapiHelper::GetElement(env, args, i); 4404d6c458bSopenharmony_ci } 4414d6c458bSopenharmony_ci 4424d6c458bSopenharmony_ci if (!task->CheckStartExecution(taskInfo.second)) { 4434d6c458bSopenharmony_ci if (task->ShouldDeleteTask()) { 4444d6c458bSopenharmony_ci delete task; 4454d6c458bSopenharmony_ci } 4464d6c458bSopenharmony_ci return; 4474d6c458bSopenharmony_ci } 4484d6c458bSopenharmony_ci napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr); 4494d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(env); 4504d6c458bSopenharmony_ci workerEngine->ClearCurrentTaskInfo(); 4514d6c458bSopenharmony_ci task->DecreaseRefCount(); 4524d6c458bSopenharmony_ci task->StoreTaskDuration(); 4534d6c458bSopenharmony_ci worker->UpdateExecutedInfo(); 4544d6c458bSopenharmony_ci HandleFunctionException(env, task); 4554d6c458bSopenharmony_ci} 4564d6c458bSopenharmony_ci 4574d6c458bSopenharmony_civoid Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result) 4584d6c458bSopenharmony_ci{ 4594d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 4604d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str()); 4614d6c458bSopenharmony_ci void* resultData = nullptr; 4624d6c458bSopenharmony_ci napi_value undefined = NapiHelper::GetUndefinedValue(env); 4634d6c458bSopenharmony_ci bool defaultTransfer = true; 4644d6c458bSopenharmony_ci bool defaultCloneSendable = false; 4654d6c458bSopenharmony_ci napi_status status = napi_serialize_inner(env, result, undefined, undefined, 4664d6c458bSopenharmony_ci defaultTransfer, defaultCloneSendable, &resultData); 4674d6c458bSopenharmony_ci if ((status != napi_ok || resultData == nullptr) && task->success_) { 4684d6c458bSopenharmony_ci task->success_ = false; 4694d6c458bSopenharmony_ci std::string errMessage = "taskpool: failed to serialize result."; 4704d6c458bSopenharmony_ci HILOG_ERROR("%{public}s", errMessage.c_str()); 4714d6c458bSopenharmony_ci napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); 4724d6c458bSopenharmony_ci NotifyTaskResult(env, task, err); 4734d6c458bSopenharmony_ci return; 4744d6c458bSopenharmony_ci } 4754d6c458bSopenharmony_ci task->result_ = resultData; 4764d6c458bSopenharmony_ci NotifyHandleTaskResult(task); 4774d6c458bSopenharmony_ci} 4784d6c458bSopenharmony_ci 4794d6c458bSopenharmony_civoid Worker::NotifyHandleTaskResult(Task* task) 4804d6c458bSopenharmony_ci{ 4814d6c458bSopenharmony_ci if (!task->IsReadyToHandle()) { 4824d6c458bSopenharmony_ci return; 4834d6c458bSopenharmony_ci } 4844d6c458bSopenharmony_ci Worker* worker = reinterpret_cast<Worker*>(task->worker_); 4854d6c458bSopenharmony_ci if (worker != nullptr) { 4864d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_); 4874d6c458bSopenharmony_ci auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_); 4884d6c458bSopenharmony_ci if (iter != worker->currentTaskId_.end()) { 4894d6c458bSopenharmony_ci worker->currentTaskId_.erase(iter); 4904d6c458bSopenharmony_ci } 4914d6c458bSopenharmony_ci } else { 4924d6c458bSopenharmony_ci HILOG_FATAL("taskpool:: worker is nullptr"); 4934d6c458bSopenharmony_ci return; 4944d6c458bSopenharmony_ci } 4954d6c458bSopenharmony_ci if (!task->VerifyAndPostResult(worker->priority_)) { 4964d6c458bSopenharmony_ci if (task->ShouldDeleteTask()) { 4974d6c458bSopenharmony_ci delete task; 4984d6c458bSopenharmony_ci } 4994d6c458bSopenharmony_ci } 5004d6c458bSopenharmony_ci worker->NotifyTaskFinished(); 5014d6c458bSopenharmony_ci} 5024d6c458bSopenharmony_ci 5034d6c458bSopenharmony_civoid Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data) 5044d6c458bSopenharmony_ci{ 5054d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 5064d6c458bSopenharmony_ci if (env == nullptr) { // LCOV_EXCL_BR_LINE 5074d6c458bSopenharmony_ci HILOG_FATAL("taskpool:: TaskResultCallback engine is null"); 5084d6c458bSopenharmony_ci return; 5094d6c458bSopenharmony_ci } 5104d6c458bSopenharmony_ci if (data == nullptr) { // LCOV_EXCL_BR_LINE 5114d6c458bSopenharmony_ci HILOG_FATAL("taskpool:: data is nullptr"); 5124d6c458bSopenharmony_ci return; 5134d6c458bSopenharmony_ci } 5144d6c458bSopenharmony_ci Task* task = static_cast<Task*>(data); 5154d6c458bSopenharmony_ci auto taskId = reinterpret_cast<uint64_t>(task); 5164d6c458bSopenharmony_ci if (TaskManager::GetInstance().GetTask(taskId) == nullptr) { 5174d6c458bSopenharmony_ci HILOG_FATAL("taskpool:: task is nullptr"); 5184d6c458bSopenharmony_ci return; 5194d6c458bSopenharmony_ci } 5204d6c458bSopenharmony_ci auto worker = static_cast<Worker*>(task->worker_); 5214d6c458bSopenharmony_ci worker->isExecutingLongTask_ = task->IsLongTask(); 5224d6c458bSopenharmony_ci task->DecreaseRefCount(); 5234d6c458bSopenharmony_ci task->ioTime_ = ConcurrentHelper::GetMilliseconds(); 5244d6c458bSopenharmony_ci if (task->cpuTime_ != 0) { 5254d6c458bSopenharmony_ci uint64_t ioDuration = task->ioTime_ - task->startTime_; 5264d6c458bSopenharmony_ci uint64_t cpuDuration = task->cpuTime_ - task->startTime_; 5274d6c458bSopenharmony_ci TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration); 5284d6c458bSopenharmony_ci } 5294d6c458bSopenharmony_ci task->success_ = success; 5304d6c458bSopenharmony_ci NotifyTaskResult(env, task, result); 5314d6c458bSopenharmony_ci} 5324d6c458bSopenharmony_ci 5334d6c458bSopenharmony_ci// reset qos_user_initiated after perform task 5344d6c458bSopenharmony_civoid Worker::ResetWorkerPriority() 5354d6c458bSopenharmony_ci{ 5364d6c458bSopenharmony_ci if (priority_ != Priority::HIGH) { 5374d6c458bSopenharmony_ci if (TaskManager::GetInstance().EnableFfrt()) { 5384d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 5394d6c458bSopenharmony_ci if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) { 5404d6c458bSopenharmony_ci SetWorkerPriority(Priority::HIGH); 5414d6c458bSopenharmony_ci } 5424d6c458bSopenharmony_ci#endif 5434d6c458bSopenharmony_ci } else { 5444d6c458bSopenharmony_ci SetWorkerPriority(Priority::HIGH); 5454d6c458bSopenharmony_ci } 5464d6c458bSopenharmony_ci priority_ = Priority::HIGH; 5474d6c458bSopenharmony_ci } 5484d6c458bSopenharmony_ci} 5494d6c458bSopenharmony_ci 5504d6c458bSopenharmony_civoid Worker::StoreTaskId(uint64_t taskId) 5514d6c458bSopenharmony_ci{ 5524d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(currentTaskIdMutex_); 5534d6c458bSopenharmony_ci currentTaskId_.emplace_back(taskId); 5544d6c458bSopenharmony_ci} 5554d6c458bSopenharmony_ci 5564d6c458bSopenharmony_cibool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task) 5574d6c458bSopenharmony_ci{ 5584d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(env); 5594d6c458bSopenharmony_ci bool success = workerEngine->InitTaskPoolFunc(env, func, task); 5604d6c458bSopenharmony_ci napi_value exception; 5614d6c458bSopenharmony_ci napi_get_and_clear_last_exception(env, &exception); 5624d6c458bSopenharmony_ci if (exception != nullptr) { 5634d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception"); 5644d6c458bSopenharmony_ci task->success_ = false; 5654d6c458bSopenharmony_ci napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception); 5664d6c458bSopenharmony_ci NotifyTaskResult(env, task, errorEvent); 5674d6c458bSopenharmony_ci return false; 5684d6c458bSopenharmony_ci } 5694d6c458bSopenharmony_ci if (!success) { 5704d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: InitTaskPoolFunc fail"); 5714d6c458bSopenharmony_ci napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR, 5724d6c458bSopenharmony_ci "taskpool:: function may not be concurrent."); 5734d6c458bSopenharmony_ci task->success_ = false; 5744d6c458bSopenharmony_ci NotifyTaskResult(env, task, err); 5754d6c458bSopenharmony_ci return false; 5764d6c458bSopenharmony_ci } 5774d6c458bSopenharmony_ci return true; 5784d6c458bSopenharmony_ci} 5794d6c458bSopenharmony_ci 5804d6c458bSopenharmony_civoid Worker::UpdateExecutedInfo() 5814d6c458bSopenharmony_ci{ 5824d6c458bSopenharmony_ci // if the worker is blocked, just skip 5834d6c458bSopenharmony_ci if (LIKELY(state_ != WorkerState::BLOCKED)) { 5844d6c458bSopenharmony_ci uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_; 5854d6c458bSopenharmony_ci TaskManager::GetInstance().UpdateExecutedInfo(duration); 5864d6c458bSopenharmony_ci } 5874d6c458bSopenharmony_ci} 5884d6c458bSopenharmony_ci 5894d6c458bSopenharmony_ci// Only when the worker has no longTask can it be released. 5904d6c458bSopenharmony_civoid Worker::TerminateTask(uint64_t taskId) 5914d6c458bSopenharmony_ci{ 5924d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str()); 5934d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(longMutex_); 5944d6c458bSopenharmony_ci longTasksSet_.erase(taskId); 5954d6c458bSopenharmony_ci if (longTasksSet_.empty()) { 5964d6c458bSopenharmony_ci hasLongTask_ = false; 5974d6c458bSopenharmony_ci } 5984d6c458bSopenharmony_ci} 5994d6c458bSopenharmony_ci 6004d6c458bSopenharmony_ci// to store longTasks' state 6014d6c458bSopenharmony_civoid Worker::UpdateLongTaskInfo(Task* task) 6024d6c458bSopenharmony_ci{ 6034d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str()); 6044d6c458bSopenharmony_ci TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this); 6054d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(longMutex_); 6064d6c458bSopenharmony_ci hasLongTask_ = true; 6074d6c458bSopenharmony_ci isExecutingLongTask_ = true; 6084d6c458bSopenharmony_ci longTasksSet_.emplace(task->taskId_); 6094d6c458bSopenharmony_ci} 6104d6c458bSopenharmony_ci 6114d6c458bSopenharmony_cibool Worker::IsExecutingLongTask() 6124d6c458bSopenharmony_ci{ 6134d6c458bSopenharmony_ci return isExecutingLongTask_; 6144d6c458bSopenharmony_ci} 6154d6c458bSopenharmony_ci 6164d6c458bSopenharmony_cibool Worker::HasLongTask() 6174d6c458bSopenharmony_ci{ 6184d6c458bSopenharmony_ci return hasLongTask_; 6194d6c458bSopenharmony_ci} 6204d6c458bSopenharmony_ci 6214d6c458bSopenharmony_civoid Worker::HandleFunctionException(napi_env env, Task* task) 6224d6c458bSopenharmony_ci{ 6234d6c458bSopenharmony_ci napi_value exception; 6244d6c458bSopenharmony_ci napi_get_and_clear_last_exception(env, &exception); 6254d6c458bSopenharmony_ci if (exception != nullptr) { 6264d6c458bSopenharmony_ci HILOG_ERROR("taskpool::PerformTask occur exception"); 6274d6c458bSopenharmony_ci task->DecreaseRefCount(); 6284d6c458bSopenharmony_ci task->success_ = false; 6294d6c458bSopenharmony_ci napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception); 6304d6c458bSopenharmony_ci NotifyTaskResult(env, task, errorEvent); 6314d6c458bSopenharmony_ci return; 6324d6c458bSopenharmony_ci } 6334d6c458bSopenharmony_ci NotifyHandleTaskResult(task); 6344d6c458bSopenharmony_ci} 6354d6c458bSopenharmony_ci 6364d6c458bSopenharmony_civoid Worker::PostReleaseSignal() 6374d6c458bSopenharmony_ci{ 6384d6c458bSopenharmony_ci if (UNLIKELY(clearWorkerSignal_ == nullptr || uv_is_closing( 6394d6c458bSopenharmony_ci reinterpret_cast<uv_handle_t*>(clearWorkerSignal_)))) { 6404d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed"); 6414d6c458bSopenharmony_ci return; 6424d6c458bSopenharmony_ci } 6434d6c458bSopenharmony_ci uv_async_send(clearWorkerSignal_); 6444d6c458bSopenharmony_ci} 6454d6c458bSopenharmony_ci} // namespace Commonlibrary::Concurrent::TaskPoolModule