14d6c458bSopenharmony_ci/* 24d6c458bSopenharmony_ci * Copyright (c) 2022 Huawei Device Co., Ltd. 34d6c458bSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License"); 44d6c458bSopenharmony_ci * you may not use this file except in compliance with the License. 54d6c458bSopenharmony_ci * You may obtain a copy of the License at 64d6c458bSopenharmony_ci * 74d6c458bSopenharmony_ci * http://www.apache.org/licenses/LICENSE-2.0 84d6c458bSopenharmony_ci * 94d6c458bSopenharmony_ci * Unless required by applicable law or agreed to in writing, software 104d6c458bSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS, 114d6c458bSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 124d6c458bSopenharmony_ci * See the License for the specific language governing permissions and 134d6c458bSopenharmony_ci * limitations under the License. 144d6c458bSopenharmony_ci */ 154d6c458bSopenharmony_ci 164d6c458bSopenharmony_ci#include "task_manager.h" 174d6c458bSopenharmony_ci 184d6c458bSopenharmony_ci#include <cinttypes> 194d6c458bSopenharmony_ci#include <securec.h> 204d6c458bSopenharmony_ci#include <thread> 214d6c458bSopenharmony_ci 224d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 234d6c458bSopenharmony_ci#include "bundle_info.h" 244d6c458bSopenharmony_ci#include "bundle_mgr_interface.h" 254d6c458bSopenharmony_ci#include "bundle_mgr_proxy.h" 264d6c458bSopenharmony_ci#include "c/executor_task.h" 274d6c458bSopenharmony_ci#include "ffrt_inner.h" 284d6c458bSopenharmony_ci#include "iservice_registry.h" 294d6c458bSopenharmony_ci#include "parameters.h" 304d6c458bSopenharmony_ci#include "status_receiver_interface.h" 314d6c458bSopenharmony_ci#include "system_ability_definition.h" 324d6c458bSopenharmony_ci#endif 334d6c458bSopenharmony_ci#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h" 344d6c458bSopenharmony_ci#include "helper/concurrent_helper.h" 354d6c458bSopenharmony_ci#include "helper/error_helper.h" 364d6c458bSopenharmony_ci#include "helper/hitrace_helper.h" 374d6c458bSopenharmony_ci#include "taskpool.h" 384d6c458bSopenharmony_ci#include "tools/log.h" 394d6c458bSopenharmony_ci#include "worker.h" 404d6c458bSopenharmony_ci 414d6c458bSopenharmony_cinamespace Commonlibrary::Concurrent::TaskPoolModule { 424d6c458bSopenharmony_ciusing namespace OHOS::JsSysModule; 434d6c458bSopenharmony_ci 444d6c458bSopenharmony_cistatic constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5; 454d6c458bSopenharmony_cistatic constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5; 464d6c458bSopenharmony_cistatic constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms 474d6c458bSopenharmony_cistatic constexpr uint32_t STEP_SIZE = 2; 484d6c458bSopenharmony_cistatic constexpr uint32_t DEFAULT_THREADS = 3; 494d6c458bSopenharmony_cistatic constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle 504d6c458bSopenharmony_cistatic constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min 514d6c458bSopenharmony_cistatic constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min 524d6c458bSopenharmony_cistatic constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s 534d6c458bSopenharmony_cistatic constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s 544d6c458bSopenharmony_cistatic constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time 554d6c458bSopenharmony_ci[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread 564d6c458bSopenharmony_ci 574d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 584d6c458bSopenharmony_cistatic const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = { 594d6c458bSopenharmony_ci {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE}, 604d6c458bSopenharmony_ci {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW}, 614d6c458bSopenharmony_ci {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH}, 624d6c458bSopenharmony_ci {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE}, 634d6c458bSopenharmony_ci}; 644d6c458bSopenharmony_ci#endif 654d6c458bSopenharmony_ci 664d6c458bSopenharmony_ci// ----------------------------------- TaskManager ---------------------------------------- 674d6c458bSopenharmony_ciTaskManager& TaskManager::GetInstance() 684d6c458bSopenharmony_ci{ 694d6c458bSopenharmony_ci static TaskManager manager; 704d6c458bSopenharmony_ci return manager; 714d6c458bSopenharmony_ci} 724d6c458bSopenharmony_ci 734d6c458bSopenharmony_ciTaskManager::TaskManager() 744d6c458bSopenharmony_ci{ 754d6c458bSopenharmony_ci for (size_t i = 0; i < taskQueues_.size(); i++) { 764d6c458bSopenharmony_ci std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>(); 774d6c458bSopenharmony_ci taskQueues_[i] = std::move(taskQueue); 784d6c458bSopenharmony_ci } 794d6c458bSopenharmony_ci} 804d6c458bSopenharmony_ci 814d6c458bSopenharmony_ciTaskManager::~TaskManager() 824d6c458bSopenharmony_ci{ 834d6c458bSopenharmony_ci HILOG_INFO("taskpool:: ~TaskManager"); 844d6c458bSopenharmony_ci if (timer_ == nullptr) { 854d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: timer_ is nullptr"); 864d6c458bSopenharmony_ci } else { 874d6c458bSopenharmony_ci uv_timer_stop(timer_); 884d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(timer_); 894d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(expandHandle_); 904d6c458bSopenharmony_ci } 914d6c458bSopenharmony_ci 924d6c458bSopenharmony_ci if (loop_ != nullptr) { 934d6c458bSopenharmony_ci uv_stop(loop_); 944d6c458bSopenharmony_ci } 954d6c458bSopenharmony_ci 964d6c458bSopenharmony_ci { 974d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 984d6c458bSopenharmony_ci for (auto& worker : workers_) { 994d6c458bSopenharmony_ci delete worker; 1004d6c458bSopenharmony_ci } 1014d6c458bSopenharmony_ci workers_.clear(); 1024d6c458bSopenharmony_ci } 1034d6c458bSopenharmony_ci 1044d6c458bSopenharmony_ci { 1054d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 1064d6c458bSopenharmony_ci for (auto& [_, callbackPtr] : callbackTable_) { 1074d6c458bSopenharmony_ci if (callbackPtr == nullptr) { 1084d6c458bSopenharmony_ci continue; 1094d6c458bSopenharmony_ci } 1104d6c458bSopenharmony_ci callbackPtr.reset(); 1114d6c458bSopenharmony_ci } 1124d6c458bSopenharmony_ci callbackTable_.clear(); 1134d6c458bSopenharmony_ci } 1144d6c458bSopenharmony_ci 1154d6c458bSopenharmony_ci { 1164d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 1174d6c458bSopenharmony_ci for (auto& [_, task] : tasks_) { 1184d6c458bSopenharmony_ci delete task; 1194d6c458bSopenharmony_ci task = nullptr; 1204d6c458bSopenharmony_ci } 1214d6c458bSopenharmony_ci tasks_.clear(); 1224d6c458bSopenharmony_ci } 1234d6c458bSopenharmony_ci CountTraceForWorker(); 1244d6c458bSopenharmony_ci} 1254d6c458bSopenharmony_ci 1264d6c458bSopenharmony_civoid TaskManager::CountTraceForWorker() 1274d6c458bSopenharmony_ci{ 1284d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 1294d6c458bSopenharmony_ci int64_t threadNum = static_cast<int64_t>(workers_.size()); 1304d6c458bSopenharmony_ci int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size()); 1314d6c458bSopenharmony_ci int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size()); 1324d6c458bSopenharmony_ci HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers); 1334d6c458bSopenharmony_ci HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum); 1344d6c458bSopenharmony_ci HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers); 1354d6c458bSopenharmony_ci HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers); 1364d6c458bSopenharmony_ci} 1374d6c458bSopenharmony_ci 1384d6c458bSopenharmony_cinapi_value TaskManager::GetThreadInfos(napi_env env) 1394d6c458bSopenharmony_ci{ 1404d6c458bSopenharmony_ci napi_value threadInfos = nullptr; 1414d6c458bSopenharmony_ci napi_create_array(env, &threadInfos); 1424d6c458bSopenharmony_ci { 1434d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 1444d6c458bSopenharmony_ci int32_t i = 0; 1454d6c458bSopenharmony_ci for (auto& worker : workers_) { 1464d6c458bSopenharmony_ci if (worker->workerEnv_ == nullptr) { 1474d6c458bSopenharmony_ci continue; 1484d6c458bSopenharmony_ci } 1494d6c458bSopenharmony_ci napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_)); 1504d6c458bSopenharmony_ci napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_)); 1514d6c458bSopenharmony_ci 1524d6c458bSopenharmony_ci napi_value taskId = nullptr; 1534d6c458bSopenharmony_ci napi_create_array(env, &taskId); 1544d6c458bSopenharmony_ci int32_t j = 0; 1554d6c458bSopenharmony_ci { 1564d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_); 1574d6c458bSopenharmony_ci for (auto& currentId : worker->currentTaskId_) { 1584d6c458bSopenharmony_ci napi_value id = NapiHelper::CreateUint32(env, currentId); 1594d6c458bSopenharmony_ci napi_set_element(env, taskId, j, id); 1604d6c458bSopenharmony_ci j++; 1614d6c458bSopenharmony_ci } 1624d6c458bSopenharmony_ci } 1634d6c458bSopenharmony_ci napi_value threadInfo = nullptr; 1644d6c458bSopenharmony_ci napi_create_object(env, &threadInfo); 1654d6c458bSopenharmony_ci napi_set_named_property(env, threadInfo, "tid", tid); 1664d6c458bSopenharmony_ci napi_set_named_property(env, threadInfo, "priority", priority); 1674d6c458bSopenharmony_ci napi_set_named_property(env, threadInfo, "taskIds", taskId); 1684d6c458bSopenharmony_ci napi_set_element(env, threadInfos, i, threadInfo); 1694d6c458bSopenharmony_ci i++; 1704d6c458bSopenharmony_ci } 1714d6c458bSopenharmony_ci } 1724d6c458bSopenharmony_ci return threadInfos; 1734d6c458bSopenharmony_ci} 1744d6c458bSopenharmony_ci 1754d6c458bSopenharmony_cinapi_value TaskManager::GetTaskInfos(napi_env env) 1764d6c458bSopenharmony_ci{ 1774d6c458bSopenharmony_ci napi_value taskInfos = nullptr; 1784d6c458bSopenharmony_ci napi_create_array(env, &taskInfos); 1794d6c458bSopenharmony_ci { 1804d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 1814d6c458bSopenharmony_ci int32_t i = 0; 1824d6c458bSopenharmony_ci for (const auto& [_, task] : tasks_) { 1834d6c458bSopenharmony_ci if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED || 1844d6c458bSopenharmony_ci task->taskState_ == ExecuteState::FINISHED) { 1854d6c458bSopenharmony_ci continue; 1864d6c458bSopenharmony_ci } 1874d6c458bSopenharmony_ci napi_value taskInfoValue = NapiHelper::CreateObject(env); 1884d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 1894d6c458bSopenharmony_ci napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_); 1904d6c458bSopenharmony_ci napi_value name = nullptr; 1914d6c458bSopenharmony_ci napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name); 1924d6c458bSopenharmony_ci napi_set_named_property(env, taskInfoValue, "name", name); 1934d6c458bSopenharmony_ci ExecuteState state = task->taskState_; 1944d6c458bSopenharmony_ci uint64_t duration = 0; 1954d6c458bSopenharmony_ci if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) { 1964d6c458bSopenharmony_ci duration = ConcurrentHelper::GetMilliseconds() - task->startTime_; 1974d6c458bSopenharmony_ci } 1984d6c458bSopenharmony_ci napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state)); 1994d6c458bSopenharmony_ci napi_set_named_property(env, taskInfoValue, "taskId", taskId); 2004d6c458bSopenharmony_ci napi_set_named_property(env, taskInfoValue, "state", stateValue); 2014d6c458bSopenharmony_ci napi_value durationValue = NapiHelper::CreateUint32(env, duration); 2024d6c458bSopenharmony_ci napi_set_named_property(env, taskInfoValue, "duration", durationValue); 2034d6c458bSopenharmony_ci napi_set_element(env, taskInfos, i, taskInfoValue); 2044d6c458bSopenharmony_ci i++; 2054d6c458bSopenharmony_ci } 2064d6c458bSopenharmony_ci } 2074d6c458bSopenharmony_ci return taskInfos; 2084d6c458bSopenharmony_ci} 2094d6c458bSopenharmony_ci 2104d6c458bSopenharmony_civoid TaskManager::UpdateExecutedInfo(uint64_t duration) 2114d6c458bSopenharmony_ci{ 2124d6c458bSopenharmony_ci totalExecTime_ += duration; 2134d6c458bSopenharmony_ci totalExecCount_++; 2144d6c458bSopenharmony_ci} 2154d6c458bSopenharmony_ci 2164d6c458bSopenharmony_ciuint32_t TaskManager::ComputeSuitableThreadNum() 2174d6c458bSopenharmony_ci{ 2184d6c458bSopenharmony_ci uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers(); 2194d6c458bSopenharmony_ci return targetNum; 2204d6c458bSopenharmony_ci} 2214d6c458bSopenharmony_ci 2224d6c458bSopenharmony_ciuint32_t TaskManager::ComputeSuitableIdleNum() 2234d6c458bSopenharmony_ci{ 2244d6c458bSopenharmony_ci uint32_t targetNum = 0; 2254d6c458bSopenharmony_ci if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) { 2264d6c458bSopenharmony_ci // this branch is used for avoiding time-consuming tasks that may block the taskpool 2274d6c458bSopenharmony_ci targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum()); 2284d6c458bSopenharmony_ci } else if (totalExecCount_ != 0) { 2294d6c458bSopenharmony_ci auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_; 2304d6c458bSopenharmony_ci uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION); 2314d6c458bSopenharmony_ci targetNum = std::min(result, GetNonIdleTaskNum()); 2324d6c458bSopenharmony_ci } 2334d6c458bSopenharmony_ci return targetNum; 2344d6c458bSopenharmony_ci} 2354d6c458bSopenharmony_ci 2364d6c458bSopenharmony_civoid TaskManager::CheckForBlockedWorkers() 2374d6c458bSopenharmony_ci{ 2384d6c458bSopenharmony_ci // the threshold will be dynamically modified to provide more flexibility in detecting exceptions 2394d6c458bSopenharmony_ci // if the thread num has reached the limit and the idle worker is not available, a short time will be used, 2404d6c458bSopenharmony_ci // else we will choose the longer one 2414d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 2424d6c458bSopenharmony_ci bool needChecking = false; 2434d6c458bSopenharmony_ci bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0); 2444d6c458bSopenharmony_ci uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME; 2454d6c458bSopenharmony_ci for (auto iter = workers_.begin(); iter != workers_.end(); iter++) { 2464d6c458bSopenharmony_ci auto worker = *iter; 2474d6c458bSopenharmony_ci // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout 2484d6c458bSopenharmony_ci // if the worker is executing the longTask, we will not do the check 2494d6c458bSopenharmony_ci if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) || 2504d6c458bSopenharmony_ci (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) || 2514d6c458bSopenharmony_ci !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) { 2524d6c458bSopenharmony_ci continue; 2534d6c458bSopenharmony_ci } 2544d6c458bSopenharmony_ci // When executing the promise task, the worker state may not be updated and will be 2554d6c458bSopenharmony_ci // marked as 'BLOCKED', so we should exclude this situation. 2564d6c458bSopenharmony_ci // Besides, if the worker is not executing sync tasks or micro tasks, it may handle 2574d6c458bSopenharmony_ci // the task like I/O in uv threads, we should also exclude this situation. 2584d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_); 2594d6c458bSopenharmony_ci if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) { 2604d6c458bSopenharmony_ci if (!workerEngine->HasWaitingRequest()) { 2614d6c458bSopenharmony_ci worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE); 2624d6c458bSopenharmony_ci } else { 2634d6c458bSopenharmony_ci worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING); 2644d6c458bSopenharmony_ci worker->startTime_ = ConcurrentHelper::GetMilliseconds(); 2654d6c458bSopenharmony_ci } 2664d6c458bSopenharmony_ci continue; 2674d6c458bSopenharmony_ci } 2684d6c458bSopenharmony_ci 2694d6c458bSopenharmony_ci HILOG_INFO("taskpool:: The worker has been marked as timeout."); 2704d6c458bSopenharmony_ci // If the current worker has a longTask and is not executing, we will only interrupt it. 2714d6c458bSopenharmony_ci if (worker->HasLongTask()) { 2724d6c458bSopenharmony_ci continue; 2734d6c458bSopenharmony_ci } 2744d6c458bSopenharmony_ci needChecking = true; 2754d6c458bSopenharmony_ci idleWorkers_.erase(worker); 2764d6c458bSopenharmony_ci timeoutWorkers_.insert(worker); 2774d6c458bSopenharmony_ci } 2784d6c458bSopenharmony_ci // should trigger the check when we have marked and removed workers 2794d6c458bSopenharmony_ci if (UNLIKELY(needChecking)) { 2804d6c458bSopenharmony_ci TryExpand(); 2814d6c458bSopenharmony_ci } 2824d6c458bSopenharmony_ci} 2834d6c458bSopenharmony_ci 2844d6c458bSopenharmony_civoid TaskManager::TryTriggerExpand() 2854d6c458bSopenharmony_ci{ 2864d6c458bSopenharmony_ci // post the signal to notify the monitor thread to expand 2874d6c458bSopenharmony_ci if (UNLIKELY(!isHandleInited_)) { 2884d6c458bSopenharmony_ci NotifyExecuteTask(); 2894d6c458bSopenharmony_ci needChecking_ = true; 2904d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr"); 2914d6c458bSopenharmony_ci return; 2924d6c458bSopenharmony_ci } 2934d6c458bSopenharmony_ci uv_async_send(expandHandle_); 2944d6c458bSopenharmony_ci} 2954d6c458bSopenharmony_ci 2964d6c458bSopenharmony_ci#if defined(OHOS_PLATFORM) 2974d6c458bSopenharmony_ci// read /proc/[pid]/task/[tid]/stat to get the number of idle threads. 2984d6c458bSopenharmony_cibool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size) 2994d6c458bSopenharmony_ci{ 3004d6c458bSopenharmony_ci char path[128]; // 128: buffer for path 3014d6c458bSopenharmony_ci pid_t pid = getpid(); 3024d6c458bSopenharmony_ci ssize_t bytesLen = -1; 3034d6c458bSopenharmony_ci int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid); 3044d6c458bSopenharmony_ci if (ret < 0) { 3054d6c458bSopenharmony_ci HILOG_ERROR("snprintf_s failed"); 3064d6c458bSopenharmony_ci return false; 3074d6c458bSopenharmony_ci } 3084d6c458bSopenharmony_ci int fd = open(path, O_RDONLY | O_NONBLOCK); 3094d6c458bSopenharmony_ci if (UNLIKELY(fd == -1)) { 3104d6c458bSopenharmony_ci return false; 3114d6c458bSopenharmony_ci } 3124d6c458bSopenharmony_ci bytesLen = read(fd, buf, size - 1); 3134d6c458bSopenharmony_ci close(fd); 3144d6c458bSopenharmony_ci if (bytesLen <= 0) { 3154d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: failed to read %{public}s", path); 3164d6c458bSopenharmony_ci return false; 3174d6c458bSopenharmony_ci } 3184d6c458bSopenharmony_ci buf[bytesLen] = '\0'; 3194d6c458bSopenharmony_ci return true; 3204d6c458bSopenharmony_ci} 3214d6c458bSopenharmony_ci 3224d6c458bSopenharmony_ciuint32_t TaskManager::GetIdleWorkers() 3234d6c458bSopenharmony_ci{ 3244d6c458bSopenharmony_ci char buf[4096]; // 4096: buffer for thread info 3254d6c458bSopenharmony_ci uint32_t idleCount = 0; 3264d6c458bSopenharmony_ci std::unordered_set<pid_t> tids {}; 3274d6c458bSopenharmony_ci { 3284d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 3294d6c458bSopenharmony_ci for (auto& worker : idleWorkers_) { 3304d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 3314d6c458bSopenharmony_ci if (worker->ffrtTaskHandle_ != nullptr) { 3324d6c458bSopenharmony_ci if (worker->GetWaitTime() > 0) { 3334d6c458bSopenharmony_ci idleCount++; 3344d6c458bSopenharmony_ci } 3354d6c458bSopenharmony_ci continue; 3364d6c458bSopenharmony_ci } 3374d6c458bSopenharmony_ci#endif 3384d6c458bSopenharmony_ci tids.emplace(worker->tid_); 3394d6c458bSopenharmony_ci } 3404d6c458bSopenharmony_ci } 3414d6c458bSopenharmony_ci // The ffrt thread does not read thread info 3424d6c458bSopenharmony_ci for (auto tid : tids) { 3434d6c458bSopenharmony_ci if (!ReadThreadInfo(tid, buf, sizeof(buf))) { 3444d6c458bSopenharmony_ci continue; 3454d6c458bSopenharmony_ci } 3464d6c458bSopenharmony_ci char state; 3474d6c458bSopenharmony_ci if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state 3484d6c458bSopenharmony_ci HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state); 3494d6c458bSopenharmony_ci return 0; 3504d6c458bSopenharmony_ci } 3514d6c458bSopenharmony_ci if (state == 'S') { 3524d6c458bSopenharmony_ci idleCount++; 3534d6c458bSopenharmony_ci } 3544d6c458bSopenharmony_ci } 3554d6c458bSopenharmony_ci return idleCount; 3564d6c458bSopenharmony_ci} 3574d6c458bSopenharmony_ci 3584d6c458bSopenharmony_civoid TaskManager::GetIdleWorkersList(uint32_t step) 3594d6c458bSopenharmony_ci{ 3604d6c458bSopenharmony_ci char buf[4096]; // 4096: buffer for thread info 3614d6c458bSopenharmony_ci for (auto& worker : idleWorkers_) { 3624d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 3634d6c458bSopenharmony_ci if (worker->ffrtTaskHandle_ != nullptr) { 3644d6c458bSopenharmony_ci uint64_t workerWaitTime = worker->GetWaitTime(); 3654d6c458bSopenharmony_ci bool isWorkerLoopActive = worker->IsLoopActive(); 3664d6c458bSopenharmony_ci if (workerWaitTime == 0) { 3674d6c458bSopenharmony_ci continue; 3684d6c458bSopenharmony_ci } 3694d6c458bSopenharmony_ci uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>( 3704d6c458bSopenharmony_ci std::chrono::steady_clock::now().time_since_epoch()).count()); 3714d6c458bSopenharmony_ci if (!isWorkerLoopActive) { 3724d6c458bSopenharmony_ci freeList_.emplace_back(worker); 3734d6c458bSopenharmony_ci } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) { 3744d6c458bSopenharmony_ci freeList_.emplace_back(worker); 3754d6c458bSopenharmony_ci HILOG_INFO("taskpool:: worker in ffrt epoll wait more than 2 intervals, force to free."); 3764d6c458bSopenharmony_ci } else { 3774d6c458bSopenharmony_ci // worker uv alive, and will be free in 2 intervals if not wake 3784d6c458bSopenharmony_ci HILOG_INFO("taskpool:: worker will be free if not wake."); 3794d6c458bSopenharmony_ci } 3804d6c458bSopenharmony_ci continue; 3814d6c458bSopenharmony_ci } 3824d6c458bSopenharmony_ci#endif 3834d6c458bSopenharmony_ci if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) { 3844d6c458bSopenharmony_ci continue; 3854d6c458bSopenharmony_ci } 3864d6c458bSopenharmony_ci char state; 3874d6c458bSopenharmony_ci uint64_t utime; 3884d6c458bSopenharmony_ci if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu", 3894d6c458bSopenharmony_ci &state, sizeof(state), &utime) != 2) { // 2: state and utime 3904d6c458bSopenharmony_ci HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_); 3914d6c458bSopenharmony_ci return; 3924d6c458bSopenharmony_ci } 3934d6c458bSopenharmony_ci if (state != 'S' || utime != worker->lastCpuTime_) { 3944d6c458bSopenharmony_ci worker->idleCount_ = 0; 3954d6c458bSopenharmony_ci worker->lastCpuTime_ = utime; 3964d6c458bSopenharmony_ci continue; 3974d6c458bSopenharmony_ci } 3984d6c458bSopenharmony_ci if (++worker->idleCount_ >= IDLE_THRESHOLD) { 3994d6c458bSopenharmony_ci freeList_.emplace_back(worker); 4004d6c458bSopenharmony_ci } 4014d6c458bSopenharmony_ci } 4024d6c458bSopenharmony_ci} 4034d6c458bSopenharmony_ci 4044d6c458bSopenharmony_civoid TaskManager::TriggerShrink(uint32_t step) 4054d6c458bSopenharmony_ci{ 4064d6c458bSopenharmony_ci GetIdleWorkersList(step); 4074d6c458bSopenharmony_ci step = std::min(step, static_cast<uint32_t>(freeList_.size())); 4084d6c458bSopenharmony_ci uint32_t count = 0; 4094d6c458bSopenharmony_ci for (size_t i = 0; i < freeList_.size(); i++) { 4104d6c458bSopenharmony_ci auto worker = freeList_[i]; 4114d6c458bSopenharmony_ci if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) { 4124d6c458bSopenharmony_ci continue; 4134d6c458bSopenharmony_ci } 4144d6c458bSopenharmony_ci auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_; 4154d6c458bSopenharmony_ci if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) { 4164d6c458bSopenharmony_ci continue; 4174d6c458bSopenharmony_ci } 4184d6c458bSopenharmony_ci idleWorkers_.erase(worker); 4194d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_); 4204d6c458bSopenharmony_ci worker->PostReleaseSignal(); 4214d6c458bSopenharmony_ci if (++count == step) { 4224d6c458bSopenharmony_ci break; 4234d6c458bSopenharmony_ci } 4244d6c458bSopenharmony_ci } 4254d6c458bSopenharmony_ci freeList_.clear(); 4264d6c458bSopenharmony_ci} 4274d6c458bSopenharmony_ci#else 4284d6c458bSopenharmony_ciuint32_t TaskManager::GetIdleWorkers() 4294d6c458bSopenharmony_ci{ 4304d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 4314d6c458bSopenharmony_ci return idleWorkers_.size(); 4324d6c458bSopenharmony_ci} 4334d6c458bSopenharmony_ci 4344d6c458bSopenharmony_civoid TaskManager::TriggerShrink(uint32_t step) 4354d6c458bSopenharmony_ci{ 4364d6c458bSopenharmony_ci for (uint32_t i = 0; i < step; i++) { 4374d6c458bSopenharmony_ci // try to free the worker that idle time meets the requirement 4384d6c458bSopenharmony_ci auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) { 4394d6c458bSopenharmony_ci auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_; 4404d6c458bSopenharmony_ci return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask(); 4414d6c458bSopenharmony_ci }); 4424d6c458bSopenharmony_ci // remove it from all sets 4434d6c458bSopenharmony_ci if (iter != idleWorkers_.end()) { 4444d6c458bSopenharmony_ci auto worker = *iter; 4454d6c458bSopenharmony_ci idleWorkers_.erase(worker); 4464d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_); 4474d6c458bSopenharmony_ci worker->PostReleaseSignal(); 4484d6c458bSopenharmony_ci } 4494d6c458bSopenharmony_ci } 4504d6c458bSopenharmony_ci} 4514d6c458bSopenharmony_ci#endif 4524d6c458bSopenharmony_ci 4534d6c458bSopenharmony_civoid TaskManager::NotifyShrink(uint32_t targetNum) 4544d6c458bSopenharmony_ci{ 4554d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 4564d6c458bSopenharmony_ci uint32_t workerCount = workers_.size(); 4574d6c458bSopenharmony_ci uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS; 4584d6c458bSopenharmony_ci if (minThread == 0) { 4594d6c458bSopenharmony_ci HILOG_INFO("taskpool:: the system now is under low memory"); 4604d6c458bSopenharmony_ci } 4614d6c458bSopenharmony_ci if (workerCount > minThread && workerCount > targetNum) { 4624d6c458bSopenharmony_ci targetNum = std::max(minThread, targetNum); 4634d6c458bSopenharmony_ci uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP); 4644d6c458bSopenharmony_ci TriggerShrink(step); 4654d6c458bSopenharmony_ci } 4664d6c458bSopenharmony_ci // remove all timeout workers 4674d6c458bSopenharmony_ci for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) { 4684d6c458bSopenharmony_ci auto worker = *iter; 4694d6c458bSopenharmony_ci if (workers_.find(worker) == workers_.end()) { 4704d6c458bSopenharmony_ci HILOG_WARN("taskpool:: current worker maybe release"); 4714d6c458bSopenharmony_ci iter = timeoutWorkers_.erase(iter); 4724d6c458bSopenharmony_ci } else if (!worker->HasRunningTasks()) { 4734d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_); 4744d6c458bSopenharmony_ci worker->PostReleaseSignal(); 4754d6c458bSopenharmony_ci timeoutWorkers_.erase(iter++); 4764d6c458bSopenharmony_ci return; 4774d6c458bSopenharmony_ci } else { 4784d6c458bSopenharmony_ci iter++; 4794d6c458bSopenharmony_ci } 4804d6c458bSopenharmony_ci } 4814d6c458bSopenharmony_ci uint32_t idleNum = idleWorkers_.size(); 4824d6c458bSopenharmony_ci // System memory state is moderate and the worker has exeuted tasks, we will try to release it 4834d6c458bSopenharmony_ci if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) { 4844d6c458bSopenharmony_ci auto worker = *(idleWorkers_.begin()); 4854d6c458bSopenharmony_ci // worker that has longTask should not be released 4864d6c458bSopenharmony_ci if (worker == nullptr || worker->HasLongTask()) { 4874d6c458bSopenharmony_ci return; 4884d6c458bSopenharmony_ci } 4894d6c458bSopenharmony_ci if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released 4904d6c458bSopenharmony_ci TriggerShrink(DEFAULT_MIN_THREADS); 4914d6c458bSopenharmony_ci return; 4924d6c458bSopenharmony_ci } 4934d6c458bSopenharmony_ci } 4944d6c458bSopenharmony_ci 4954d6c458bSopenharmony_ci // Create a worker for performance 4964d6c458bSopenharmony_ci if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) { 4974d6c458bSopenharmony_ci CreateWorkers(hostEnv_); 4984d6c458bSopenharmony_ci } 4994d6c458bSopenharmony_ci // stop the timer 5004d6c458bSopenharmony_ci if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) { 5014d6c458bSopenharmony_ci suspend_ = true; 5024d6c458bSopenharmony_ci uv_timer_stop(timer_); 5034d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: timer will be suspended"); 5044d6c458bSopenharmony_ci } 5054d6c458bSopenharmony_ci} 5064d6c458bSopenharmony_ci 5074d6c458bSopenharmony_civoid TaskManager::TriggerLoadBalance(const uv_timer_t* req) 5084d6c458bSopenharmony_ci{ 5094d6c458bSopenharmony_ci TaskManager& taskManager = TaskManager::GetInstance(); 5104d6c458bSopenharmony_ci taskManager.CheckForBlockedWorkers(); 5114d6c458bSopenharmony_ci uint32_t targetNum = taskManager.ComputeSuitableThreadNum(); 5124d6c458bSopenharmony_ci taskManager.NotifyShrink(targetNum); 5134d6c458bSopenharmony_ci taskManager.CountTraceForWorker(); 5144d6c458bSopenharmony_ci} 5154d6c458bSopenharmony_ci 5164d6c458bSopenharmony_civoid TaskManager::TryExpand() 5174d6c458bSopenharmony_ci{ 5184d6c458bSopenharmony_ci // dispatch task in the TaskPoolManager thread 5194d6c458bSopenharmony_ci NotifyExecuteTask(); 5204d6c458bSopenharmony_ci // do not trigger when there are more idleWorkers than tasks 5214d6c458bSopenharmony_ci uint32_t idleNum = GetIdleWorkers(); 5224d6c458bSopenharmony_ci if (idleNum > GetNonIdleTaskNum()) { 5234d6c458bSopenharmony_ci return; 5244d6c458bSopenharmony_ci } 5254d6c458bSopenharmony_ci needChecking_ = false; // do not need to check 5264d6c458bSopenharmony_ci uint32_t targetNum = ComputeSuitableIdleNum(); 5274d6c458bSopenharmony_ci uint32_t workerCount = 0; 5284d6c458bSopenharmony_ci uint32_t idleCount = 0; 5294d6c458bSopenharmony_ci uint32_t timeoutWorkers = 0; 5304d6c458bSopenharmony_ci { 5314d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 5324d6c458bSopenharmony_ci idleCount = idleWorkers_.size(); 5334d6c458bSopenharmony_ci workerCount = workers_.size(); 5344d6c458bSopenharmony_ci timeoutWorkers = timeoutWorkers_.size(); 5354d6c458bSopenharmony_ci } 5364d6c458bSopenharmony_ci uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS); 5374d6c458bSopenharmony_ci maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads 5384d6c458bSopenharmony_ci if (workerCount < maxThreads && idleCount < targetNum) { 5394d6c458bSopenharmony_ci uint32_t step = std::min(maxThreads, targetNum) - idleCount; 5404d6c458bSopenharmony_ci // Prevent the total number of expanded threads from exceeding maxThreads 5414d6c458bSopenharmony_ci if (step + workerCount > maxThreads) { 5424d6c458bSopenharmony_ci step = maxThreads - workerCount; 5434d6c458bSopenharmony_ci } 5444d6c458bSopenharmony_ci CreateWorkers(hostEnv_, step); 5454d6c458bSopenharmony_ci HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u", 5464d6c458bSopenharmony_ci maxThreads, step, GetThreadNum()); 5474d6c458bSopenharmony_ci } 5484d6c458bSopenharmony_ci if (UNLIKELY(suspend_)) { 5494d6c458bSopenharmony_ci suspend_ = false; 5504d6c458bSopenharmony_ci uv_timer_again(timer_); 5514d6c458bSopenharmony_ci } 5524d6c458bSopenharmony_ci} 5534d6c458bSopenharmony_ci 5544d6c458bSopenharmony_civoid TaskManager::NotifyExpand(const uv_async_t* req) 5554d6c458bSopenharmony_ci{ 5564d6c458bSopenharmony_ci TaskManager& taskManager = TaskManager::GetInstance(); 5574d6c458bSopenharmony_ci taskManager.TryExpand(); 5584d6c458bSopenharmony_ci} 5594d6c458bSopenharmony_ci 5604d6c458bSopenharmony_civoid TaskManager::RunTaskManager() 5614d6c458bSopenharmony_ci{ 5624d6c458bSopenharmony_ci loop_ = uv_loop_new(); 5634d6c458bSopenharmony_ci if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE 5644d6c458bSopenharmony_ci HILOG_FATAL("taskpool:: new loop failed."); 5654d6c458bSopenharmony_ci return; 5664d6c458bSopenharmony_ci } 5674d6c458bSopenharmony_ci ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand); 5684d6c458bSopenharmony_ci timer_ = new uv_timer_t; 5694d6c458bSopenharmony_ci uv_timer_init(loop_, timer_); 5704d6c458bSopenharmony_ci uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL); 5714d6c458bSopenharmony_ci isHandleInited_ = true; 5724d6c458bSopenharmony_ci#if defined IOS_PLATFORM || defined MAC_PLATFORM 5734d6c458bSopenharmony_ci pthread_setname_np("OS_TaskManager"); 5744d6c458bSopenharmony_ci#else 5754d6c458bSopenharmony_ci pthread_setname_np(pthread_self(), "OS_TaskManager"); 5764d6c458bSopenharmony_ci#endif 5774d6c458bSopenharmony_ci if (UNLIKELY(needChecking_)) { 5784d6c458bSopenharmony_ci needChecking_ = false; 5794d6c458bSopenharmony_ci uv_async_send(expandHandle_); 5804d6c458bSopenharmony_ci } 5814d6c458bSopenharmony_ci uv_run(loop_, UV_RUN_DEFAULT); 5824d6c458bSopenharmony_ci if (loop_ != nullptr) { 5834d6c458bSopenharmony_ci uv_loop_delete(loop_); 5844d6c458bSopenharmony_ci } 5854d6c458bSopenharmony_ci} 5864d6c458bSopenharmony_ci 5874d6c458bSopenharmony_civoid TaskManager::CancelTask(napi_env env, uint64_t taskId) 5884d6c458bSopenharmony_ci{ 5894d6c458bSopenharmony_ci // 1. Cannot find taskInfo by executeId, throw error 5904d6c458bSopenharmony_ci // 2. Find executing taskInfo, skip it 5914d6c458bSopenharmony_ci // 3. Find waiting taskInfo, cancel it 5924d6c458bSopenharmony_ci // 4. Find canceled taskInfo, skip it 5934d6c458bSopenharmony_ci std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId); 5944d6c458bSopenharmony_ci HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 5954d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME(strTrace); 5964d6c458bSopenharmony_ci Task* task = GetTask(taskId); 5974d6c458bSopenharmony_ci if (task == nullptr) { 5984d6c458bSopenharmony_ci std::string errMsg = "taskpool:: the task may not exist"; 5994d6c458bSopenharmony_ci HILOG_ERROR("%{public}s", errMsg.c_str()); 6004d6c458bSopenharmony_ci ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); 6014d6c458bSopenharmony_ci return; 6024d6c458bSopenharmony_ci } 6034d6c458bSopenharmony_ci if (task->taskState_ == ExecuteState::CANCELED) { 6044d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task has been canceled"); 6054d6c458bSopenharmony_ci return; 6064d6c458bSopenharmony_ci } 6074d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 6084d6c458bSopenharmony_ci if (task->IsPeriodicTask()) { 6094d6c458bSopenharmony_ci task->CancelPendingTask(env); 6104d6c458bSopenharmony_ci uv_timer_stop(task->timer_); 6114d6c458bSopenharmony_ci uv_close(reinterpret_cast<uv_handle_t*>(task->timer_), [](uv_handle_t* handle) { 6124d6c458bSopenharmony_ci delete (uv_timer_t*)handle; 6134d6c458bSopenharmony_ci handle = nullptr; 6144d6c458bSopenharmony_ci }); 6154d6c458bSopenharmony_ci return; 6164d6c458bSopenharmony_ci } else if (task->IsSeqRunnerTask()) { 6174d6c458bSopenharmony_ci CancelSeqRunnerTask(env, task); 6184d6c458bSopenharmony_ci return; 6194d6c458bSopenharmony_ci } 6204d6c458bSopenharmony_ci if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) || 6214d6c458bSopenharmony_ci task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED || 6224d6c458bSopenharmony_ci task->taskState_ == ExecuteState::ENDING) { 6234d6c458bSopenharmony_ci std::string errMsg = "taskpool:: task is not executed or has been executed"; 6244d6c458bSopenharmony_ci HILOG_ERROR("%{public}s", errMsg.c_str()); 6254d6c458bSopenharmony_ci ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); 6264d6c458bSopenharmony_ci return; 6274d6c458bSopenharmony_ci } 6284d6c458bSopenharmony_ci 6294d6c458bSopenharmony_ci task->ClearDelayedTimers(); 6304d6c458bSopenharmony_ci ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED); 6314d6c458bSopenharmony_ci task->CancelPendingTask(env); 6324d6c458bSopenharmony_ci if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) { 6334d6c458bSopenharmony_ci reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter(); 6344d6c458bSopenharmony_ci task->DecreaseTaskRefCount(); 6354d6c458bSopenharmony_ci EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority); 6364d6c458bSopenharmony_ci napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled"); 6374d6c458bSopenharmony_ci napi_reject_deferred(env, task->currentTaskInfo_->deferred, error); 6384d6c458bSopenharmony_ci napi_reference_unref(env, task->taskRef_, nullptr); 6394d6c458bSopenharmony_ci delete task->currentTaskInfo_; 6404d6c458bSopenharmony_ci task->currentTaskInfo_ = nullptr; 6414d6c458bSopenharmony_ci } 6424d6c458bSopenharmony_ci} 6434d6c458bSopenharmony_ci 6444d6c458bSopenharmony_civoid TaskManager::CancelSeqRunnerTask(napi_env env, Task *task) 6454d6c458bSopenharmony_ci{ 6464d6c458bSopenharmony_ci if (task->taskState_ == ExecuteState::FINISHED) { 6474d6c458bSopenharmony_ci std::string errMsg = "taskpool:: sequenceRunner task has been executed"; 6484d6c458bSopenharmony_ci HILOG_ERROR("%{public}s", errMsg.c_str()); 6494d6c458bSopenharmony_ci ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); 6504d6c458bSopenharmony_ci } else { 6514d6c458bSopenharmony_ci task->taskState_ = ExecuteState::CANCELED; 6524d6c458bSopenharmony_ci } 6534d6c458bSopenharmony_ci} 6544d6c458bSopenharmony_ci 6554d6c458bSopenharmony_civoid TaskManager::NotifyWorkerIdle(Worker* worker) 6564d6c458bSopenharmony_ci{ 6574d6c458bSopenharmony_ci { 6584d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 6594d6c458bSopenharmony_ci if (worker->state_ == WorkerState::BLOCKED) { 6604d6c458bSopenharmony_ci return; 6614d6c458bSopenharmony_ci } 6624d6c458bSopenharmony_ci idleWorkers_.insert(worker); 6634d6c458bSopenharmony_ci } 6644d6c458bSopenharmony_ci if (GetTaskNum() != 0) { 6654d6c458bSopenharmony_ci NotifyExecuteTask(); 6664d6c458bSopenharmony_ci } 6674d6c458bSopenharmony_ci CountTraceForWorker(); 6684d6c458bSopenharmony_ci} 6694d6c458bSopenharmony_ci 6704d6c458bSopenharmony_civoid TaskManager::NotifyWorkerCreated(Worker* worker) 6714d6c458bSopenharmony_ci{ 6724d6c458bSopenharmony_ci NotifyWorkerIdle(worker); 6734d6c458bSopenharmony_ci} 6744d6c458bSopenharmony_ci 6754d6c458bSopenharmony_civoid TaskManager::NotifyWorkerAdded(Worker* worker) 6764d6c458bSopenharmony_ci{ 6774d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 6784d6c458bSopenharmony_ci workers_.insert(worker); 6794d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size()); 6804d6c458bSopenharmony_ci} 6814d6c458bSopenharmony_ci 6824d6c458bSopenharmony_civoid TaskManager::NotifyWorkerRunning(Worker* worker) 6834d6c458bSopenharmony_ci{ 6844d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 6854d6c458bSopenharmony_ci idleWorkers_.erase(worker); 6864d6c458bSopenharmony_ci CountTraceForWorker(); 6874d6c458bSopenharmony_ci} 6884d6c458bSopenharmony_ci 6894d6c458bSopenharmony_ciuint32_t TaskManager::GetRunningWorkers() 6904d6c458bSopenharmony_ci{ 6914d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 6924d6c458bSopenharmony_ci return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) { 6934d6c458bSopenharmony_ci return worker->HasRunningTasks(); 6944d6c458bSopenharmony_ci }); 6954d6c458bSopenharmony_ci} 6964d6c458bSopenharmony_ci 6974d6c458bSopenharmony_ciuint32_t TaskManager::GetTimeoutWorkers() 6984d6c458bSopenharmony_ci{ 6994d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 7004d6c458bSopenharmony_ci return timeoutWorkers_.size(); 7014d6c458bSopenharmony_ci} 7024d6c458bSopenharmony_ci 7034d6c458bSopenharmony_ciuint32_t TaskManager::GetTaskNum() 7044d6c458bSopenharmony_ci{ 7054d6c458bSopenharmony_ci std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 7064d6c458bSopenharmony_ci uint32_t sum = 0; 7074d6c458bSopenharmony_ci for (const auto& elements : taskQueues_) { 7084d6c458bSopenharmony_ci sum += elements->GetTaskNum(); 7094d6c458bSopenharmony_ci } 7104d6c458bSopenharmony_ci return sum; 7114d6c458bSopenharmony_ci} 7124d6c458bSopenharmony_ci 7134d6c458bSopenharmony_ciuint32_t TaskManager::GetNonIdleTaskNum() 7144d6c458bSopenharmony_ci{ 7154d6c458bSopenharmony_ci return nonIdleTaskNum_; 7164d6c458bSopenharmony_ci} 7174d6c458bSopenharmony_ci 7184d6c458bSopenharmony_civoid TaskManager::IncreaseNumIfNoIdle(Priority priority) 7194d6c458bSopenharmony_ci{ 7204d6c458bSopenharmony_ci if (priority != Priority::IDLE) { 7214d6c458bSopenharmony_ci ++nonIdleTaskNum_; 7224d6c458bSopenharmony_ci } 7234d6c458bSopenharmony_ci} 7244d6c458bSopenharmony_ci 7254d6c458bSopenharmony_civoid TaskManager::DecreaseNumIfNoIdle(Priority priority) 7264d6c458bSopenharmony_ci{ 7274d6c458bSopenharmony_ci if (priority != Priority::IDLE) { 7284d6c458bSopenharmony_ci --nonIdleTaskNum_; 7294d6c458bSopenharmony_ci } 7304d6c458bSopenharmony_ci} 7314d6c458bSopenharmony_ci 7324d6c458bSopenharmony_ciuint32_t TaskManager::GetThreadNum() 7334d6c458bSopenharmony_ci{ 7344d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 7354d6c458bSopenharmony_ci return workers_.size(); 7364d6c458bSopenharmony_ci} 7374d6c458bSopenharmony_ci 7384d6c458bSopenharmony_civoid TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority) 7394d6c458bSopenharmony_ci{ 7404d6c458bSopenharmony_ci { 7414d6c458bSopenharmony_ci std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 7424d6c458bSopenharmony_ci IncreaseNumIfNoIdle(priority); 7434d6c458bSopenharmony_ci taskQueues_[priority]->EnqueueTaskId(taskId); 7444d6c458bSopenharmony_ci } 7454d6c458bSopenharmony_ci TryTriggerExpand(); 7464d6c458bSopenharmony_ci Task* task = GetTask(taskId); 7474d6c458bSopenharmony_ci if (task == nullptr) { 7484d6c458bSopenharmony_ci HILOG_FATAL("taskpool:: task is nullptr"); 7494d6c458bSopenharmony_ci return; 7504d6c458bSopenharmony_ci } 7514d6c458bSopenharmony_ci task->IncreaseTaskRefCount(); 7524d6c458bSopenharmony_ci if (task->onEnqueuedCallBackInfo_ != nullptr) { 7534d6c458bSopenharmony_ci task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_); 7544d6c458bSopenharmony_ci } 7554d6c458bSopenharmony_ci} 7564d6c458bSopenharmony_ci 7574d6c458bSopenharmony_civoid TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority) 7584d6c458bSopenharmony_ci{ 7594d6c458bSopenharmony_ci std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 7604d6c458bSopenharmony_ci if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) { 7614d6c458bSopenharmony_ci HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel"); 7624d6c458bSopenharmony_ci } 7634d6c458bSopenharmony_ci} 7644d6c458bSopenharmony_ci 7654d6c458bSopenharmony_cistd::pair<uint64_t, Priority> TaskManager::DequeueTaskId() 7664d6c458bSopenharmony_ci{ 7674d6c458bSopenharmony_ci std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 7684d6c458bSopenharmony_ci auto& highTaskQueue = taskQueues_[Priority::HIGH]; 7694d6c458bSopenharmony_ci if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) { 7704d6c458bSopenharmony_ci highPrioExecuteCount_++; 7714d6c458bSopenharmony_ci return GetTaskByPriority(highTaskQueue, Priority::HIGH); 7724d6c458bSopenharmony_ci } 7734d6c458bSopenharmony_ci highPrioExecuteCount_ = 0; 7744d6c458bSopenharmony_ci 7754d6c458bSopenharmony_ci auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM]; 7764d6c458bSopenharmony_ci if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) { 7774d6c458bSopenharmony_ci mediumPrioExecuteCount_++; 7784d6c458bSopenharmony_ci return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM); 7794d6c458bSopenharmony_ci } 7804d6c458bSopenharmony_ci mediumPrioExecuteCount_ = 0; 7814d6c458bSopenharmony_ci 7824d6c458bSopenharmony_ci auto& lowTaskQueue = taskQueues_[Priority::LOW]; 7834d6c458bSopenharmony_ci if (!lowTaskQueue->IsEmpty()) { 7844d6c458bSopenharmony_ci return GetTaskByPriority(lowTaskQueue, Priority::LOW); 7854d6c458bSopenharmony_ci } 7864d6c458bSopenharmony_ci 7874d6c458bSopenharmony_ci auto& idleTaskQueue = taskQueues_[Priority::IDLE]; 7884d6c458bSopenharmony_ci if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) { 7894d6c458bSopenharmony_ci return GetTaskByPriority(idleTaskQueue, Priority::IDLE); 7904d6c458bSopenharmony_ci } 7914d6c458bSopenharmony_ci return std::make_pair(0, Priority::LOW); 7924d6c458bSopenharmony_ci} 7934d6c458bSopenharmony_ci 7944d6c458bSopenharmony_cibool TaskManager::IsChooseIdle() 7954d6c458bSopenharmony_ci{ 7964d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 7974d6c458bSopenharmony_ci for (auto& worker : workers_) { 7984d6c458bSopenharmony_ci if (worker->state_ == WorkerState::IDLE) { 7994d6c458bSopenharmony_ci // If worker->state_ is WorkerState::IDLE, it means that the worker is free 8004d6c458bSopenharmony_ci continue; 8014d6c458bSopenharmony_ci } 8024d6c458bSopenharmony_ci // If there is a worker running a task, do not take the idle task. 8034d6c458bSopenharmony_ci return false; 8044d6c458bSopenharmony_ci } 8054d6c458bSopenharmony_ci // Only when all workers are free, will idle task be taken. 8064d6c458bSopenharmony_ci return true; 8074d6c458bSopenharmony_ci} 8084d6c458bSopenharmony_ci 8094d6c458bSopenharmony_cistd::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, 8104d6c458bSopenharmony_ci Priority priority) 8114d6c458bSopenharmony_ci{ 8124d6c458bSopenharmony_ci uint64_t taskId = taskQueue->DequeueTaskId(); 8134d6c458bSopenharmony_ci if (IsDependendByTaskId(taskId)) { 8144d6c458bSopenharmony_ci EnqueuePendingTaskInfo(taskId, priority); 8154d6c458bSopenharmony_ci return std::make_pair(0, priority); 8164d6c458bSopenharmony_ci } 8174d6c458bSopenharmony_ci DecreaseNumIfNoIdle(priority); 8184d6c458bSopenharmony_ci return std::make_pair(taskId, priority); 8194d6c458bSopenharmony_ci} 8204d6c458bSopenharmony_ci 8214d6c458bSopenharmony_civoid TaskManager::NotifyExecuteTask() 8224d6c458bSopenharmony_ci{ 8234d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 8244d6c458bSopenharmony_ci if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) { 8254d6c458bSopenharmony_ci // When there are only idle tasks and workers executing them, it is not triggered 8264d6c458bSopenharmony_ci return; 8274d6c458bSopenharmony_ci } 8284d6c458bSopenharmony_ci for (auto& worker : idleWorkers_) { 8294d6c458bSopenharmony_ci worker->NotifyExecuteTask(); 8304d6c458bSopenharmony_ci } 8314d6c458bSopenharmony_ci} 8324d6c458bSopenharmony_ci 8334d6c458bSopenharmony_civoid TaskManager::InitTaskManager(napi_env env) 8344d6c458bSopenharmony_ci{ 8354d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME("InitTaskManager"); 8364d6c458bSopenharmony_ci if (!isInitialized_.exchange(true, std::memory_order_relaxed)) { 8374d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 8384d6c458bSopenharmony_ci globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0); 8394d6c458bSopenharmony_ci if (!globalEnableFfrtFlag_) { 8404d6c458bSopenharmony_ci UpdateSystemAppFlag(); 8414d6c458bSopenharmony_ci if (IsSystemApp()) { 8424d6c458bSopenharmony_ci disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0); 8434d6c458bSopenharmony_ci } 8444d6c458bSopenharmony_ci } 8454d6c458bSopenharmony_ci if (EnableFfrt()) { 8464d6c458bSopenharmony_ci HILOG_INFO("taskpool:: apps use ffrt"); 8474d6c458bSopenharmony_ci } else { 8484d6c458bSopenharmony_ci HILOG_INFO("taskpool:: apps do not use ffrt"); 8494d6c458bSopenharmony_ci } 8504d6c458bSopenharmony_ci#endif 8514d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 8524d6c458bSopenharmony_ci mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>( 8534d6c458bSopenharmony_ci OHOS::AppExecFwk::EventRunner::GetMainEventRunner()); 8544d6c458bSopenharmony_ci#endif 8554d6c458bSopenharmony_ci auto mainThreadEngine = NativeEngine::GetMainThreadEngine(); 8564d6c458bSopenharmony_ci if (mainThreadEngine == nullptr) { 8574d6c458bSopenharmony_ci HILOG_FATAL("taskpool:: mainThreadEngine is nullptr"); 8584d6c458bSopenharmony_ci return; 8594d6c458bSopenharmony_ci } 8604d6c458bSopenharmony_ci hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine); 8614d6c458bSopenharmony_ci // Add a reserved thread for taskpool 8624d6c458bSopenharmony_ci CreateWorkers(hostEnv_); 8634d6c458bSopenharmony_ci // Create a timer to manage worker threads 8644d6c458bSopenharmony_ci std::thread workerManager([this] {this->RunTaskManager();}); 8654d6c458bSopenharmony_ci workerManager.detach(); 8664d6c458bSopenharmony_ci } 8674d6c458bSopenharmony_ci} 8684d6c458bSopenharmony_ci 8694d6c458bSopenharmony_civoid TaskManager::CreateWorkers(napi_env env, uint32_t num) 8704d6c458bSopenharmony_ci{ 8714d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num); 8724d6c458bSopenharmony_ci for (uint32_t i = 0; i < num; i++) { 8734d6c458bSopenharmony_ci auto worker = Worker::WorkerConstructor(env); 8744d6c458bSopenharmony_ci NotifyWorkerAdded(worker); 8754d6c458bSopenharmony_ci } 8764d6c458bSopenharmony_ci CountTraceForWorker(); 8774d6c458bSopenharmony_ci} 8784d6c458bSopenharmony_ci 8794d6c458bSopenharmony_civoid TaskManager::RemoveWorker(Worker* worker) 8804d6c458bSopenharmony_ci{ 8814d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 8824d6c458bSopenharmony_ci idleWorkers_.erase(worker); 8834d6c458bSopenharmony_ci timeoutWorkers_.erase(worker); 8844d6c458bSopenharmony_ci workers_.erase(worker); 8854d6c458bSopenharmony_ci} 8864d6c458bSopenharmony_ci 8874d6c458bSopenharmony_civoid TaskManager::RestoreWorker(Worker* worker) 8884d6c458bSopenharmony_ci{ 8894d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 8904d6c458bSopenharmony_ci if (UNLIKELY(suspend_)) { 8914d6c458bSopenharmony_ci suspend_ = false; 8924d6c458bSopenharmony_ci uv_timer_again(timer_); 8934d6c458bSopenharmony_ci } 8944d6c458bSopenharmony_ci if (worker->state_ == WorkerState::BLOCKED) { 8954d6c458bSopenharmony_ci // since the worker is blocked, we should add it to the timeout set 8964d6c458bSopenharmony_ci timeoutWorkers_.insert(worker); 8974d6c458bSopenharmony_ci return; 8984d6c458bSopenharmony_ci } 8994d6c458bSopenharmony_ci // Since the worker may be executing some tasks in IO thread, we should add it to the 9004d6c458bSopenharmony_ci // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread. 9014d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size()); 9024d6c458bSopenharmony_ci idleWorkers_.emplace_hint(idleWorkers_.end(), worker); 9034d6c458bSopenharmony_ci if (GetTaskNum() != 0) { 9044d6c458bSopenharmony_ci NotifyExecuteTask(); 9054d6c458bSopenharmony_ci } 9064d6c458bSopenharmony_ci} 9074d6c458bSopenharmony_ci 9084d6c458bSopenharmony_ci// ---------------------------------- SendData --------------------------------------- 9094d6c458bSopenharmony_civoid TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo) 9104d6c458bSopenharmony_ci{ 9114d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 9124d6c458bSopenharmony_ci callbackTable_[taskId] = callbackInfo; 9134d6c458bSopenharmony_ci} 9144d6c458bSopenharmony_ci 9154d6c458bSopenharmony_cistd::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId) 9164d6c458bSopenharmony_ci{ 9174d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 9184d6c458bSopenharmony_ci auto iter = callbackTable_.find(taskId); 9194d6c458bSopenharmony_ci if (iter == callbackTable_.end() || iter->second == nullptr) { 9204d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: the callback does not exist"); 9214d6c458bSopenharmony_ci return nullptr; 9224d6c458bSopenharmony_ci } 9234d6c458bSopenharmony_ci return iter->second; 9244d6c458bSopenharmony_ci} 9254d6c458bSopenharmony_ci 9264d6c458bSopenharmony_civoid TaskManager::IncreaseRefCount(uint64_t taskId) 9274d6c458bSopenharmony_ci{ 9284d6c458bSopenharmony_ci if (taskId == 0) { // do not support func 9294d6c458bSopenharmony_ci return; 9304d6c458bSopenharmony_ci } 9314d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 9324d6c458bSopenharmony_ci auto iter = callbackTable_.find(taskId); 9334d6c458bSopenharmony_ci if (iter == callbackTable_.end() || iter->second == nullptr) { 9344d6c458bSopenharmony_ci return; 9354d6c458bSopenharmony_ci } 9364d6c458bSopenharmony_ci iter->second->refCount++; 9374d6c458bSopenharmony_ci} 9384d6c458bSopenharmony_ci 9394d6c458bSopenharmony_civoid TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId) 9404d6c458bSopenharmony_ci{ 9414d6c458bSopenharmony_ci if (taskId == 0) { // do not support func 9424d6c458bSopenharmony_ci return; 9434d6c458bSopenharmony_ci } 9444d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 9454d6c458bSopenharmony_ci auto iter = callbackTable_.find(taskId); 9464d6c458bSopenharmony_ci if (iter == callbackTable_.end() || iter->second == nullptr) { 9474d6c458bSopenharmony_ci return; 9484d6c458bSopenharmony_ci } 9494d6c458bSopenharmony_ci 9504d6c458bSopenharmony_ci auto task = reinterpret_cast<Task*>(taskId); 9514d6c458bSopenharmony_ci if (!task->IsValid()) { 9524d6c458bSopenharmony_ci callbackTable_.erase(iter); 9534d6c458bSopenharmony_ci return; 9544d6c458bSopenharmony_ci } 9554d6c458bSopenharmony_ci 9564d6c458bSopenharmony_ci iter->second->refCount--; 9574d6c458bSopenharmony_ci if (iter->second->refCount == 0) { 9584d6c458bSopenharmony_ci callbackTable_.erase(iter); 9594d6c458bSopenharmony_ci } 9604d6c458bSopenharmony_ci} 9614d6c458bSopenharmony_ci 9624d6c458bSopenharmony_cinapi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task) 9634d6c458bSopenharmony_ci{ 9644d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str()); 9654d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 9664d6c458bSopenharmony_ci auto iter = callbackTable_.find(task->taskId_); 9674d6c458bSopenharmony_ci if (iter == callbackTable_.end() || iter->second == nullptr) { 9684d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side"); 9694d6c458bSopenharmony_ci ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED); 9704d6c458bSopenharmony_ci delete resultInfo; 9714d6c458bSopenharmony_ci return nullptr; 9724d6c458bSopenharmony_ci } 9734d6c458bSopenharmony_ci Worker* worker = static_cast<Worker*>(task->worker_); 9744d6c458bSopenharmony_ci worker->Enqueue(task->env_, resultInfo); 9754d6c458bSopenharmony_ci auto callbackInfo = iter->second; 9764d6c458bSopenharmony_ci callbackInfo->refCount++; 9774d6c458bSopenharmony_ci callbackInfo->worker = worker; 9784d6c458bSopenharmony_ci auto workerEngine = reinterpret_cast<NativeEngine*>(env); 9794d6c458bSopenharmony_ci workerEngine->IncreaseListeningCounter(); 9804d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 9814d6c458bSopenharmony_ci if (task->IsMainThreadTask()) { 9824d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask"); 9834d6c458bSopenharmony_ci auto onCallbackTask = [callbackInfo]() { 9844d6c458bSopenharmony_ci TaskPool::ExecuteCallbackTask(callbackInfo.get()); 9854d6c458bSopenharmony_ci }; 9864d6c458bSopenharmony_ci TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_); 9874d6c458bSopenharmony_ci } else { 9884d6c458bSopenharmony_ci callbackInfo->onCallbackSignal->data = callbackInfo.get(); 9894d6c458bSopenharmony_ci uv_async_send(callbackInfo->onCallbackSignal); 9904d6c458bSopenharmony_ci } 9914d6c458bSopenharmony_ci#else 9924d6c458bSopenharmony_ci callbackInfo->onCallbackSignal->data = callbackInfo.get(); 9934d6c458bSopenharmony_ci uv_async_send(callbackInfo->onCallbackSignal); 9944d6c458bSopenharmony_ci#endif 9954d6c458bSopenharmony_ci return nullptr; 9964d6c458bSopenharmony_ci} 9974d6c458bSopenharmony_ci 9984d6c458bSopenharmony_ciMsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req) 9994d6c458bSopenharmony_ci{ 10004d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 10014d6c458bSopenharmony_ci auto info = static_cast<CallbackInfo*>(req->data); 10024d6c458bSopenharmony_ci if (info == nullptr || info->worker == nullptr) { 10034d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: info or worker is nullptr"); 10044d6c458bSopenharmony_ci return nullptr; 10054d6c458bSopenharmony_ci } 10064d6c458bSopenharmony_ci auto worker = info->worker; 10074d6c458bSopenharmony_ci MsgQueue* queue = nullptr; 10084d6c458bSopenharmony_ci worker->Dequeue(info->hostEnv, queue); 10094d6c458bSopenharmony_ci return queue; 10104d6c458bSopenharmony_ci} 10114d6c458bSopenharmony_ci 10124d6c458bSopenharmony_ciMsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo) 10134d6c458bSopenharmony_ci{ 10144d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(callbackMutex_); 10154d6c458bSopenharmony_ci if (callbackInfo == nullptr || callbackInfo->worker == nullptr) { 10164d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: callbackInfo or worker is nullptr"); 10174d6c458bSopenharmony_ci return nullptr; 10184d6c458bSopenharmony_ci } 10194d6c458bSopenharmony_ci auto worker = callbackInfo->worker; 10204d6c458bSopenharmony_ci MsgQueue* queue = nullptr; 10214d6c458bSopenharmony_ci worker->Dequeue(callbackInfo->hostEnv, queue); 10224d6c458bSopenharmony_ci return queue; 10234d6c458bSopenharmony_ci} 10244d6c458bSopenharmony_ci// ---------------------------------- SendData --------------------------------------- 10254d6c458bSopenharmony_ci 10264d6c458bSopenharmony_civoid TaskManager::NotifyDependencyTaskInfo(uint64_t taskId) 10274d6c458bSopenharmony_ci{ 10284d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str()); 10294d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 10304d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 10314d6c458bSopenharmony_ci auto iter = dependentTaskInfos_.find(taskId); 10324d6c458bSopenharmony_ci if (iter == dependentTaskInfos_.end() || iter->second.empty()) { 10334d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: dependentTaskInfo empty"); 10344d6c458bSopenharmony_ci return; 10354d6c458bSopenharmony_ci } 10364d6c458bSopenharmony_ci for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) { 10374d6c458bSopenharmony_ci auto taskInfo = DequeuePendingTaskInfo(*taskIdIter); 10384d6c458bSopenharmony_ci RemoveDependencyById(taskId, *taskIdIter); 10394d6c458bSopenharmony_ci taskIdIter = iter->second.erase(taskIdIter); 10404d6c458bSopenharmony_ci if (taskInfo.first != 0) { 10414d6c458bSopenharmony_ci EnqueueTaskId(taskInfo.first, taskInfo.second); 10424d6c458bSopenharmony_ci } 10434d6c458bSopenharmony_ci } 10444d6c458bSopenharmony_ci} 10454d6c458bSopenharmony_ci 10464d6c458bSopenharmony_civoid TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId) 10474d6c458bSopenharmony_ci{ 10484d6c458bSopenharmony_ci HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str()); 10494d6c458bSopenharmony_ci // remove dependency after task execute 10504d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 10514d6c458bSopenharmony_ci auto dependTaskIter = dependTaskInfos_.find(taskId); 10524d6c458bSopenharmony_ci if (dependTaskIter != dependTaskInfos_.end()) { 10534d6c458bSopenharmony_ci auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId); 10544d6c458bSopenharmony_ci if (dependTaskInnerIter != dependTaskIter->second.end()) { 10554d6c458bSopenharmony_ci dependTaskIter->second.erase(dependTaskInnerIter); 10564d6c458bSopenharmony_ci } 10574d6c458bSopenharmony_ci } 10584d6c458bSopenharmony_ci} 10594d6c458bSopenharmony_ci 10604d6c458bSopenharmony_cibool TaskManager::IsDependendByTaskId(uint64_t taskId) 10614d6c458bSopenharmony_ci{ 10624d6c458bSopenharmony_ci std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 10634d6c458bSopenharmony_ci auto iter = dependTaskInfos_.find(taskId); 10644d6c458bSopenharmony_ci if (iter == dependTaskInfos_.end() || iter->second.empty()) { 10654d6c458bSopenharmony_ci return false; 10664d6c458bSopenharmony_ci } 10674d6c458bSopenharmony_ci return true; 10684d6c458bSopenharmony_ci} 10694d6c458bSopenharmony_ci 10704d6c458bSopenharmony_cibool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId) 10714d6c458bSopenharmony_ci{ 10724d6c458bSopenharmony_ci std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 10734d6c458bSopenharmony_ci auto iter = dependentTaskInfos_.find(dependentTaskId); 10744d6c458bSopenharmony_ci if (iter == dependentTaskInfos_.end() || iter->second.empty()) { 10754d6c458bSopenharmony_ci return false; 10764d6c458bSopenharmony_ci } 10774d6c458bSopenharmony_ci return true; 10784d6c458bSopenharmony_ci} 10794d6c458bSopenharmony_ci 10804d6c458bSopenharmony_cibool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet) 10814d6c458bSopenharmony_ci{ 10824d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str()); 10834d6c458bSopenharmony_ci StoreDependentTaskInfo(taskIdSet, taskId); 10844d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 10854d6c458bSopenharmony_ci auto iter = dependTaskInfos_.find(taskId); 10864d6c458bSopenharmony_ci if (iter == dependTaskInfos_.end()) { 10874d6c458bSopenharmony_ci for (const auto& dependentId : taskIdSet) { 10884d6c458bSopenharmony_ci auto idIter = dependTaskInfos_.find(dependentId); 10894d6c458bSopenharmony_ci if (idIter == dependTaskInfos_.end()) { 10904d6c458bSopenharmony_ci continue; 10914d6c458bSopenharmony_ci } 10924d6c458bSopenharmony_ci if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) { 10934d6c458bSopenharmony_ci return false; 10944d6c458bSopenharmony_ci } 10954d6c458bSopenharmony_ci } 10964d6c458bSopenharmony_ci dependTaskInfos_.emplace(taskId, std::move(taskIdSet)); 10974d6c458bSopenharmony_ci return true; 10984d6c458bSopenharmony_ci } 10994d6c458bSopenharmony_ci 11004d6c458bSopenharmony_ci for (const auto& dependentId : iter->second) { 11014d6c458bSopenharmony_ci auto idIter = dependTaskInfos_.find(dependentId); 11024d6c458bSopenharmony_ci if (idIter == dependTaskInfos_.end()) { 11034d6c458bSopenharmony_ci continue; 11044d6c458bSopenharmony_ci } 11054d6c458bSopenharmony_ci if (!CheckCircularDependency(iter->second, idIter->second, taskId)) { 11064d6c458bSopenharmony_ci return false; 11074d6c458bSopenharmony_ci } 11084d6c458bSopenharmony_ci } 11094d6c458bSopenharmony_ci iter->second.insert(taskIdSet.begin(), taskIdSet.end()); 11104d6c458bSopenharmony_ci return true; 11114d6c458bSopenharmony_ci} 11124d6c458bSopenharmony_ci 11134d6c458bSopenharmony_cibool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId) 11144d6c458bSopenharmony_ci{ 11154d6c458bSopenharmony_ci for (const auto& id : idSet) { 11164d6c458bSopenharmony_ci if (id == taskId) { 11174d6c458bSopenharmony_ci return false; 11184d6c458bSopenharmony_ci } 11194d6c458bSopenharmony_ci auto iter = dependentIdSet.find(id); 11204d6c458bSopenharmony_ci if (iter != dependentIdSet.end()) { 11214d6c458bSopenharmony_ci continue; 11224d6c458bSopenharmony_ci } 11234d6c458bSopenharmony_ci auto dIter = dependTaskInfos_.find(id); 11244d6c458bSopenharmony_ci if (dIter == dependTaskInfos_.end()) { 11254d6c458bSopenharmony_ci continue; 11264d6c458bSopenharmony_ci } 11274d6c458bSopenharmony_ci if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) { 11284d6c458bSopenharmony_ci return false; 11294d6c458bSopenharmony_ci } 11304d6c458bSopenharmony_ci } 11314d6c458bSopenharmony_ci return true; 11324d6c458bSopenharmony_ci} 11334d6c458bSopenharmony_ci 11344d6c458bSopenharmony_cibool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId) 11354d6c458bSopenharmony_ci{ 11364d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str()); 11374d6c458bSopenharmony_ci RemoveDependentTaskInfo(dependentId, taskId); 11384d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 11394d6c458bSopenharmony_ci auto iter = dependTaskInfos_.find(taskId); 11404d6c458bSopenharmony_ci if (iter == dependTaskInfos_.end()) { 11414d6c458bSopenharmony_ci return false; 11424d6c458bSopenharmony_ci } 11434d6c458bSopenharmony_ci auto dependIter = iter->second.find(dependentId); 11444d6c458bSopenharmony_ci if (dependIter == iter->second.end()) { 11454d6c458bSopenharmony_ci return false; 11464d6c458bSopenharmony_ci } 11474d6c458bSopenharmony_ci iter->second.erase(dependIter); 11484d6c458bSopenharmony_ci return true; 11494d6c458bSopenharmony_ci} 11504d6c458bSopenharmony_ci 11514d6c458bSopenharmony_civoid TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority) 11524d6c458bSopenharmony_ci{ 11534d6c458bSopenharmony_ci if (taskId == 0) { 11544d6c458bSopenharmony_ci return; 11554d6c458bSopenharmony_ci } 11564d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_); 11574d6c458bSopenharmony_ci pendingTaskInfos_.emplace(taskId, priority); 11584d6c458bSopenharmony_ci} 11594d6c458bSopenharmony_ci 11604d6c458bSopenharmony_cistd::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId) 11614d6c458bSopenharmony_ci{ 11624d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_); 11634d6c458bSopenharmony_ci if (pendingTaskInfos_.empty()) { 11644d6c458bSopenharmony_ci return std::make_pair(0, Priority::DEFAULT); 11654d6c458bSopenharmony_ci } 11664d6c458bSopenharmony_ci std::pair<uint64_t, Priority> result; 11674d6c458bSopenharmony_ci for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) { 11684d6c458bSopenharmony_ci if (it->first == taskId) { 11694d6c458bSopenharmony_ci result = std::make_pair(it->first, it->second); 11704d6c458bSopenharmony_ci it = pendingTaskInfos_.erase(it); 11714d6c458bSopenharmony_ci break; 11724d6c458bSopenharmony_ci } 11734d6c458bSopenharmony_ci } 11744d6c458bSopenharmony_ci return result; 11754d6c458bSopenharmony_ci} 11764d6c458bSopenharmony_ci 11774d6c458bSopenharmony_civoid TaskManager::RemovePendingTaskInfo(uint64_t taskId) 11784d6c458bSopenharmony_ci{ 11794d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str()); 11804d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_); 11814d6c458bSopenharmony_ci pendingTaskInfos_.erase(taskId); 11824d6c458bSopenharmony_ci} 11834d6c458bSopenharmony_ci 11844d6c458bSopenharmony_civoid TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId) 11854d6c458bSopenharmony_ci{ 11864d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str()); 11874d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 11884d6c458bSopenharmony_ci for (const auto& id : dependentTaskIdSet) { 11894d6c458bSopenharmony_ci auto iter = dependentTaskInfos_.find(id); 11904d6c458bSopenharmony_ci if (iter == dependentTaskInfos_.end()) { 11914d6c458bSopenharmony_ci std::set<uint64_t> set{taskId}; 11924d6c458bSopenharmony_ci dependentTaskInfos_.emplace(id, std::move(set)); 11934d6c458bSopenharmony_ci } else { 11944d6c458bSopenharmony_ci iter->second.emplace(taskId); 11954d6c458bSopenharmony_ci } 11964d6c458bSopenharmony_ci } 11974d6c458bSopenharmony_ci} 11984d6c458bSopenharmony_ci 11994d6c458bSopenharmony_civoid TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId) 12004d6c458bSopenharmony_ci{ 12014d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str()); 12024d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 12034d6c458bSopenharmony_ci auto iter = dependentTaskInfos_.find(dependentTaskId); 12044d6c458bSopenharmony_ci if (iter == dependentTaskInfos_.end()) { 12054d6c458bSopenharmony_ci return; 12064d6c458bSopenharmony_ci } 12074d6c458bSopenharmony_ci auto taskIter = iter->second.find(taskId); 12084d6c458bSopenharmony_ci if (taskIter == iter->second.end()) { 12094d6c458bSopenharmony_ci return; 12104d6c458bSopenharmony_ci } 12114d6c458bSopenharmony_ci iter->second.erase(taskIter); 12124d6c458bSopenharmony_ci} 12134d6c458bSopenharmony_ci 12144d6c458bSopenharmony_cistd::string TaskManager::GetTaskDependInfoToString(uint64_t taskId) 12154d6c458bSopenharmony_ci{ 12164d6c458bSopenharmony_ci std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 12174d6c458bSopenharmony_ci std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:"; 12184d6c458bSopenharmony_ci auto iter = dependTaskInfos_.find(taskId); 12194d6c458bSopenharmony_ci if (iter != dependTaskInfos_.end()) { 12204d6c458bSopenharmony_ci for (const auto& id : iter->second) { 12214d6c458bSopenharmony_ci str += " " + std::to_string(id); 12224d6c458bSopenharmony_ci } 12234d6c458bSopenharmony_ci } 12244d6c458bSopenharmony_ci return str; 12254d6c458bSopenharmony_ci} 12264d6c458bSopenharmony_ci 12274d6c458bSopenharmony_civoid TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration) 12284d6c458bSopenharmony_ci{ 12294d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str()); 12304d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_); 12314d6c458bSopenharmony_ci auto iter = taskDurationInfos_.find(taskId); 12324d6c458bSopenharmony_ci if (iter == taskDurationInfos_.end()) { 12334d6c458bSopenharmony_ci std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration); 12344d6c458bSopenharmony_ci taskDurationInfos_.emplace(taskId, std::move(durationData)); 12354d6c458bSopenharmony_ci } else { 12364d6c458bSopenharmony_ci if (totalDuration != 0) { 12374d6c458bSopenharmony_ci iter->second.first = totalDuration; 12384d6c458bSopenharmony_ci } 12394d6c458bSopenharmony_ci if (cpuDuration != 0) { 12404d6c458bSopenharmony_ci iter->second.second = cpuDuration; 12414d6c458bSopenharmony_ci } 12424d6c458bSopenharmony_ci } 12434d6c458bSopenharmony_ci} 12444d6c458bSopenharmony_ci 12454d6c458bSopenharmony_ciuint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType) 12464d6c458bSopenharmony_ci{ 12474d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_); 12484d6c458bSopenharmony_ci auto iter = taskDurationInfos_.find(taskId); 12494d6c458bSopenharmony_ci if (iter == taskDurationInfos_.end()) { 12504d6c458bSopenharmony_ci return 0; 12514d6c458bSopenharmony_ci } 12524d6c458bSopenharmony_ci if (durationType == TASK_TOTAL_TIME) { 12534d6c458bSopenharmony_ci return iter->second.first; 12544d6c458bSopenharmony_ci } else if (durationType == TASK_CPU_TIME) { 12554d6c458bSopenharmony_ci return iter->second.second; 12564d6c458bSopenharmony_ci } else if (iter->second.first == 0) { 12574d6c458bSopenharmony_ci return 0; 12584d6c458bSopenharmony_ci } 12594d6c458bSopenharmony_ci return iter->second.first - iter->second.second; 12604d6c458bSopenharmony_ci} 12614d6c458bSopenharmony_ci 12624d6c458bSopenharmony_civoid TaskManager::RemoveTaskDuration(uint64_t taskId) 12634d6c458bSopenharmony_ci{ 12644d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str()); 12654d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_); 12664d6c458bSopenharmony_ci auto iter = taskDurationInfos_.find(taskId); 12674d6c458bSopenharmony_ci if (iter != taskDurationInfos_.end()) { 12684d6c458bSopenharmony_ci taskDurationInfos_.erase(iter); 12694d6c458bSopenharmony_ci } 12704d6c458bSopenharmony_ci} 12714d6c458bSopenharmony_ci 12724d6c458bSopenharmony_civoid TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker) 12734d6c458bSopenharmony_ci{ 12744d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(longTasksMutex_); 12754d6c458bSopenharmony_ci longTasksMap_.emplace(taskId, worker); 12764d6c458bSopenharmony_ci} 12774d6c458bSopenharmony_ci 12784d6c458bSopenharmony_civoid TaskManager::RemoveLongTaskInfo(uint64_t taskId) 12794d6c458bSopenharmony_ci{ 12804d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(longTasksMutex_); 12814d6c458bSopenharmony_ci longTasksMap_.erase(taskId); 12824d6c458bSopenharmony_ci} 12834d6c458bSopenharmony_ci 12844d6c458bSopenharmony_ciWorker* TaskManager::GetLongTaskInfo(uint64_t taskId) 12854d6c458bSopenharmony_ci{ 12864d6c458bSopenharmony_ci std::shared_lock<std::shared_mutex> lock(longTasksMutex_); 12874d6c458bSopenharmony_ci auto iter = longTasksMap_.find(taskId); 12884d6c458bSopenharmony_ci return iter != longTasksMap_.end() ? iter->second : nullptr; 12894d6c458bSopenharmony_ci} 12904d6c458bSopenharmony_ci 12914d6c458bSopenharmony_civoid TaskManager::TerminateTask(uint64_t taskId) 12924d6c458bSopenharmony_ci{ 12934d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str()); 12944d6c458bSopenharmony_ci auto worker = GetLongTaskInfo(taskId); 12954d6c458bSopenharmony_ci if (UNLIKELY(worker == nullptr)) { 12964d6c458bSopenharmony_ci return; 12974d6c458bSopenharmony_ci } 12984d6c458bSopenharmony_ci worker->TerminateTask(taskId); 12994d6c458bSopenharmony_ci RemoveLongTaskInfo(taskId); 13004d6c458bSopenharmony_ci} 13014d6c458bSopenharmony_ci 13024d6c458bSopenharmony_civoid TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask) 13034d6c458bSopenharmony_ci{ 13044d6c458bSopenharmony_ci uint64_t taskId = task->taskId_; 13054d6c458bSopenharmony_ci if (shouldDeleteTask) { 13064d6c458bSopenharmony_ci RemoveTask(taskId); 13074d6c458bSopenharmony_ci } 13084d6c458bSopenharmony_ci if (task->onResultSignal_ != nullptr) { 13094d6c458bSopenharmony_ci if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) { 13104d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(task->onResultSignal_); 13114d6c458bSopenharmony_ci } else { 13124d6c458bSopenharmony_ci delete task->onResultSignal_; 13134d6c458bSopenharmony_ci } 13144d6c458bSopenharmony_ci task->onResultSignal_ = nullptr; 13154d6c458bSopenharmony_ci } 13164d6c458bSopenharmony_ci 13174d6c458bSopenharmony_ci if (task->currentTaskInfo_ != nullptr) { 13184d6c458bSopenharmony_ci delete task->currentTaskInfo_; 13194d6c458bSopenharmony_ci task->currentTaskInfo_ = nullptr; 13204d6c458bSopenharmony_ci } 13214d6c458bSopenharmony_ci 13224d6c458bSopenharmony_ci task->CancelPendingTask(env); 13234d6c458bSopenharmony_ci 13244d6c458bSopenharmony_ci task->ClearDelayedTimers(); 13254d6c458bSopenharmony_ci 13264d6c458bSopenharmony_ci if (task->IsFunctionTask() || task->IsGroupFunctionTask()) { 13274d6c458bSopenharmony_ci return; 13284d6c458bSopenharmony_ci } 13294d6c458bSopenharmony_ci DecreaseRefCount(env, taskId); 13304d6c458bSopenharmony_ci RemoveTaskDuration(taskId); 13314d6c458bSopenharmony_ci RemovePendingTaskInfo(taskId); 13324d6c458bSopenharmony_ci ReleaseCallBackInfo(task); 13334d6c458bSopenharmony_ci { 13344d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 13354d6c458bSopenharmony_ci for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) { 13364d6c458bSopenharmony_ci if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) { 13374d6c458bSopenharmony_ci dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter); 13384d6c458bSopenharmony_ci } else { 13394d6c458bSopenharmony_ci ++dependentTaskIter; 13404d6c458bSopenharmony_ci } 13414d6c458bSopenharmony_ci } 13424d6c458bSopenharmony_ci } 13434d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 13444d6c458bSopenharmony_ci auto dependTaskIter = dependTaskInfos_.find(taskId); 13454d6c458bSopenharmony_ci if (dependTaskIter != dependTaskInfos_.end()) { 13464d6c458bSopenharmony_ci dependTaskInfos_.erase(dependTaskIter); 13474d6c458bSopenharmony_ci } 13484d6c458bSopenharmony_ci} 13494d6c458bSopenharmony_ci 13504d6c458bSopenharmony_civoid TaskManager::ReleaseCallBackInfo(Task* task) 13514d6c458bSopenharmony_ci{ 13524d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str()); 13534d6c458bSopenharmony_ci if (task->onEnqueuedCallBackInfo_ != nullptr) { 13544d6c458bSopenharmony_ci delete task->onEnqueuedCallBackInfo_; 13554d6c458bSopenharmony_ci task->onEnqueuedCallBackInfo_ = nullptr; 13564d6c458bSopenharmony_ci } 13574d6c458bSopenharmony_ci 13584d6c458bSopenharmony_ci if (task->onStartExecutionCallBackInfo_ != nullptr) { 13594d6c458bSopenharmony_ci delete task->onStartExecutionCallBackInfo_; 13604d6c458bSopenharmony_ci task->onStartExecutionCallBackInfo_ = nullptr; 13614d6c458bSopenharmony_ci } 13624d6c458bSopenharmony_ci 13634d6c458bSopenharmony_ci if (task->onExecutionFailedCallBackInfo_ != nullptr) { 13644d6c458bSopenharmony_ci delete task->onExecutionFailedCallBackInfo_; 13654d6c458bSopenharmony_ci task->onExecutionFailedCallBackInfo_ = nullptr; 13664d6c458bSopenharmony_ci } 13674d6c458bSopenharmony_ci 13684d6c458bSopenharmony_ci if (task->onExecutionSucceededCallBackInfo_ != nullptr) { 13694d6c458bSopenharmony_ci delete task->onExecutionSucceededCallBackInfo_; 13704d6c458bSopenharmony_ci task->onExecutionSucceededCallBackInfo_ = nullptr; 13714d6c458bSopenharmony_ci } 13724d6c458bSopenharmony_ci 13734d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 13744d6c458bSopenharmony_ci if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) { 13754d6c458bSopenharmony_ci if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) { 13764d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); 13774d6c458bSopenharmony_ci } else { 13784d6c458bSopenharmony_ci delete task->onStartExecutionSignal_; 13794d6c458bSopenharmony_ci } 13804d6c458bSopenharmony_ci task->onStartExecutionSignal_ = nullptr; 13814d6c458bSopenharmony_ci } 13824d6c458bSopenharmony_ci#else 13834d6c458bSopenharmony_ci if (task->onStartExecutionSignal_ != nullptr) { 13844d6c458bSopenharmony_ci if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) { 13854d6c458bSopenharmony_ci ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); 13864d6c458bSopenharmony_ci } else { 13874d6c458bSopenharmony_ci delete task->onStartExecutionSignal_; 13884d6c458bSopenharmony_ci } 13894d6c458bSopenharmony_ci task->onStartExecutionSignal_ = nullptr; 13904d6c458bSopenharmony_ci } 13914d6c458bSopenharmony_ci#endif 13924d6c458bSopenharmony_ci} 13934d6c458bSopenharmony_ci 13944d6c458bSopenharmony_civoid TaskManager::StoreTask(uint64_t taskId, Task* task) 13954d6c458bSopenharmony_ci{ 13964d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 13974d6c458bSopenharmony_ci tasks_.emplace(taskId, task); 13984d6c458bSopenharmony_ci} 13994d6c458bSopenharmony_ci 14004d6c458bSopenharmony_civoid TaskManager::RemoveTask(uint64_t taskId) 14014d6c458bSopenharmony_ci{ 14024d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 14034d6c458bSopenharmony_ci tasks_.erase(taskId); 14044d6c458bSopenharmony_ci} 14054d6c458bSopenharmony_ci 14064d6c458bSopenharmony_ciTask* TaskManager::GetTask(uint64_t taskId) 14074d6c458bSopenharmony_ci{ 14084d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 14094d6c458bSopenharmony_ci auto iter = tasks_.find(taskId); 14104d6c458bSopenharmony_ci if (iter == tasks_.end()) { 14114d6c458bSopenharmony_ci return nullptr; 14124d6c458bSopenharmony_ci } 14134d6c458bSopenharmony_ci return iter->second; 14144d6c458bSopenharmony_ci} 14154d6c458bSopenharmony_ci 14164d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT) 14174d6c458bSopenharmony_civoid TaskManager::UpdateSystemAppFlag() 14184d6c458bSopenharmony_ci{ 14194d6c458bSopenharmony_ci auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager(); 14204d6c458bSopenharmony_ci if (abilityManager == nullptr) { 14214d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr."); 14224d6c458bSopenharmony_ci return; 14234d6c458bSopenharmony_ci } 14244d6c458bSopenharmony_ci auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID); 14254d6c458bSopenharmony_ci if (bundleObj == nullptr) { 14264d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: fail to get bundle manager service."); 14274d6c458bSopenharmony_ci return; 14284d6c458bSopenharmony_ci } 14294d6c458bSopenharmony_ci auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj); 14304d6c458bSopenharmony_ci if (bundleMgr == nullptr) { 14314d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: Bundle manager is nullptr."); 14324d6c458bSopenharmony_ci return; 14334d6c458bSopenharmony_ci } 14344d6c458bSopenharmony_ci OHOS::AppExecFwk::BundleInfo bundleInfo; 14354d6c458bSopenharmony_ci if (bundleMgr->GetBundleInfoForSelf( 14364d6c458bSopenharmony_ci static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo) 14374d6c458bSopenharmony_ci != OHOS::ERR_OK) { 14384d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf"); 14394d6c458bSopenharmony_ci return; 14404d6c458bSopenharmony_ci } 14414d6c458bSopenharmony_ci isSystemApp_ = bundleInfo.applicationInfo.isSystemApp; 14424d6c458bSopenharmony_ci} 14434d6c458bSopenharmony_ci#endif 14444d6c458bSopenharmony_ci 14454d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 14464d6c458bSopenharmony_cibool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority) 14474d6c458bSopenharmony_ci{ 14484d6c458bSopenharmony_ci return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority)); 14494d6c458bSopenharmony_ci} 14504d6c458bSopenharmony_ci#endif 14514d6c458bSopenharmony_ci 14524d6c458bSopenharmony_cibool TaskManager::CheckTask(uint64_t taskId) 14534d6c458bSopenharmony_ci{ 14544d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 14554d6c458bSopenharmony_ci auto item = tasks_.find(taskId); 14564d6c458bSopenharmony_ci return item != tasks_.end(); 14574d6c458bSopenharmony_ci} 14584d6c458bSopenharmony_ci 14594d6c458bSopenharmony_ci// ----------------------------------- TaskGroupManager ---------------------------------------- 14604d6c458bSopenharmony_ciTaskGroupManager& TaskGroupManager::GetInstance() 14614d6c458bSopenharmony_ci{ 14624d6c458bSopenharmony_ci static TaskGroupManager groupManager; 14634d6c458bSopenharmony_ci return groupManager; 14644d6c458bSopenharmony_ci} 14654d6c458bSopenharmony_ci 14664d6c458bSopenharmony_civoid TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId) 14674d6c458bSopenharmony_ci{ 14684d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(taskGroupsMutex_); 14694d6c458bSopenharmony_ci auto groupIter = taskGroups_.find(groupId); 14704d6c458bSopenharmony_ci if (groupIter == taskGroups_.end()) { 14714d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: taskGroup has been released"); 14724d6c458bSopenharmony_ci return; 14734d6c458bSopenharmony_ci } 14744d6c458bSopenharmony_ci auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second); 14754d6c458bSopenharmony_ci if (taskGroup == nullptr) { 14764d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: taskGroup is null"); 14774d6c458bSopenharmony_ci return; 14784d6c458bSopenharmony_ci } 14794d6c458bSopenharmony_ci taskGroup->taskRefs_.push_back(taskRef); 14804d6c458bSopenharmony_ci taskGroup->taskNum_++; 14814d6c458bSopenharmony_ci taskGroup->taskIds_.push_back(taskId); 14824d6c458bSopenharmony_ci} 14834d6c458bSopenharmony_ci 14844d6c458bSopenharmony_civoid TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group) 14854d6c458bSopenharmony_ci{ 14864d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group"); 14874d6c458bSopenharmony_ci TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_); 14884d6c458bSopenharmony_ci for (uint64_t taskId : group->taskIds_) { 14894d6c458bSopenharmony_ci Task* task = TaskManager::GetInstance().GetTask(taskId); 14904d6c458bSopenharmony_ci if (task == nullptr || !task->IsValid()) { 14914d6c458bSopenharmony_ci continue; 14924d6c458bSopenharmony_ci } 14934d6c458bSopenharmony_ci napi_reference_unref(task->env_, task->taskRef_, nullptr); 14944d6c458bSopenharmony_ci } 14954d6c458bSopenharmony_ci 14964d6c458bSopenharmony_ci if (group->currentGroupInfo_ != nullptr) { 14974d6c458bSopenharmony_ci delete group->currentGroupInfo_; 14984d6c458bSopenharmony_ci } 14994d6c458bSopenharmony_ci 15004d6c458bSopenharmony_ci group->CancelPendingGroup(env); 15014d6c458bSopenharmony_ci} 15024d6c458bSopenharmony_ci 15034d6c458bSopenharmony_civoid TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId) 15044d6c458bSopenharmony_ci{ 15054d6c458bSopenharmony_ci std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId); 15064d6c458bSopenharmony_ci HITRACE_HELPER_METER_NAME(strTrace); 15074d6c458bSopenharmony_ci HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 15084d6c458bSopenharmony_ci TaskGroup* taskGroup = GetTaskGroup(groupId); 15094d6c458bSopenharmony_ci if (taskGroup == nullptr) { 15104d6c458bSopenharmony_ci HILOG_ERROR("taskpool:: CancelGroup group is nullptr"); 15114d6c458bSopenharmony_ci return; 15124d6c458bSopenharmony_ci } 15134d6c458bSopenharmony_ci if (taskGroup->groupState_ == ExecuteState::CANCELED) { 15144d6c458bSopenharmony_ci return; 15154d6c458bSopenharmony_ci } 15164d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_); 15174d6c458bSopenharmony_ci if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND || 15184d6c458bSopenharmony_ci taskGroup->groupState_ == ExecuteState::FINISHED) { 15194d6c458bSopenharmony_ci std::string errMsg = "taskpool:: taskGroup is not executed or has been executed"; 15204d6c458bSopenharmony_ci HILOG_ERROR("%{public}s", errMsg.c_str()); 15214d6c458bSopenharmony_ci ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str()); 15224d6c458bSopenharmony_ci return; 15234d6c458bSopenharmony_ci } 15244d6c458bSopenharmony_ci ExecuteState groupState = taskGroup->groupState_; 15254d6c458bSopenharmony_ci taskGroup->groupState_ = ExecuteState::CANCELED; 15264d6c458bSopenharmony_ci taskGroup->CancelPendingGroup(env); 15274d6c458bSopenharmony_ci if (taskGroup->currentGroupInfo_->finishedTask != taskGroup->taskNum_) { 15284d6c458bSopenharmony_ci for (uint64_t taskId : taskGroup->taskIds_) { 15294d6c458bSopenharmony_ci CancelGroupTask(env, taskId, taskGroup); 15304d6c458bSopenharmony_ci } 15314d6c458bSopenharmony_ci } 15324d6c458bSopenharmony_ci if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) { 15334d6c458bSopenharmony_ci auto engine = reinterpret_cast<NativeEngine*>(env); 15344d6c458bSopenharmony_ci for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) { 15354d6c458bSopenharmony_ci engine->DecreaseSubEnvCounter(); 15364d6c458bSopenharmony_ci } 15374d6c458bSopenharmony_ci napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled"); 15384d6c458bSopenharmony_ci napi_reject_deferred(env, taskGroup->currentGroupInfo_->deferred, error); 15394d6c458bSopenharmony_ci napi_delete_reference(env, taskGroup->currentGroupInfo_->resArr); 15404d6c458bSopenharmony_ci napi_reference_unref(env, taskGroup->groupRef_, nullptr); 15414d6c458bSopenharmony_ci delete taskGroup->currentGroupInfo_; 15424d6c458bSopenharmony_ci taskGroup->currentGroupInfo_ = nullptr; 15434d6c458bSopenharmony_ci } 15444d6c458bSopenharmony_ci} 15454d6c458bSopenharmony_ci 15464d6c458bSopenharmony_civoid TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group) 15474d6c458bSopenharmony_ci{ 15484d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str()); 15494d6c458bSopenharmony_ci auto task = TaskManager::GetInstance().GetTask(taskId); 15504d6c458bSopenharmony_ci if (task == nullptr) { 15514d6c458bSopenharmony_ci HILOG_INFO("taskpool:: CancelGroupTask task is nullptr"); 15524d6c458bSopenharmony_ci return; 15534d6c458bSopenharmony_ci } 15544d6c458bSopenharmony_ci std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 15554d6c458bSopenharmony_ci if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) { 15564d6c458bSopenharmony_ci reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter(); 15574d6c458bSopenharmony_ci task->DecreaseTaskRefCount(); 15584d6c458bSopenharmony_ci TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority); 15594d6c458bSopenharmony_ci delete task->currentTaskInfo_; 15604d6c458bSopenharmony_ci task->currentTaskInfo_ = nullptr; 15614d6c458bSopenharmony_ci } 15624d6c458bSopenharmony_ci task->taskState_ = ExecuteState::CANCELED; 15634d6c458bSopenharmony_ci} 15644d6c458bSopenharmony_ci 15654d6c458bSopenharmony_civoid TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner) 15664d6c458bSopenharmony_ci{ 15674d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(seqRunnersMutex_); 15684d6c458bSopenharmony_ci seqRunners_.emplace(seqRunnerId, seqRunner); 15694d6c458bSopenharmony_ci} 15704d6c458bSopenharmony_ci 15714d6c458bSopenharmony_civoid TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId) 15724d6c458bSopenharmony_ci{ 15734d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(seqRunnersMutex_); 15744d6c458bSopenharmony_ci seqRunners_.erase(seqRunnerId); 15754d6c458bSopenharmony_ci} 15764d6c458bSopenharmony_ci 15774d6c458bSopenharmony_ciSequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId) 15784d6c458bSopenharmony_ci{ 15794d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(seqRunnersMutex_); 15804d6c458bSopenharmony_ci auto iter = seqRunners_.find(seqRunnerId); 15814d6c458bSopenharmony_ci if (iter != seqRunners_.end()) { 15824d6c458bSopenharmony_ci return iter->second; 15834d6c458bSopenharmony_ci } 15844d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: sequenceRunner has been released."); 15854d6c458bSopenharmony_ci return nullptr; 15864d6c458bSopenharmony_ci} 15874d6c458bSopenharmony_ci 15884d6c458bSopenharmony_civoid TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task) 15894d6c458bSopenharmony_ci{ 15904d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(seqRunnersMutex_); 15914d6c458bSopenharmony_ci auto iter = seqRunners_.find(seqRunnerId); 15924d6c458bSopenharmony_ci if (iter == seqRunners_.end()) { 15934d6c458bSopenharmony_ci HILOG_ERROR("seqRunner:: seqRunner not found."); 15944d6c458bSopenharmony_ci return; 15954d6c458bSopenharmony_ci } else { 15964d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_); 15974d6c458bSopenharmony_ci iter->second->seqRunnerTasks_.push(task); 15984d6c458bSopenharmony_ci } 15994d6c458bSopenharmony_ci} 16004d6c458bSopenharmony_ci 16014d6c458bSopenharmony_cibool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask) 16024d6c458bSopenharmony_ci{ 16034d6c458bSopenharmony_ci uint64_t seqRunnerId = lastTask->seqRunnerId_; 16044d6c458bSopenharmony_ci SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId); 16054d6c458bSopenharmony_ci if (seqRunner == nullptr) { 16064d6c458bSopenharmony_ci HILOG_ERROR("seqRunner:: trigger seqRunner not exist."); 16074d6c458bSopenharmony_ci return false; 16084d6c458bSopenharmony_ci } 16094d6c458bSopenharmony_ci if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) { 16104d6c458bSopenharmony_ci HILOG_ERROR("seqRunner:: trigger globalSeqRunner not exist."); 16114d6c458bSopenharmony_ci return false; 16124d6c458bSopenharmony_ci } 16134d6c458bSopenharmony_ci if (seqRunner->currentTaskId_ != lastTask->taskId_) { 16144d6c458bSopenharmony_ci HILOG_ERROR("seqRunner:: only front task can trigger seqRunner."); 16154d6c458bSopenharmony_ci return false; 16164d6c458bSopenharmony_ci } 16174d6c458bSopenharmony_ci { 16184d6c458bSopenharmony_ci std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_); 16194d6c458bSopenharmony_ci if (seqRunner->seqRunnerTasks_.empty()) { 16204d6c458bSopenharmony_ci HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str()); 16214d6c458bSopenharmony_ci seqRunner->currentTaskId_ = 0; 16224d6c458bSopenharmony_ci return true; 16234d6c458bSopenharmony_ci } 16244d6c458bSopenharmony_ci Task* task = seqRunner->seqRunnerTasks_.front(); 16254d6c458bSopenharmony_ci seqRunner->seqRunnerTasks_.pop(); 16264d6c458bSopenharmony_ci while (task->taskState_ == ExecuteState::CANCELED) { 16274d6c458bSopenharmony_ci DisposeCanceledTask(env, task); 16284d6c458bSopenharmony_ci if (seqRunner->seqRunnerTasks_.empty()) { 16294d6c458bSopenharmony_ci HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.", 16304d6c458bSopenharmony_ci std::to_string(seqRunnerId).c_str()); 16314d6c458bSopenharmony_ci seqRunner->currentTaskId_ = 0; 16324d6c458bSopenharmony_ci return true; 16334d6c458bSopenharmony_ci } 16344d6c458bSopenharmony_ci task = seqRunner->seqRunnerTasks_.front(); 16354d6c458bSopenharmony_ci seqRunner->seqRunnerTasks_.pop(); 16364d6c458bSopenharmony_ci } 16374d6c458bSopenharmony_ci seqRunner->currentTaskId_ = task->taskId_; 16384d6c458bSopenharmony_ci task->IncreaseRefCount(); 16394d6c458bSopenharmony_ci task->taskState_ = ExecuteState::WAITING; 16404d6c458bSopenharmony_ci HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.", 16414d6c458bSopenharmony_ci std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str()); 16424d6c458bSopenharmony_ci TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_); 16434d6c458bSopenharmony_ci } 16444d6c458bSopenharmony_ci return true; 16454d6c458bSopenharmony_ci} 16464d6c458bSopenharmony_ci 16474d6c458bSopenharmony_civoid TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task) 16484d6c458bSopenharmony_ci{ 16494d6c458bSopenharmony_ci napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled"); 16504d6c458bSopenharmony_ci napi_reject_deferred(env, task->currentTaskInfo_->deferred, error); 16514d6c458bSopenharmony_ci reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter(); 16524d6c458bSopenharmony_ci napi_reference_unref(env, task->taskRef_, nullptr); 16534d6c458bSopenharmony_ci delete task->currentTaskInfo_; 16544d6c458bSopenharmony_ci task->currentTaskInfo_ = nullptr; 16554d6c458bSopenharmony_ci} 16564d6c458bSopenharmony_ci 16574d6c458bSopenharmony_civoid TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup) 16584d6c458bSopenharmony_ci{ 16594d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(taskGroupsMutex_); 16604d6c458bSopenharmony_ci taskGroups_.emplace(groupId, taskGroup); 16614d6c458bSopenharmony_ci} 16624d6c458bSopenharmony_ci 16634d6c458bSopenharmony_civoid TaskGroupManager::RemoveTaskGroup(uint64_t groupId) 16644d6c458bSopenharmony_ci{ 16654d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(taskGroupsMutex_); 16664d6c458bSopenharmony_ci taskGroups_.erase(groupId); 16674d6c458bSopenharmony_ci} 16684d6c458bSopenharmony_ci 16694d6c458bSopenharmony_ciTaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId) 16704d6c458bSopenharmony_ci{ 16714d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(taskGroupsMutex_); 16724d6c458bSopenharmony_ci auto groupIter = taskGroups_.find(groupId); 16734d6c458bSopenharmony_ci if (groupIter == taskGroups_.end()) { 16744d6c458bSopenharmony_ci return nullptr; 16754d6c458bSopenharmony_ci } 16764d6c458bSopenharmony_ci return reinterpret_cast<TaskGroup*>(groupIter->second); 16774d6c458bSopenharmony_ci} 16784d6c458bSopenharmony_ci 16794d6c458bSopenharmony_cibool TaskGroupManager::UpdateGroupState(uint64_t groupId) 16804d6c458bSopenharmony_ci{ 16814d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str()); 16824d6c458bSopenharmony_ci // During the modification process of the group, prevent other sub threads from performing other 16834d6c458bSopenharmony_ci // operations on the group pointer, which may cause the modification to fail. 16844d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(taskGroupsMutex_); 16854d6c458bSopenharmony_ci auto groupIter = taskGroups_.find(groupId); 16864d6c458bSopenharmony_ci if (groupIter == taskGroups_.end()) { 16874d6c458bSopenharmony_ci return false; 16884d6c458bSopenharmony_ci } 16894d6c458bSopenharmony_ci TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second); 16904d6c458bSopenharmony_ci if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) { 16914d6c458bSopenharmony_ci HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled"); 16924d6c458bSopenharmony_ci return false; 16934d6c458bSopenharmony_ci } 16944d6c458bSopenharmony_ci group->groupState_ = ExecuteState::RUNNING; 16954d6c458bSopenharmony_ci return true; 16964d6c458bSopenharmony_ci} 16974d6c458bSopenharmony_ci 16984d6c458bSopenharmony_ci// ----------------------------------- SequenceRunnerManager ---------------------------------------- 16994d6c458bSopenharmony_ciSequenceRunnerManager& SequenceRunnerManager::GetInstance() 17004d6c458bSopenharmony_ci{ 17014d6c458bSopenharmony_ci static SequenceRunnerManager sequenceRunnerManager; 17024d6c458bSopenharmony_ci return sequenceRunnerManager; 17034d6c458bSopenharmony_ci} 17044d6c458bSopenharmony_ci 17054d6c458bSopenharmony_ciSequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc, 17064d6c458bSopenharmony_ci const std::string &name, uint32_t priority) 17074d6c458bSopenharmony_ci{ 17084d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 17094d6c458bSopenharmony_ci SequenceRunner *seqRunner = nullptr; 17104d6c458bSopenharmony_ci auto iter = globalSeqRunner_.find(name); 17114d6c458bSopenharmony_ci if (iter == globalSeqRunner_.end()) { 17124d6c458bSopenharmony_ci seqRunner = new SequenceRunner(); 17134d6c458bSopenharmony_ci // refresh priority default values on first creation 17144d6c458bSopenharmony_ci if (argc == 2) { // 2: The number of parameters is 2. 17154d6c458bSopenharmony_ci seqRunner->priority_ = static_cast<Priority>(priority); 17164d6c458bSopenharmony_ci } 17174d6c458bSopenharmony_ci seqRunner->isGlobalRunner_ = true; 17184d6c458bSopenharmony_ci seqRunner->seqName_ = name; 17194d6c458bSopenharmony_ci globalSeqRunner_.emplace(name, seqRunner); 17204d6c458bSopenharmony_ci } else { 17214d6c458bSopenharmony_ci seqRunner = iter->second; 17224d6c458bSopenharmony_ci if (priority != seqRunner->priority_) { 17234d6c458bSopenharmony_ci ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority can not changed."); 17244d6c458bSopenharmony_ci return nullptr; 17254d6c458bSopenharmony_ci } 17264d6c458bSopenharmony_ci } 17274d6c458bSopenharmony_ci seqRunner->count_++; 17284d6c458bSopenharmony_ci auto tmpIter = seqRunner->globalSeqRunnerRef_.find(env); 17294d6c458bSopenharmony_ci if (tmpIter == seqRunner->globalSeqRunnerRef_.end()) { 17304d6c458bSopenharmony_ci napi_ref gloableSeqRunnerRef = nullptr; 17314d6c458bSopenharmony_ci napi_create_reference(env, thisVar, 0, &gloableSeqRunnerRef); 17324d6c458bSopenharmony_ci seqRunner->globalSeqRunnerRef_.emplace(env, gloableSeqRunnerRef); 17334d6c458bSopenharmony_ci } 17344d6c458bSopenharmony_ci 17354d6c458bSopenharmony_ci return seqRunner; 17364d6c458bSopenharmony_ci} 17374d6c458bSopenharmony_ci 17384d6c458bSopenharmony_cibool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner) 17394d6c458bSopenharmony_ci{ 17404d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 17414d6c458bSopenharmony_ci if (seqRunner->isGlobalRunner_) { 17424d6c458bSopenharmony_ci auto iter = seqRunner->globalSeqRunnerRef_.find(env); 17434d6c458bSopenharmony_ci if (iter == seqRunner->globalSeqRunnerRef_.end()) { 17444d6c458bSopenharmony_ci return false; 17454d6c458bSopenharmony_ci } 17464d6c458bSopenharmony_ci napi_reference_unref(env, iter->second, nullptr); 17474d6c458bSopenharmony_ci } else { 17484d6c458bSopenharmony_ci napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr); 17494d6c458bSopenharmony_ci } 17504d6c458bSopenharmony_ci return true; 17514d6c458bSopenharmony_ci} 17524d6c458bSopenharmony_ci 17534d6c458bSopenharmony_ciuint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner) 17544d6c458bSopenharmony_ci{ 17554d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 17564d6c458bSopenharmony_ci return --(seqRunner->count_); 17574d6c458bSopenharmony_ci} 17584d6c458bSopenharmony_ci 17594d6c458bSopenharmony_civoid SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner) 17604d6c458bSopenharmony_ci{ 17614d6c458bSopenharmony_ci std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_); 17624d6c458bSopenharmony_ci auto iter = seqRunner->globalSeqRunnerRef_.find(env); 17634d6c458bSopenharmony_ci if (iter != seqRunner->globalSeqRunnerRef_.end()) { 17644d6c458bSopenharmony_ci napi_delete_reference(env, iter->second); 17654d6c458bSopenharmony_ci seqRunner->globalSeqRunnerRef_.erase(iter); 17664d6c458bSopenharmony_ci } 17674d6c458bSopenharmony_ci} 17684d6c458bSopenharmony_ci 17694d6c458bSopenharmony_civoid SequenceRunnerManager::RemoveSequenceRunner(const std::string &name) 17704d6c458bSopenharmony_ci{ 17714d6c458bSopenharmony_ci std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 17724d6c458bSopenharmony_ci auto iter = globalSeqRunner_.find(name.c_str()); 17734d6c458bSopenharmony_ci if (iter != globalSeqRunner_.end()) { 17744d6c458bSopenharmony_ci globalSeqRunner_.erase(iter->first); 17754d6c458bSopenharmony_ci } 17764d6c458bSopenharmony_ci} 17774d6c458bSopenharmony_ci 17784d6c458bSopenharmony_civoid SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner) 17794d6c458bSopenharmony_ci{ 17804d6c458bSopenharmony_ci RemoveGlobalSeqRunnerRef(env, seqRunner); 17814d6c458bSopenharmony_ci if (DecreaseSeqCount(seqRunner) == 0) { 17824d6c458bSopenharmony_ci RemoveSequenceRunner(seqRunner->seqName_); 17834d6c458bSopenharmony_ci TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_); 17844d6c458bSopenharmony_ci delete seqRunner; 17854d6c458bSopenharmony_ci } 17864d6c458bSopenharmony_ci} 17874d6c458bSopenharmony_ci} // namespace Commonlibrary::Concurrent::TaskPoolModule