14d6c458bSopenharmony_ci/*
24d6c458bSopenharmony_ci * Copyright (c) 2022 Huawei Device Co., Ltd.
34d6c458bSopenharmony_ci * Licensed under the Apache License, Version 2.0 (the "License");
44d6c458bSopenharmony_ci * you may not use this file except in compliance with the License.
54d6c458bSopenharmony_ci * You may obtain a copy of the License at
64d6c458bSopenharmony_ci *
74d6c458bSopenharmony_ci *     http://www.apache.org/licenses/LICENSE-2.0
84d6c458bSopenharmony_ci *
94d6c458bSopenharmony_ci * Unless required by applicable law or agreed to in writing, software
104d6c458bSopenharmony_ci * distributed under the License is distributed on an "AS IS" BASIS,
114d6c458bSopenharmony_ci * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
124d6c458bSopenharmony_ci * See the License for the specific language governing permissions and
134d6c458bSopenharmony_ci * limitations under the License.
144d6c458bSopenharmony_ci */
154d6c458bSopenharmony_ci
164d6c458bSopenharmony_ci#include "task_manager.h"
174d6c458bSopenharmony_ci
184d6c458bSopenharmony_ci#include <cinttypes>
194d6c458bSopenharmony_ci#include <securec.h>
204d6c458bSopenharmony_ci#include <thread>
214d6c458bSopenharmony_ci
224d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT)
234d6c458bSopenharmony_ci#include "bundle_info.h"
244d6c458bSopenharmony_ci#include "bundle_mgr_interface.h"
254d6c458bSopenharmony_ci#include "bundle_mgr_proxy.h"
264d6c458bSopenharmony_ci#include "c/executor_task.h"
274d6c458bSopenharmony_ci#include "ffrt_inner.h"
284d6c458bSopenharmony_ci#include "iservice_registry.h"
294d6c458bSopenharmony_ci#include "parameters.h"
304d6c458bSopenharmony_ci#include "status_receiver_interface.h"
314d6c458bSopenharmony_ci#include "system_ability_definition.h"
324d6c458bSopenharmony_ci#endif
334d6c458bSopenharmony_ci#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
344d6c458bSopenharmony_ci#include "helper/concurrent_helper.h"
354d6c458bSopenharmony_ci#include "helper/error_helper.h"
364d6c458bSopenharmony_ci#include "helper/hitrace_helper.h"
374d6c458bSopenharmony_ci#include "taskpool.h"
384d6c458bSopenharmony_ci#include "tools/log.h"
394d6c458bSopenharmony_ci#include "worker.h"
404d6c458bSopenharmony_ci
414d6c458bSopenharmony_cinamespace Commonlibrary::Concurrent::TaskPoolModule {
424d6c458bSopenharmony_ciusing namespace OHOS::JsSysModule;
434d6c458bSopenharmony_ci
444d6c458bSopenharmony_cistatic constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
454d6c458bSopenharmony_cistatic constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
464d6c458bSopenharmony_cistatic constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
474d6c458bSopenharmony_cistatic constexpr uint32_t STEP_SIZE = 2;
484d6c458bSopenharmony_cistatic constexpr uint32_t DEFAULT_THREADS = 3;
494d6c458bSopenharmony_cistatic constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
504d6c458bSopenharmony_cistatic constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
514d6c458bSopenharmony_cistatic constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
524d6c458bSopenharmony_cistatic constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
534d6c458bSopenharmony_cistatic constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
544d6c458bSopenharmony_cistatic constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
554d6c458bSopenharmony_ci[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread
564d6c458bSopenharmony_ci
574d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
584d6c458bSopenharmony_cistatic const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
594d6c458bSopenharmony_ci    {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
604d6c458bSopenharmony_ci    {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
614d6c458bSopenharmony_ci    {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
624d6c458bSopenharmony_ci    {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
634d6c458bSopenharmony_ci};
644d6c458bSopenharmony_ci#endif
654d6c458bSopenharmony_ci
664d6c458bSopenharmony_ci// ----------------------------------- TaskManager ----------------------------------------
674d6c458bSopenharmony_ciTaskManager& TaskManager::GetInstance()
684d6c458bSopenharmony_ci{
694d6c458bSopenharmony_ci    static TaskManager manager;
704d6c458bSopenharmony_ci    return manager;
714d6c458bSopenharmony_ci}
724d6c458bSopenharmony_ci
734d6c458bSopenharmony_ciTaskManager::TaskManager()
744d6c458bSopenharmony_ci{
754d6c458bSopenharmony_ci    for (size_t i = 0; i < taskQueues_.size(); i++) {
764d6c458bSopenharmony_ci        std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
774d6c458bSopenharmony_ci        taskQueues_[i] = std::move(taskQueue);
784d6c458bSopenharmony_ci    }
794d6c458bSopenharmony_ci}
804d6c458bSopenharmony_ci
814d6c458bSopenharmony_ciTaskManager::~TaskManager()
824d6c458bSopenharmony_ci{
834d6c458bSopenharmony_ci    HILOG_INFO("taskpool:: ~TaskManager");
844d6c458bSopenharmony_ci    if (timer_ == nullptr) {
854d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: timer_ is nullptr");
864d6c458bSopenharmony_ci    } else {
874d6c458bSopenharmony_ci        uv_timer_stop(timer_);
884d6c458bSopenharmony_ci        ConcurrentHelper::UvHandleClose(timer_);
894d6c458bSopenharmony_ci        ConcurrentHelper::UvHandleClose(expandHandle_);
904d6c458bSopenharmony_ci    }
914d6c458bSopenharmony_ci
924d6c458bSopenharmony_ci    if (loop_ != nullptr) {
934d6c458bSopenharmony_ci        uv_stop(loop_);
944d6c458bSopenharmony_ci    }
954d6c458bSopenharmony_ci
964d6c458bSopenharmony_ci    {
974d6c458bSopenharmony_ci        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
984d6c458bSopenharmony_ci        for (auto& worker : workers_) {
994d6c458bSopenharmony_ci            delete worker;
1004d6c458bSopenharmony_ci        }
1014d6c458bSopenharmony_ci        workers_.clear();
1024d6c458bSopenharmony_ci    }
1034d6c458bSopenharmony_ci
1044d6c458bSopenharmony_ci    {
1054d6c458bSopenharmony_ci        std::lock_guard<std::mutex> lock(callbackMutex_);
1064d6c458bSopenharmony_ci        for (auto& [_, callbackPtr] : callbackTable_) {
1074d6c458bSopenharmony_ci            if (callbackPtr == nullptr) {
1084d6c458bSopenharmony_ci                continue;
1094d6c458bSopenharmony_ci            }
1104d6c458bSopenharmony_ci            callbackPtr.reset();
1114d6c458bSopenharmony_ci        }
1124d6c458bSopenharmony_ci        callbackTable_.clear();
1134d6c458bSopenharmony_ci    }
1144d6c458bSopenharmony_ci
1154d6c458bSopenharmony_ci    {
1164d6c458bSopenharmony_ci        std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1174d6c458bSopenharmony_ci        for (auto& [_, task] : tasks_) {
1184d6c458bSopenharmony_ci            delete task;
1194d6c458bSopenharmony_ci            task = nullptr;
1204d6c458bSopenharmony_ci        }
1214d6c458bSopenharmony_ci        tasks_.clear();
1224d6c458bSopenharmony_ci    }
1234d6c458bSopenharmony_ci    CountTraceForWorker();
1244d6c458bSopenharmony_ci}
1254d6c458bSopenharmony_ci
1264d6c458bSopenharmony_civoid TaskManager::CountTraceForWorker()
1274d6c458bSopenharmony_ci{
1284d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
1294d6c458bSopenharmony_ci    int64_t threadNum = static_cast<int64_t>(workers_.size());
1304d6c458bSopenharmony_ci    int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
1314d6c458bSopenharmony_ci    int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
1324d6c458bSopenharmony_ci    HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
1334d6c458bSopenharmony_ci    HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
1344d6c458bSopenharmony_ci    HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
1354d6c458bSopenharmony_ci    HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
1364d6c458bSopenharmony_ci}
1374d6c458bSopenharmony_ci
1384d6c458bSopenharmony_cinapi_value TaskManager::GetThreadInfos(napi_env env)
1394d6c458bSopenharmony_ci{
1404d6c458bSopenharmony_ci    napi_value threadInfos = nullptr;
1414d6c458bSopenharmony_ci    napi_create_array(env, &threadInfos);
1424d6c458bSopenharmony_ci    {
1434d6c458bSopenharmony_ci        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
1444d6c458bSopenharmony_ci        int32_t i = 0;
1454d6c458bSopenharmony_ci        for (auto& worker : workers_) {
1464d6c458bSopenharmony_ci            if (worker->workerEnv_ == nullptr) {
1474d6c458bSopenharmony_ci                continue;
1484d6c458bSopenharmony_ci            }
1494d6c458bSopenharmony_ci            napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
1504d6c458bSopenharmony_ci            napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));
1514d6c458bSopenharmony_ci
1524d6c458bSopenharmony_ci            napi_value taskId = nullptr;
1534d6c458bSopenharmony_ci            napi_create_array(env, &taskId);
1544d6c458bSopenharmony_ci            int32_t j = 0;
1554d6c458bSopenharmony_ci            {
1564d6c458bSopenharmony_ci                std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
1574d6c458bSopenharmony_ci                for (auto& currentId : worker->currentTaskId_) {
1584d6c458bSopenharmony_ci                    napi_value id = NapiHelper::CreateUint32(env, currentId);
1594d6c458bSopenharmony_ci                    napi_set_element(env, taskId, j, id);
1604d6c458bSopenharmony_ci                    j++;
1614d6c458bSopenharmony_ci                }
1624d6c458bSopenharmony_ci            }
1634d6c458bSopenharmony_ci            napi_value threadInfo = nullptr;
1644d6c458bSopenharmony_ci            napi_create_object(env, &threadInfo);
1654d6c458bSopenharmony_ci            napi_set_named_property(env, threadInfo, "tid", tid);
1664d6c458bSopenharmony_ci            napi_set_named_property(env, threadInfo, "priority", priority);
1674d6c458bSopenharmony_ci            napi_set_named_property(env, threadInfo, "taskIds", taskId);
1684d6c458bSopenharmony_ci            napi_set_element(env, threadInfos, i, threadInfo);
1694d6c458bSopenharmony_ci            i++;
1704d6c458bSopenharmony_ci        }
1714d6c458bSopenharmony_ci    }
1724d6c458bSopenharmony_ci    return threadInfos;
1734d6c458bSopenharmony_ci}
1744d6c458bSopenharmony_ci
1754d6c458bSopenharmony_cinapi_value TaskManager::GetTaskInfos(napi_env env)
1764d6c458bSopenharmony_ci{
1774d6c458bSopenharmony_ci    napi_value taskInfos = nullptr;
1784d6c458bSopenharmony_ci    napi_create_array(env, &taskInfos);
1794d6c458bSopenharmony_ci    {
1804d6c458bSopenharmony_ci        std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1814d6c458bSopenharmony_ci        int32_t i = 0;
1824d6c458bSopenharmony_ci        for (const auto& [_, task] : tasks_) {
1834d6c458bSopenharmony_ci            if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
1844d6c458bSopenharmony_ci                task->taskState_ == ExecuteState::FINISHED) {
1854d6c458bSopenharmony_ci                continue;
1864d6c458bSopenharmony_ci            }
1874d6c458bSopenharmony_ci            napi_value taskInfoValue = NapiHelper::CreateObject(env);
1884d6c458bSopenharmony_ci            std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
1894d6c458bSopenharmony_ci            napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
1904d6c458bSopenharmony_ci            napi_value name = nullptr;
1914d6c458bSopenharmony_ci            napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
1924d6c458bSopenharmony_ci            napi_set_named_property(env, taskInfoValue, "name", name);
1934d6c458bSopenharmony_ci            ExecuteState state = task->taskState_;
1944d6c458bSopenharmony_ci            uint64_t duration = 0;
1954d6c458bSopenharmony_ci            if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
1964d6c458bSopenharmony_ci                duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
1974d6c458bSopenharmony_ci            }
1984d6c458bSopenharmony_ci            napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
1994d6c458bSopenharmony_ci            napi_set_named_property(env, taskInfoValue, "taskId", taskId);
2004d6c458bSopenharmony_ci            napi_set_named_property(env, taskInfoValue, "state", stateValue);
2014d6c458bSopenharmony_ci            napi_value durationValue = NapiHelper::CreateUint32(env, duration);
2024d6c458bSopenharmony_ci            napi_set_named_property(env, taskInfoValue, "duration", durationValue);
2034d6c458bSopenharmony_ci            napi_set_element(env, taskInfos, i, taskInfoValue);
2044d6c458bSopenharmony_ci            i++;
2054d6c458bSopenharmony_ci        }
2064d6c458bSopenharmony_ci    }
2074d6c458bSopenharmony_ci    return taskInfos;
2084d6c458bSopenharmony_ci}
2094d6c458bSopenharmony_ci
2104d6c458bSopenharmony_civoid TaskManager::UpdateExecutedInfo(uint64_t duration)
2114d6c458bSopenharmony_ci{
2124d6c458bSopenharmony_ci    totalExecTime_ += duration;
2134d6c458bSopenharmony_ci    totalExecCount_++;
2144d6c458bSopenharmony_ci}
2154d6c458bSopenharmony_ci
2164d6c458bSopenharmony_ciuint32_t TaskManager::ComputeSuitableThreadNum()
2174d6c458bSopenharmony_ci{
2184d6c458bSopenharmony_ci    uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
2194d6c458bSopenharmony_ci    return targetNum;
2204d6c458bSopenharmony_ci}
2214d6c458bSopenharmony_ci
2224d6c458bSopenharmony_ciuint32_t TaskManager::ComputeSuitableIdleNum()
2234d6c458bSopenharmony_ci{
2244d6c458bSopenharmony_ci    uint32_t targetNum = 0;
2254d6c458bSopenharmony_ci    if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
2264d6c458bSopenharmony_ci        // this branch is used for avoiding time-consuming tasks that may block the taskpool
2274d6c458bSopenharmony_ci        targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
2284d6c458bSopenharmony_ci    } else if (totalExecCount_ != 0) {
2294d6c458bSopenharmony_ci        auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
2304d6c458bSopenharmony_ci        uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
2314d6c458bSopenharmony_ci        targetNum = std::min(result, GetNonIdleTaskNum());
2324d6c458bSopenharmony_ci    }
2334d6c458bSopenharmony_ci    return targetNum;
2344d6c458bSopenharmony_ci}
2354d6c458bSopenharmony_ci
2364d6c458bSopenharmony_civoid TaskManager::CheckForBlockedWorkers()
2374d6c458bSopenharmony_ci{
2384d6c458bSopenharmony_ci    // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
2394d6c458bSopenharmony_ci    // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
2404d6c458bSopenharmony_ci    // else we will choose the longer one
2414d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
2424d6c458bSopenharmony_ci    bool needChecking = false;
2434d6c458bSopenharmony_ci    bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
2444d6c458bSopenharmony_ci    uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
2454d6c458bSopenharmony_ci    for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
2464d6c458bSopenharmony_ci        auto worker = *iter;
2474d6c458bSopenharmony_ci        // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
2484d6c458bSopenharmony_ci        // if the worker is executing the longTask, we will not do the check
2494d6c458bSopenharmony_ci        if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
2504d6c458bSopenharmony_ci            (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
2514d6c458bSopenharmony_ci            !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
2524d6c458bSopenharmony_ci            continue;
2534d6c458bSopenharmony_ci        }
2544d6c458bSopenharmony_ci        // When executing the promise task, the worker state may not be updated and will be
2554d6c458bSopenharmony_ci        // marked as 'BLOCKED', so we should exclude this situation.
2564d6c458bSopenharmony_ci        // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
2574d6c458bSopenharmony_ci        // the task like I/O in uv threads, we should also exclude this situation.
2584d6c458bSopenharmony_ci        auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
2594d6c458bSopenharmony_ci        if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
2604d6c458bSopenharmony_ci            if (!workerEngine->HasWaitingRequest()) {
2614d6c458bSopenharmony_ci                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
2624d6c458bSopenharmony_ci            } else {
2634d6c458bSopenharmony_ci                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
2644d6c458bSopenharmony_ci                worker->startTime_ = ConcurrentHelper::GetMilliseconds();
2654d6c458bSopenharmony_ci            }
2664d6c458bSopenharmony_ci            continue;
2674d6c458bSopenharmony_ci        }
2684d6c458bSopenharmony_ci
2694d6c458bSopenharmony_ci        HILOG_INFO("taskpool:: The worker has been marked as timeout.");
2704d6c458bSopenharmony_ci        // If the current worker has a longTask and is not executing, we will only interrupt it.
2714d6c458bSopenharmony_ci        if (worker->HasLongTask()) {
2724d6c458bSopenharmony_ci            continue;
2734d6c458bSopenharmony_ci        }
2744d6c458bSopenharmony_ci        needChecking = true;
2754d6c458bSopenharmony_ci        idleWorkers_.erase(worker);
2764d6c458bSopenharmony_ci        timeoutWorkers_.insert(worker);
2774d6c458bSopenharmony_ci    }
2784d6c458bSopenharmony_ci    // should trigger the check when we have marked and removed workers
2794d6c458bSopenharmony_ci    if (UNLIKELY(needChecking)) {
2804d6c458bSopenharmony_ci        TryExpand();
2814d6c458bSopenharmony_ci    }
2824d6c458bSopenharmony_ci}
2834d6c458bSopenharmony_ci
2844d6c458bSopenharmony_civoid TaskManager::TryTriggerExpand()
2854d6c458bSopenharmony_ci{
2864d6c458bSopenharmony_ci    // post the signal to notify the monitor thread to expand
2874d6c458bSopenharmony_ci    if (UNLIKELY(!isHandleInited_)) {
2884d6c458bSopenharmony_ci        NotifyExecuteTask();
2894d6c458bSopenharmony_ci        needChecking_ = true;
2904d6c458bSopenharmony_ci        HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
2914d6c458bSopenharmony_ci        return;
2924d6c458bSopenharmony_ci    }
2934d6c458bSopenharmony_ci    uv_async_send(expandHandle_);
2944d6c458bSopenharmony_ci}
2954d6c458bSopenharmony_ci
2964d6c458bSopenharmony_ci#if defined(OHOS_PLATFORM)
2974d6c458bSopenharmony_ci// read /proc/[pid]/task/[tid]/stat to get the number of idle threads.
2984d6c458bSopenharmony_cibool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
2994d6c458bSopenharmony_ci{
3004d6c458bSopenharmony_ci    char path[128]; // 128: buffer for path
3014d6c458bSopenharmony_ci    pid_t pid = getpid();
3024d6c458bSopenharmony_ci    ssize_t bytesLen = -1;
3034d6c458bSopenharmony_ci    int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
3044d6c458bSopenharmony_ci    if (ret < 0) {
3054d6c458bSopenharmony_ci        HILOG_ERROR("snprintf_s failed");
3064d6c458bSopenharmony_ci        return false;
3074d6c458bSopenharmony_ci    }
3084d6c458bSopenharmony_ci    int fd = open(path, O_RDONLY | O_NONBLOCK);
3094d6c458bSopenharmony_ci    if (UNLIKELY(fd == -1)) {
3104d6c458bSopenharmony_ci        return false;
3114d6c458bSopenharmony_ci    }
3124d6c458bSopenharmony_ci    bytesLen = read(fd, buf, size - 1);
3134d6c458bSopenharmony_ci    close(fd);
3144d6c458bSopenharmony_ci    if (bytesLen <= 0) {
3154d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: failed to read %{public}s", path);
3164d6c458bSopenharmony_ci        return false;
3174d6c458bSopenharmony_ci    }
3184d6c458bSopenharmony_ci    buf[bytesLen] = '\0';
3194d6c458bSopenharmony_ci    return true;
3204d6c458bSopenharmony_ci}
3214d6c458bSopenharmony_ci
3224d6c458bSopenharmony_ciuint32_t TaskManager::GetIdleWorkers()
3234d6c458bSopenharmony_ci{
3244d6c458bSopenharmony_ci    char buf[4096]; // 4096: buffer for thread info
3254d6c458bSopenharmony_ci    uint32_t idleCount = 0;
3264d6c458bSopenharmony_ci    std::unordered_set<pid_t> tids {};
3274d6c458bSopenharmony_ci    {
3284d6c458bSopenharmony_ci        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
3294d6c458bSopenharmony_ci        for (auto& worker : idleWorkers_) {
3304d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT)
3314d6c458bSopenharmony_ci            if (worker->ffrtTaskHandle_ != nullptr) {
3324d6c458bSopenharmony_ci                if (worker->GetWaitTime() > 0) {
3334d6c458bSopenharmony_ci                    idleCount++;
3344d6c458bSopenharmony_ci                }
3354d6c458bSopenharmony_ci                continue;
3364d6c458bSopenharmony_ci            }
3374d6c458bSopenharmony_ci#endif
3384d6c458bSopenharmony_ci            tids.emplace(worker->tid_);
3394d6c458bSopenharmony_ci        }
3404d6c458bSopenharmony_ci    }
3414d6c458bSopenharmony_ci    // The ffrt thread does not read thread info
3424d6c458bSopenharmony_ci    for (auto tid : tids) {
3434d6c458bSopenharmony_ci        if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
3444d6c458bSopenharmony_ci            continue;
3454d6c458bSopenharmony_ci        }
3464d6c458bSopenharmony_ci        char state;
3474d6c458bSopenharmony_ci        if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
3484d6c458bSopenharmony_ci            HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
3494d6c458bSopenharmony_ci            return 0;
3504d6c458bSopenharmony_ci        }
3514d6c458bSopenharmony_ci        if (state == 'S') {
3524d6c458bSopenharmony_ci            idleCount++;
3534d6c458bSopenharmony_ci        }
3544d6c458bSopenharmony_ci    }
3554d6c458bSopenharmony_ci    return idleCount;
3564d6c458bSopenharmony_ci}
3574d6c458bSopenharmony_ci
3584d6c458bSopenharmony_civoid TaskManager::GetIdleWorkersList(uint32_t step)
3594d6c458bSopenharmony_ci{
3604d6c458bSopenharmony_ci    char buf[4096]; // 4096: buffer for thread info
3614d6c458bSopenharmony_ci    for (auto& worker : idleWorkers_) {
3624d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT)
3634d6c458bSopenharmony_ci        if (worker->ffrtTaskHandle_ != nullptr) {
3644d6c458bSopenharmony_ci            uint64_t workerWaitTime = worker->GetWaitTime();
3654d6c458bSopenharmony_ci            bool isWorkerLoopActive = worker->IsLoopActive();
3664d6c458bSopenharmony_ci            if (workerWaitTime == 0) {
3674d6c458bSopenharmony_ci                continue;
3684d6c458bSopenharmony_ci            }
3694d6c458bSopenharmony_ci            uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
3704d6c458bSopenharmony_ci                std::chrono::steady_clock::now().time_since_epoch()).count());
3714d6c458bSopenharmony_ci            if (!isWorkerLoopActive) {
3724d6c458bSopenharmony_ci                freeList_.emplace_back(worker);
3734d6c458bSopenharmony_ci            } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) {
3744d6c458bSopenharmony_ci                freeList_.emplace_back(worker);
3754d6c458bSopenharmony_ci                HILOG_INFO("taskpool:: worker in ffrt epoll wait more than 2 intervals, force to free.");
3764d6c458bSopenharmony_ci            } else {
3774d6c458bSopenharmony_ci                // worker uv alive, and will be free in 2 intervals if not wake
3784d6c458bSopenharmony_ci                HILOG_INFO("taskpool:: worker will be free if not wake.");
3794d6c458bSopenharmony_ci            }
3804d6c458bSopenharmony_ci            continue;
3814d6c458bSopenharmony_ci        }
3824d6c458bSopenharmony_ci#endif
3834d6c458bSopenharmony_ci        if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
3844d6c458bSopenharmony_ci            continue;
3854d6c458bSopenharmony_ci        }
3864d6c458bSopenharmony_ci        char state;
3874d6c458bSopenharmony_ci        uint64_t utime;
3884d6c458bSopenharmony_ci        if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
3894d6c458bSopenharmony_ci            &state, sizeof(state), &utime) != 2) { // 2: state and utime
3904d6c458bSopenharmony_ci            HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
3914d6c458bSopenharmony_ci            return;
3924d6c458bSopenharmony_ci        }
3934d6c458bSopenharmony_ci        if (state != 'S' || utime != worker->lastCpuTime_) {
3944d6c458bSopenharmony_ci            worker->idleCount_ = 0;
3954d6c458bSopenharmony_ci            worker->lastCpuTime_ = utime;
3964d6c458bSopenharmony_ci            continue;
3974d6c458bSopenharmony_ci        }
3984d6c458bSopenharmony_ci        if (++worker->idleCount_ >= IDLE_THRESHOLD) {
3994d6c458bSopenharmony_ci            freeList_.emplace_back(worker);
4004d6c458bSopenharmony_ci        }
4014d6c458bSopenharmony_ci    }
4024d6c458bSopenharmony_ci}
4034d6c458bSopenharmony_ci
4044d6c458bSopenharmony_civoid TaskManager::TriggerShrink(uint32_t step)
4054d6c458bSopenharmony_ci{
4064d6c458bSopenharmony_ci    GetIdleWorkersList(step);
4074d6c458bSopenharmony_ci    step = std::min(step, static_cast<uint32_t>(freeList_.size()));
4084d6c458bSopenharmony_ci    uint32_t count = 0;
4094d6c458bSopenharmony_ci    for (size_t i = 0; i < freeList_.size(); i++) {
4104d6c458bSopenharmony_ci        auto worker = freeList_[i];
4114d6c458bSopenharmony_ci        if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
4124d6c458bSopenharmony_ci            continue;
4134d6c458bSopenharmony_ci        }
4144d6c458bSopenharmony_ci        auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
4154d6c458bSopenharmony_ci        if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) {
4164d6c458bSopenharmony_ci            continue;
4174d6c458bSopenharmony_ci        }
4184d6c458bSopenharmony_ci        idleWorkers_.erase(worker);
4194d6c458bSopenharmony_ci        HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
4204d6c458bSopenharmony_ci        worker->PostReleaseSignal();
4214d6c458bSopenharmony_ci        if (++count == step) {
4224d6c458bSopenharmony_ci            break;
4234d6c458bSopenharmony_ci        }
4244d6c458bSopenharmony_ci    }
4254d6c458bSopenharmony_ci    freeList_.clear();
4264d6c458bSopenharmony_ci}
4274d6c458bSopenharmony_ci#else
4284d6c458bSopenharmony_ciuint32_t TaskManager::GetIdleWorkers()
4294d6c458bSopenharmony_ci{
4304d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
4314d6c458bSopenharmony_ci    return idleWorkers_.size();
4324d6c458bSopenharmony_ci}
4334d6c458bSopenharmony_ci
4344d6c458bSopenharmony_civoid TaskManager::TriggerShrink(uint32_t step)
4354d6c458bSopenharmony_ci{
4364d6c458bSopenharmony_ci    for (uint32_t i = 0; i < step; i++) {
4374d6c458bSopenharmony_ci        // try to free the worker that idle time meets the requirement
4384d6c458bSopenharmony_ci        auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
4394d6c458bSopenharmony_ci            auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
4404d6c458bSopenharmony_ci            return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask();
4414d6c458bSopenharmony_ci        });
4424d6c458bSopenharmony_ci        // remove it from all sets
4434d6c458bSopenharmony_ci        if (iter != idleWorkers_.end()) {
4444d6c458bSopenharmony_ci            auto worker = *iter;
4454d6c458bSopenharmony_ci            idleWorkers_.erase(worker);
4464d6c458bSopenharmony_ci            HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
4474d6c458bSopenharmony_ci            worker->PostReleaseSignal();
4484d6c458bSopenharmony_ci        }
4494d6c458bSopenharmony_ci    }
4504d6c458bSopenharmony_ci}
4514d6c458bSopenharmony_ci#endif
4524d6c458bSopenharmony_ci
4534d6c458bSopenharmony_civoid TaskManager::NotifyShrink(uint32_t targetNum)
4544d6c458bSopenharmony_ci{
4554d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
4564d6c458bSopenharmony_ci    uint32_t workerCount = workers_.size();
4574d6c458bSopenharmony_ci    uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
4584d6c458bSopenharmony_ci    if (minThread == 0) {
4594d6c458bSopenharmony_ci        HILOG_INFO("taskpool:: the system now is under low memory");
4604d6c458bSopenharmony_ci    }
4614d6c458bSopenharmony_ci    if (workerCount > minThread && workerCount > targetNum) {
4624d6c458bSopenharmony_ci        targetNum = std::max(minThread, targetNum);
4634d6c458bSopenharmony_ci        uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
4644d6c458bSopenharmony_ci        TriggerShrink(step);
4654d6c458bSopenharmony_ci    }
4664d6c458bSopenharmony_ci    // remove all timeout workers
4674d6c458bSopenharmony_ci    for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
4684d6c458bSopenharmony_ci        auto worker = *iter;
4694d6c458bSopenharmony_ci        if (workers_.find(worker) == workers_.end()) {
4704d6c458bSopenharmony_ci            HILOG_WARN("taskpool:: current worker maybe release");
4714d6c458bSopenharmony_ci            iter = timeoutWorkers_.erase(iter);
4724d6c458bSopenharmony_ci        } else if (!worker->HasRunningTasks()) {
4734d6c458bSopenharmony_ci            HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_);
4744d6c458bSopenharmony_ci            worker->PostReleaseSignal();
4754d6c458bSopenharmony_ci            timeoutWorkers_.erase(iter++);
4764d6c458bSopenharmony_ci            return;
4774d6c458bSopenharmony_ci        } else {
4784d6c458bSopenharmony_ci            iter++;
4794d6c458bSopenharmony_ci        }
4804d6c458bSopenharmony_ci    }
4814d6c458bSopenharmony_ci    uint32_t idleNum = idleWorkers_.size();
4824d6c458bSopenharmony_ci    // System memory state is moderate and the worker has exeuted tasks, we will try to release it
4834d6c458bSopenharmony_ci    if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
4844d6c458bSopenharmony_ci        auto worker = *(idleWorkers_.begin());
4854d6c458bSopenharmony_ci        // worker that has longTask should not be released
4864d6c458bSopenharmony_ci        if (worker == nullptr || worker->HasLongTask()) {
4874d6c458bSopenharmony_ci            return;
4884d6c458bSopenharmony_ci        }
4894d6c458bSopenharmony_ci        if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released
4904d6c458bSopenharmony_ci            TriggerShrink(DEFAULT_MIN_THREADS);
4914d6c458bSopenharmony_ci            return;
4924d6c458bSopenharmony_ci        }
4934d6c458bSopenharmony_ci    }
4944d6c458bSopenharmony_ci
4954d6c458bSopenharmony_ci    // Create a worker for performance
4964d6c458bSopenharmony_ci    if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
4974d6c458bSopenharmony_ci        CreateWorkers(hostEnv_);
4984d6c458bSopenharmony_ci    }
4994d6c458bSopenharmony_ci    // stop the timer
5004d6c458bSopenharmony_ci    if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
5014d6c458bSopenharmony_ci        suspend_ = true;
5024d6c458bSopenharmony_ci        uv_timer_stop(timer_);
5034d6c458bSopenharmony_ci        HILOG_DEBUG("taskpool:: timer will be suspended");
5044d6c458bSopenharmony_ci    }
5054d6c458bSopenharmony_ci}
5064d6c458bSopenharmony_ci
5074d6c458bSopenharmony_civoid TaskManager::TriggerLoadBalance(const uv_timer_t* req)
5084d6c458bSopenharmony_ci{
5094d6c458bSopenharmony_ci    TaskManager& taskManager = TaskManager::GetInstance();
5104d6c458bSopenharmony_ci    taskManager.CheckForBlockedWorkers();
5114d6c458bSopenharmony_ci    uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
5124d6c458bSopenharmony_ci    taskManager.NotifyShrink(targetNum);
5134d6c458bSopenharmony_ci    taskManager.CountTraceForWorker();
5144d6c458bSopenharmony_ci}
5154d6c458bSopenharmony_ci
5164d6c458bSopenharmony_civoid TaskManager::TryExpand()
5174d6c458bSopenharmony_ci{
5184d6c458bSopenharmony_ci    // dispatch task in the TaskPoolManager thread
5194d6c458bSopenharmony_ci    NotifyExecuteTask();
5204d6c458bSopenharmony_ci    // do not trigger when there are more idleWorkers than tasks
5214d6c458bSopenharmony_ci    uint32_t idleNum = GetIdleWorkers();
5224d6c458bSopenharmony_ci    if (idleNum > GetNonIdleTaskNum()) {
5234d6c458bSopenharmony_ci        return;
5244d6c458bSopenharmony_ci    }
5254d6c458bSopenharmony_ci    needChecking_ = false; // do not need to check
5264d6c458bSopenharmony_ci    uint32_t targetNum = ComputeSuitableIdleNum();
5274d6c458bSopenharmony_ci    uint32_t workerCount = 0;
5284d6c458bSopenharmony_ci    uint32_t idleCount = 0;
5294d6c458bSopenharmony_ci    uint32_t timeoutWorkers = 0;
5304d6c458bSopenharmony_ci    {
5314d6c458bSopenharmony_ci        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
5324d6c458bSopenharmony_ci        idleCount = idleWorkers_.size();
5334d6c458bSopenharmony_ci        workerCount = workers_.size();
5344d6c458bSopenharmony_ci        timeoutWorkers = timeoutWorkers_.size();
5354d6c458bSopenharmony_ci    }
5364d6c458bSopenharmony_ci    uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
5374d6c458bSopenharmony_ci    maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
5384d6c458bSopenharmony_ci    if (workerCount < maxThreads && idleCount < targetNum) {
5394d6c458bSopenharmony_ci        uint32_t step = std::min(maxThreads, targetNum) - idleCount;
5404d6c458bSopenharmony_ci        // Prevent the total number of expanded threads from exceeding maxThreads
5414d6c458bSopenharmony_ci        if (step + workerCount > maxThreads) {
5424d6c458bSopenharmony_ci            step = maxThreads - workerCount;
5434d6c458bSopenharmony_ci        }
5444d6c458bSopenharmony_ci        CreateWorkers(hostEnv_, step);
5454d6c458bSopenharmony_ci        HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
5464d6c458bSopenharmony_ci            maxThreads, step, GetThreadNum());
5474d6c458bSopenharmony_ci    }
5484d6c458bSopenharmony_ci    if (UNLIKELY(suspend_)) {
5494d6c458bSopenharmony_ci        suspend_ = false;
5504d6c458bSopenharmony_ci        uv_timer_again(timer_);
5514d6c458bSopenharmony_ci    }
5524d6c458bSopenharmony_ci}
5534d6c458bSopenharmony_ci
5544d6c458bSopenharmony_civoid TaskManager::NotifyExpand(const uv_async_t* req)
5554d6c458bSopenharmony_ci{
5564d6c458bSopenharmony_ci    TaskManager& taskManager = TaskManager::GetInstance();
5574d6c458bSopenharmony_ci    taskManager.TryExpand();
5584d6c458bSopenharmony_ci}
5594d6c458bSopenharmony_ci
5604d6c458bSopenharmony_civoid TaskManager::RunTaskManager()
5614d6c458bSopenharmony_ci{
5624d6c458bSopenharmony_ci    loop_ = uv_loop_new();
5634d6c458bSopenharmony_ci    if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
5644d6c458bSopenharmony_ci        HILOG_FATAL("taskpool:: new loop failed.");
5654d6c458bSopenharmony_ci        return;
5664d6c458bSopenharmony_ci    }
5674d6c458bSopenharmony_ci    ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand);
5684d6c458bSopenharmony_ci    timer_ = new uv_timer_t;
5694d6c458bSopenharmony_ci    uv_timer_init(loop_, timer_);
5704d6c458bSopenharmony_ci    uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
5714d6c458bSopenharmony_ci    isHandleInited_ = true;
5724d6c458bSopenharmony_ci#if defined IOS_PLATFORM || defined MAC_PLATFORM
5734d6c458bSopenharmony_ci    pthread_setname_np("OS_TaskManager");
5744d6c458bSopenharmony_ci#else
5754d6c458bSopenharmony_ci    pthread_setname_np(pthread_self(), "OS_TaskManager");
5764d6c458bSopenharmony_ci#endif
5774d6c458bSopenharmony_ci    if (UNLIKELY(needChecking_)) {
5784d6c458bSopenharmony_ci        needChecking_ = false;
5794d6c458bSopenharmony_ci        uv_async_send(expandHandle_);
5804d6c458bSopenharmony_ci    }
5814d6c458bSopenharmony_ci    uv_run(loop_, UV_RUN_DEFAULT);
5824d6c458bSopenharmony_ci    if (loop_ != nullptr) {
5834d6c458bSopenharmony_ci        uv_loop_delete(loop_);
5844d6c458bSopenharmony_ci    }
5854d6c458bSopenharmony_ci}
5864d6c458bSopenharmony_ci
5874d6c458bSopenharmony_civoid TaskManager::CancelTask(napi_env env, uint64_t taskId)
5884d6c458bSopenharmony_ci{
5894d6c458bSopenharmony_ci    // 1. Cannot find taskInfo by executeId, throw error
5904d6c458bSopenharmony_ci    // 2. Find executing taskInfo, skip it
5914d6c458bSopenharmony_ci    // 3. Find waiting taskInfo, cancel it
5924d6c458bSopenharmony_ci    // 4. Find canceled taskInfo, skip it
5934d6c458bSopenharmony_ci    std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
5944d6c458bSopenharmony_ci    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
5954d6c458bSopenharmony_ci    HITRACE_HELPER_METER_NAME(strTrace);
5964d6c458bSopenharmony_ci    Task* task = GetTask(taskId);
5974d6c458bSopenharmony_ci    if (task == nullptr) {
5984d6c458bSopenharmony_ci        std::string errMsg = "taskpool:: the task may not exist";
5994d6c458bSopenharmony_ci        HILOG_ERROR("%{public}s", errMsg.c_str());
6004d6c458bSopenharmony_ci        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
6014d6c458bSopenharmony_ci        return;
6024d6c458bSopenharmony_ci    }
6034d6c458bSopenharmony_ci    if (task->taskState_ == ExecuteState::CANCELED) {
6044d6c458bSopenharmony_ci        HILOG_DEBUG("taskpool:: task has been canceled");
6054d6c458bSopenharmony_ci        return;
6064d6c458bSopenharmony_ci    }
6074d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
6084d6c458bSopenharmony_ci    if (task->IsPeriodicTask()) {
6094d6c458bSopenharmony_ci        task->CancelPendingTask(env);
6104d6c458bSopenharmony_ci        uv_timer_stop(task->timer_);
6114d6c458bSopenharmony_ci        uv_close(reinterpret_cast<uv_handle_t*>(task->timer_), [](uv_handle_t* handle) {
6124d6c458bSopenharmony_ci            delete (uv_timer_t*)handle;
6134d6c458bSopenharmony_ci            handle = nullptr;
6144d6c458bSopenharmony_ci        });
6154d6c458bSopenharmony_ci        return;
6164d6c458bSopenharmony_ci    } else if (task->IsSeqRunnerTask()) {
6174d6c458bSopenharmony_ci        CancelSeqRunnerTask(env, task);
6184d6c458bSopenharmony_ci        return;
6194d6c458bSopenharmony_ci    }
6204d6c458bSopenharmony_ci    if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
6214d6c458bSopenharmony_ci        task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
6224d6c458bSopenharmony_ci        task->taskState_ == ExecuteState::ENDING) {
6234d6c458bSopenharmony_ci        std::string errMsg = "taskpool:: task is not executed or has been executed";
6244d6c458bSopenharmony_ci        HILOG_ERROR("%{public}s", errMsg.c_str());
6254d6c458bSopenharmony_ci        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
6264d6c458bSopenharmony_ci        return;
6274d6c458bSopenharmony_ci    }
6284d6c458bSopenharmony_ci
6294d6c458bSopenharmony_ci    task->ClearDelayedTimers();
6304d6c458bSopenharmony_ci    ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED);
6314d6c458bSopenharmony_ci    task->CancelPendingTask(env);
6324d6c458bSopenharmony_ci    if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
6334d6c458bSopenharmony_ci        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
6344d6c458bSopenharmony_ci        task->DecreaseTaskRefCount();
6354d6c458bSopenharmony_ci        EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
6364d6c458bSopenharmony_ci        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled");
6374d6c458bSopenharmony_ci        napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
6384d6c458bSopenharmony_ci        napi_reference_unref(env, task->taskRef_, nullptr);
6394d6c458bSopenharmony_ci        delete task->currentTaskInfo_;
6404d6c458bSopenharmony_ci        task->currentTaskInfo_ = nullptr;
6414d6c458bSopenharmony_ci    }
6424d6c458bSopenharmony_ci}
6434d6c458bSopenharmony_ci
6444d6c458bSopenharmony_civoid TaskManager::CancelSeqRunnerTask(napi_env env, Task *task)
6454d6c458bSopenharmony_ci{
6464d6c458bSopenharmony_ci    if (task->taskState_ == ExecuteState::FINISHED) {
6474d6c458bSopenharmony_ci        std::string errMsg = "taskpool:: sequenceRunner task has been executed";
6484d6c458bSopenharmony_ci        HILOG_ERROR("%{public}s", errMsg.c_str());
6494d6c458bSopenharmony_ci        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
6504d6c458bSopenharmony_ci    } else {
6514d6c458bSopenharmony_ci        task->taskState_ = ExecuteState::CANCELED;
6524d6c458bSopenharmony_ci    }
6534d6c458bSopenharmony_ci}
6544d6c458bSopenharmony_ci
6554d6c458bSopenharmony_civoid TaskManager::NotifyWorkerIdle(Worker* worker)
6564d6c458bSopenharmony_ci{
6574d6c458bSopenharmony_ci    {
6584d6c458bSopenharmony_ci        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
6594d6c458bSopenharmony_ci        if (worker->state_ == WorkerState::BLOCKED) {
6604d6c458bSopenharmony_ci            return;
6614d6c458bSopenharmony_ci        }
6624d6c458bSopenharmony_ci        idleWorkers_.insert(worker);
6634d6c458bSopenharmony_ci    }
6644d6c458bSopenharmony_ci    if (GetTaskNum() != 0) {
6654d6c458bSopenharmony_ci        NotifyExecuteTask();
6664d6c458bSopenharmony_ci    }
6674d6c458bSopenharmony_ci    CountTraceForWorker();
6684d6c458bSopenharmony_ci}
6694d6c458bSopenharmony_ci
6704d6c458bSopenharmony_civoid TaskManager::NotifyWorkerCreated(Worker* worker)
6714d6c458bSopenharmony_ci{
6724d6c458bSopenharmony_ci    NotifyWorkerIdle(worker);
6734d6c458bSopenharmony_ci}
6744d6c458bSopenharmony_ci
6754d6c458bSopenharmony_civoid TaskManager::NotifyWorkerAdded(Worker* worker)
6764d6c458bSopenharmony_ci{
6774d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
6784d6c458bSopenharmony_ci    workers_.insert(worker);
6794d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
6804d6c458bSopenharmony_ci}
6814d6c458bSopenharmony_ci
6824d6c458bSopenharmony_civoid TaskManager::NotifyWorkerRunning(Worker* worker)
6834d6c458bSopenharmony_ci{
6844d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
6854d6c458bSopenharmony_ci    idleWorkers_.erase(worker);
6864d6c458bSopenharmony_ci    CountTraceForWorker();
6874d6c458bSopenharmony_ci}
6884d6c458bSopenharmony_ci
6894d6c458bSopenharmony_ciuint32_t TaskManager::GetRunningWorkers()
6904d6c458bSopenharmony_ci{
6914d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
6924d6c458bSopenharmony_ci    return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
6934d6c458bSopenharmony_ci        return worker->HasRunningTasks();
6944d6c458bSopenharmony_ci    });
6954d6c458bSopenharmony_ci}
6964d6c458bSopenharmony_ci
6974d6c458bSopenharmony_ciuint32_t TaskManager::GetTimeoutWorkers()
6984d6c458bSopenharmony_ci{
6994d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
7004d6c458bSopenharmony_ci    return timeoutWorkers_.size();
7014d6c458bSopenharmony_ci}
7024d6c458bSopenharmony_ci
7034d6c458bSopenharmony_ciuint32_t TaskManager::GetTaskNum()
7044d6c458bSopenharmony_ci{
7054d6c458bSopenharmony_ci    std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
7064d6c458bSopenharmony_ci    uint32_t sum = 0;
7074d6c458bSopenharmony_ci    for (const auto& elements : taskQueues_) {
7084d6c458bSopenharmony_ci        sum += elements->GetTaskNum();
7094d6c458bSopenharmony_ci    }
7104d6c458bSopenharmony_ci    return sum;
7114d6c458bSopenharmony_ci}
7124d6c458bSopenharmony_ci
7134d6c458bSopenharmony_ciuint32_t TaskManager::GetNonIdleTaskNum()
7144d6c458bSopenharmony_ci{
7154d6c458bSopenharmony_ci    return nonIdleTaskNum_;
7164d6c458bSopenharmony_ci}
7174d6c458bSopenharmony_ci
7184d6c458bSopenharmony_civoid TaskManager::IncreaseNumIfNoIdle(Priority priority)
7194d6c458bSopenharmony_ci{
7204d6c458bSopenharmony_ci    if (priority != Priority::IDLE) {
7214d6c458bSopenharmony_ci        ++nonIdleTaskNum_;
7224d6c458bSopenharmony_ci    }
7234d6c458bSopenharmony_ci}
7244d6c458bSopenharmony_ci
7254d6c458bSopenharmony_civoid TaskManager::DecreaseNumIfNoIdle(Priority priority)
7264d6c458bSopenharmony_ci{
7274d6c458bSopenharmony_ci    if (priority != Priority::IDLE) {
7284d6c458bSopenharmony_ci        --nonIdleTaskNum_;
7294d6c458bSopenharmony_ci    }
7304d6c458bSopenharmony_ci}
7314d6c458bSopenharmony_ci
7324d6c458bSopenharmony_ciuint32_t TaskManager::GetThreadNum()
7334d6c458bSopenharmony_ci{
7344d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
7354d6c458bSopenharmony_ci    return workers_.size();
7364d6c458bSopenharmony_ci}
7374d6c458bSopenharmony_ci
7384d6c458bSopenharmony_civoid TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
7394d6c458bSopenharmony_ci{
7404d6c458bSopenharmony_ci    {
7414d6c458bSopenharmony_ci        std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
7424d6c458bSopenharmony_ci        IncreaseNumIfNoIdle(priority);
7434d6c458bSopenharmony_ci        taskQueues_[priority]->EnqueueTaskId(taskId);
7444d6c458bSopenharmony_ci    }
7454d6c458bSopenharmony_ci    TryTriggerExpand();
7464d6c458bSopenharmony_ci    Task* task = GetTask(taskId);
7474d6c458bSopenharmony_ci    if (task == nullptr) {
7484d6c458bSopenharmony_ci        HILOG_FATAL("taskpool:: task is nullptr");
7494d6c458bSopenharmony_ci        return;
7504d6c458bSopenharmony_ci    }
7514d6c458bSopenharmony_ci    task->IncreaseTaskRefCount();
7524d6c458bSopenharmony_ci    if (task->onEnqueuedCallBackInfo_ != nullptr) {
7534d6c458bSopenharmony_ci        task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
7544d6c458bSopenharmony_ci    }
7554d6c458bSopenharmony_ci}
7564d6c458bSopenharmony_ci
7574d6c458bSopenharmony_civoid TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority)
7584d6c458bSopenharmony_ci{
7594d6c458bSopenharmony_ci    std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
7604d6c458bSopenharmony_ci    if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
7614d6c458bSopenharmony_ci        HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
7624d6c458bSopenharmony_ci    }
7634d6c458bSopenharmony_ci}
7644d6c458bSopenharmony_ci
7654d6c458bSopenharmony_cistd::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
7664d6c458bSopenharmony_ci{
7674d6c458bSopenharmony_ci    std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
7684d6c458bSopenharmony_ci    auto& highTaskQueue = taskQueues_[Priority::HIGH];
7694d6c458bSopenharmony_ci    if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
7704d6c458bSopenharmony_ci        highPrioExecuteCount_++;
7714d6c458bSopenharmony_ci        return GetTaskByPriority(highTaskQueue, Priority::HIGH);
7724d6c458bSopenharmony_ci    }
7734d6c458bSopenharmony_ci    highPrioExecuteCount_ = 0;
7744d6c458bSopenharmony_ci
7754d6c458bSopenharmony_ci    auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
7764d6c458bSopenharmony_ci    if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
7774d6c458bSopenharmony_ci        mediumPrioExecuteCount_++;
7784d6c458bSopenharmony_ci        return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
7794d6c458bSopenharmony_ci    }
7804d6c458bSopenharmony_ci    mediumPrioExecuteCount_ = 0;
7814d6c458bSopenharmony_ci
7824d6c458bSopenharmony_ci    auto& lowTaskQueue = taskQueues_[Priority::LOW];
7834d6c458bSopenharmony_ci    if (!lowTaskQueue->IsEmpty()) {
7844d6c458bSopenharmony_ci        return GetTaskByPriority(lowTaskQueue, Priority::LOW);
7854d6c458bSopenharmony_ci    }
7864d6c458bSopenharmony_ci
7874d6c458bSopenharmony_ci    auto& idleTaskQueue = taskQueues_[Priority::IDLE];
7884d6c458bSopenharmony_ci    if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
7894d6c458bSopenharmony_ci        return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
7904d6c458bSopenharmony_ci    }
7914d6c458bSopenharmony_ci    return std::make_pair(0, Priority::LOW);
7924d6c458bSopenharmony_ci}
7934d6c458bSopenharmony_ci
7944d6c458bSopenharmony_cibool TaskManager::IsChooseIdle()
7954d6c458bSopenharmony_ci{
7964d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
7974d6c458bSopenharmony_ci    for (auto& worker : workers_) {
7984d6c458bSopenharmony_ci        if (worker->state_ == WorkerState::IDLE) {
7994d6c458bSopenharmony_ci            // If worker->state_ is WorkerState::IDLE, it means that the worker is free
8004d6c458bSopenharmony_ci            continue;
8014d6c458bSopenharmony_ci        }
8024d6c458bSopenharmony_ci        // If there is a worker running a task, do not take the idle task.
8034d6c458bSopenharmony_ci        return false;
8044d6c458bSopenharmony_ci    }
8054d6c458bSopenharmony_ci    // Only when all workers are free, will idle task be taken.
8064d6c458bSopenharmony_ci    return true;
8074d6c458bSopenharmony_ci}
8084d6c458bSopenharmony_ci
8094d6c458bSopenharmony_cistd::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
8104d6c458bSopenharmony_ci    Priority priority)
8114d6c458bSopenharmony_ci{
8124d6c458bSopenharmony_ci    uint64_t taskId = taskQueue->DequeueTaskId();
8134d6c458bSopenharmony_ci    if (IsDependendByTaskId(taskId)) {
8144d6c458bSopenharmony_ci        EnqueuePendingTaskInfo(taskId, priority);
8154d6c458bSopenharmony_ci        return std::make_pair(0, priority);
8164d6c458bSopenharmony_ci    }
8174d6c458bSopenharmony_ci    DecreaseNumIfNoIdle(priority);
8184d6c458bSopenharmony_ci    return std::make_pair(taskId, priority);
8194d6c458bSopenharmony_ci}
8204d6c458bSopenharmony_ci
8214d6c458bSopenharmony_civoid TaskManager::NotifyExecuteTask()
8224d6c458bSopenharmony_ci{
8234d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
8244d6c458bSopenharmony_ci    if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
8254d6c458bSopenharmony_ci        // When there are only idle tasks and workers executing them, it is not triggered
8264d6c458bSopenharmony_ci        return;
8274d6c458bSopenharmony_ci    }
8284d6c458bSopenharmony_ci    for (auto& worker : idleWorkers_) {
8294d6c458bSopenharmony_ci        worker->NotifyExecuteTask();
8304d6c458bSopenharmony_ci    }
8314d6c458bSopenharmony_ci}
8324d6c458bSopenharmony_ci
8334d6c458bSopenharmony_civoid TaskManager::InitTaskManager(napi_env env)
8344d6c458bSopenharmony_ci{
8354d6c458bSopenharmony_ci    HITRACE_HELPER_METER_NAME("InitTaskManager");
8364d6c458bSopenharmony_ci    if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
8374d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT)
8384d6c458bSopenharmony_ci        globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
8394d6c458bSopenharmony_ci        if (!globalEnableFfrtFlag_) {
8404d6c458bSopenharmony_ci            UpdateSystemAppFlag();
8414d6c458bSopenharmony_ci            if (IsSystemApp()) {
8424d6c458bSopenharmony_ci                disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
8434d6c458bSopenharmony_ci            }
8444d6c458bSopenharmony_ci        }
8454d6c458bSopenharmony_ci        if (EnableFfrt()) {
8464d6c458bSopenharmony_ci            HILOG_INFO("taskpool:: apps use ffrt");
8474d6c458bSopenharmony_ci        } else {
8484d6c458bSopenharmony_ci            HILOG_INFO("taskpool:: apps do not use ffrt");
8494d6c458bSopenharmony_ci        }
8504d6c458bSopenharmony_ci#endif
8514d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
8524d6c458bSopenharmony_ci        mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
8534d6c458bSopenharmony_ci            OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
8544d6c458bSopenharmony_ci#endif
8554d6c458bSopenharmony_ci        auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
8564d6c458bSopenharmony_ci        if (mainThreadEngine == nullptr) {
8574d6c458bSopenharmony_ci            HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
8584d6c458bSopenharmony_ci            return;
8594d6c458bSopenharmony_ci        }
8604d6c458bSopenharmony_ci        hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
8614d6c458bSopenharmony_ci        // Add a reserved thread for taskpool
8624d6c458bSopenharmony_ci        CreateWorkers(hostEnv_);
8634d6c458bSopenharmony_ci        // Create a timer to manage worker threads
8644d6c458bSopenharmony_ci        std::thread workerManager([this] {this->RunTaskManager();});
8654d6c458bSopenharmony_ci        workerManager.detach();
8664d6c458bSopenharmony_ci    }
8674d6c458bSopenharmony_ci}
8684d6c458bSopenharmony_ci
8694d6c458bSopenharmony_civoid TaskManager::CreateWorkers(napi_env env, uint32_t num)
8704d6c458bSopenharmony_ci{
8714d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
8724d6c458bSopenharmony_ci    for (uint32_t i = 0; i < num; i++) {
8734d6c458bSopenharmony_ci        auto worker = Worker::WorkerConstructor(env);
8744d6c458bSopenharmony_ci        NotifyWorkerAdded(worker);
8754d6c458bSopenharmony_ci    }
8764d6c458bSopenharmony_ci    CountTraceForWorker();
8774d6c458bSopenharmony_ci}
8784d6c458bSopenharmony_ci
8794d6c458bSopenharmony_civoid TaskManager::RemoveWorker(Worker* worker)
8804d6c458bSopenharmony_ci{
8814d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
8824d6c458bSopenharmony_ci    idleWorkers_.erase(worker);
8834d6c458bSopenharmony_ci    timeoutWorkers_.erase(worker);
8844d6c458bSopenharmony_ci    workers_.erase(worker);
8854d6c458bSopenharmony_ci}
8864d6c458bSopenharmony_ci
8874d6c458bSopenharmony_civoid TaskManager::RestoreWorker(Worker* worker)
8884d6c458bSopenharmony_ci{
8894d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
8904d6c458bSopenharmony_ci    if (UNLIKELY(suspend_)) {
8914d6c458bSopenharmony_ci        suspend_ = false;
8924d6c458bSopenharmony_ci        uv_timer_again(timer_);
8934d6c458bSopenharmony_ci    }
8944d6c458bSopenharmony_ci    if (worker->state_ == WorkerState::BLOCKED) {
8954d6c458bSopenharmony_ci        // since the worker is blocked, we should add it to the timeout set
8964d6c458bSopenharmony_ci        timeoutWorkers_.insert(worker);
8974d6c458bSopenharmony_ci        return;
8984d6c458bSopenharmony_ci    }
8994d6c458bSopenharmony_ci    // Since the worker may be executing some tasks in IO thread, we should add it to the
9004d6c458bSopenharmony_ci    // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
9014d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
9024d6c458bSopenharmony_ci    idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
9034d6c458bSopenharmony_ci    if (GetTaskNum() != 0) {
9044d6c458bSopenharmony_ci        NotifyExecuteTask();
9054d6c458bSopenharmony_ci    }
9064d6c458bSopenharmony_ci}
9074d6c458bSopenharmony_ci
9084d6c458bSopenharmony_ci// ---------------------------------- SendData ---------------------------------------
9094d6c458bSopenharmony_civoid TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
9104d6c458bSopenharmony_ci{
9114d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(callbackMutex_);
9124d6c458bSopenharmony_ci    callbackTable_[taskId] = callbackInfo;
9134d6c458bSopenharmony_ci}
9144d6c458bSopenharmony_ci
9154d6c458bSopenharmony_cistd::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
9164d6c458bSopenharmony_ci{
9174d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(callbackMutex_);
9184d6c458bSopenharmony_ci    auto iter = callbackTable_.find(taskId);
9194d6c458bSopenharmony_ci    if (iter == callbackTable_.end() || iter->second == nullptr) {
9204d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: the callback does not exist");
9214d6c458bSopenharmony_ci        return nullptr;
9224d6c458bSopenharmony_ci    }
9234d6c458bSopenharmony_ci    return iter->second;
9244d6c458bSopenharmony_ci}
9254d6c458bSopenharmony_ci
9264d6c458bSopenharmony_civoid TaskManager::IncreaseRefCount(uint64_t taskId)
9274d6c458bSopenharmony_ci{
9284d6c458bSopenharmony_ci    if (taskId == 0) { // do not support func
9294d6c458bSopenharmony_ci        return;
9304d6c458bSopenharmony_ci    }
9314d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(callbackMutex_);
9324d6c458bSopenharmony_ci    auto iter = callbackTable_.find(taskId);
9334d6c458bSopenharmony_ci    if (iter == callbackTable_.end() || iter->second == nullptr) {
9344d6c458bSopenharmony_ci        return;
9354d6c458bSopenharmony_ci    }
9364d6c458bSopenharmony_ci    iter->second->refCount++;
9374d6c458bSopenharmony_ci}
9384d6c458bSopenharmony_ci
9394d6c458bSopenharmony_civoid TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
9404d6c458bSopenharmony_ci{
9414d6c458bSopenharmony_ci    if (taskId == 0) { // do not support func
9424d6c458bSopenharmony_ci        return;
9434d6c458bSopenharmony_ci    }
9444d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(callbackMutex_);
9454d6c458bSopenharmony_ci    auto iter = callbackTable_.find(taskId);
9464d6c458bSopenharmony_ci    if (iter == callbackTable_.end() || iter->second == nullptr) {
9474d6c458bSopenharmony_ci        return;
9484d6c458bSopenharmony_ci    }
9494d6c458bSopenharmony_ci
9504d6c458bSopenharmony_ci    auto task = reinterpret_cast<Task*>(taskId);
9514d6c458bSopenharmony_ci    if (!task->IsValid()) {
9524d6c458bSopenharmony_ci        callbackTable_.erase(iter);
9534d6c458bSopenharmony_ci        return;
9544d6c458bSopenharmony_ci    }
9554d6c458bSopenharmony_ci
9564d6c458bSopenharmony_ci    iter->second->refCount--;
9574d6c458bSopenharmony_ci    if (iter->second->refCount == 0) {
9584d6c458bSopenharmony_ci        callbackTable_.erase(iter);
9594d6c458bSopenharmony_ci    }
9604d6c458bSopenharmony_ci}
9614d6c458bSopenharmony_ci
9624d6c458bSopenharmony_cinapi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
9634d6c458bSopenharmony_ci{
9644d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
9654d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(callbackMutex_);
9664d6c458bSopenharmony_ci    auto iter = callbackTable_.find(task->taskId_);
9674d6c458bSopenharmony_ci    if (iter == callbackTable_.end() || iter->second == nullptr) {
9684d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
9694d6c458bSopenharmony_ci        ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
9704d6c458bSopenharmony_ci        delete resultInfo;
9714d6c458bSopenharmony_ci        return nullptr;
9724d6c458bSopenharmony_ci    }
9734d6c458bSopenharmony_ci    Worker* worker = static_cast<Worker*>(task->worker_);
9744d6c458bSopenharmony_ci    worker->Enqueue(task->env_, resultInfo);
9754d6c458bSopenharmony_ci    auto callbackInfo = iter->second;
9764d6c458bSopenharmony_ci    callbackInfo->refCount++;
9774d6c458bSopenharmony_ci    callbackInfo->worker = worker;
9784d6c458bSopenharmony_ci    auto workerEngine = reinterpret_cast<NativeEngine*>(env);
9794d6c458bSopenharmony_ci    workerEngine->IncreaseListeningCounter();
9804d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
9814d6c458bSopenharmony_ci    if (task->IsMainThreadTask()) {
9824d6c458bSopenharmony_ci        HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
9834d6c458bSopenharmony_ci        auto onCallbackTask = [callbackInfo]() {
9844d6c458bSopenharmony_ci            TaskPool::ExecuteCallbackTask(callbackInfo.get());
9854d6c458bSopenharmony_ci        };
9864d6c458bSopenharmony_ci        TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
9874d6c458bSopenharmony_ci    } else {
9884d6c458bSopenharmony_ci        callbackInfo->onCallbackSignal->data = callbackInfo.get();
9894d6c458bSopenharmony_ci        uv_async_send(callbackInfo->onCallbackSignal);
9904d6c458bSopenharmony_ci    }
9914d6c458bSopenharmony_ci#else
9924d6c458bSopenharmony_ci    callbackInfo->onCallbackSignal->data = callbackInfo.get();
9934d6c458bSopenharmony_ci    uv_async_send(callbackInfo->onCallbackSignal);
9944d6c458bSopenharmony_ci#endif
9954d6c458bSopenharmony_ci    return nullptr;
9964d6c458bSopenharmony_ci}
9974d6c458bSopenharmony_ci
9984d6c458bSopenharmony_ciMsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
9994d6c458bSopenharmony_ci{
10004d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(callbackMutex_);
10014d6c458bSopenharmony_ci    auto info = static_cast<CallbackInfo*>(req->data);
10024d6c458bSopenharmony_ci    if (info == nullptr || info->worker == nullptr) {
10034d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: info or worker is nullptr");
10044d6c458bSopenharmony_ci        return nullptr;
10054d6c458bSopenharmony_ci    }
10064d6c458bSopenharmony_ci    auto worker = info->worker;
10074d6c458bSopenharmony_ci    MsgQueue* queue = nullptr;
10084d6c458bSopenharmony_ci    worker->Dequeue(info->hostEnv, queue);
10094d6c458bSopenharmony_ci    return queue;
10104d6c458bSopenharmony_ci}
10114d6c458bSopenharmony_ci
10124d6c458bSopenharmony_ciMsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
10134d6c458bSopenharmony_ci{
10144d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(callbackMutex_);
10154d6c458bSopenharmony_ci    if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
10164d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: callbackInfo or worker is nullptr");
10174d6c458bSopenharmony_ci        return nullptr;
10184d6c458bSopenharmony_ci    }
10194d6c458bSopenharmony_ci    auto worker = callbackInfo->worker;
10204d6c458bSopenharmony_ci    MsgQueue* queue = nullptr;
10214d6c458bSopenharmony_ci    worker->Dequeue(callbackInfo->hostEnv, queue);
10224d6c458bSopenharmony_ci    return queue;
10234d6c458bSopenharmony_ci}
10244d6c458bSopenharmony_ci// ---------------------------------- SendData ---------------------------------------
10254d6c458bSopenharmony_ci
10264d6c458bSopenharmony_civoid TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
10274d6c458bSopenharmony_ci{
10284d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
10294d6c458bSopenharmony_ci    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
10304d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
10314d6c458bSopenharmony_ci    auto iter = dependentTaskInfos_.find(taskId);
10324d6c458bSopenharmony_ci    if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
10334d6c458bSopenharmony_ci        HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
10344d6c458bSopenharmony_ci        return;
10354d6c458bSopenharmony_ci    }
10364d6c458bSopenharmony_ci    for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
10374d6c458bSopenharmony_ci        auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
10384d6c458bSopenharmony_ci        RemoveDependencyById(taskId, *taskIdIter);
10394d6c458bSopenharmony_ci        taskIdIter = iter->second.erase(taskIdIter);
10404d6c458bSopenharmony_ci        if (taskInfo.first != 0) {
10414d6c458bSopenharmony_ci            EnqueueTaskId(taskInfo.first, taskInfo.second);
10424d6c458bSopenharmony_ci        }
10434d6c458bSopenharmony_ci    }
10444d6c458bSopenharmony_ci}
10454d6c458bSopenharmony_ci
10464d6c458bSopenharmony_civoid TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId)
10474d6c458bSopenharmony_ci{
10484d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
10494d6c458bSopenharmony_ci    // remove dependency after task execute
10504d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
10514d6c458bSopenharmony_ci    auto dependTaskIter = dependTaskInfos_.find(taskId);
10524d6c458bSopenharmony_ci    if (dependTaskIter != dependTaskInfos_.end()) {
10534d6c458bSopenharmony_ci        auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
10544d6c458bSopenharmony_ci        if (dependTaskInnerIter != dependTaskIter->second.end()) {
10554d6c458bSopenharmony_ci            dependTaskIter->second.erase(dependTaskInnerIter);
10564d6c458bSopenharmony_ci        }
10574d6c458bSopenharmony_ci    }
10584d6c458bSopenharmony_ci}
10594d6c458bSopenharmony_ci
10604d6c458bSopenharmony_cibool TaskManager::IsDependendByTaskId(uint64_t taskId)
10614d6c458bSopenharmony_ci{
10624d6c458bSopenharmony_ci    std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
10634d6c458bSopenharmony_ci    auto iter = dependTaskInfos_.find(taskId);
10644d6c458bSopenharmony_ci    if (iter == dependTaskInfos_.end() || iter->second.empty()) {
10654d6c458bSopenharmony_ci        return false;
10664d6c458bSopenharmony_ci    }
10674d6c458bSopenharmony_ci    return true;
10684d6c458bSopenharmony_ci}
10694d6c458bSopenharmony_ci
10704d6c458bSopenharmony_cibool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId)
10714d6c458bSopenharmony_ci{
10724d6c458bSopenharmony_ci    std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
10734d6c458bSopenharmony_ci    auto iter = dependentTaskInfos_.find(dependentTaskId);
10744d6c458bSopenharmony_ci    if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
10754d6c458bSopenharmony_ci        return false;
10764d6c458bSopenharmony_ci    }
10774d6c458bSopenharmony_ci    return true;
10784d6c458bSopenharmony_ci}
10794d6c458bSopenharmony_ci
10804d6c458bSopenharmony_cibool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
10814d6c458bSopenharmony_ci{
10824d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
10834d6c458bSopenharmony_ci    StoreDependentTaskInfo(taskIdSet, taskId);
10844d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
10854d6c458bSopenharmony_ci    auto iter = dependTaskInfos_.find(taskId);
10864d6c458bSopenharmony_ci    if (iter == dependTaskInfos_.end()) {
10874d6c458bSopenharmony_ci        for (const auto& dependentId : taskIdSet) {
10884d6c458bSopenharmony_ci            auto idIter = dependTaskInfos_.find(dependentId);
10894d6c458bSopenharmony_ci            if (idIter == dependTaskInfos_.end()) {
10904d6c458bSopenharmony_ci                continue;
10914d6c458bSopenharmony_ci            }
10924d6c458bSopenharmony_ci            if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
10934d6c458bSopenharmony_ci                return false;
10944d6c458bSopenharmony_ci            }
10954d6c458bSopenharmony_ci        }
10964d6c458bSopenharmony_ci        dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
10974d6c458bSopenharmony_ci        return true;
10984d6c458bSopenharmony_ci    }
10994d6c458bSopenharmony_ci
11004d6c458bSopenharmony_ci    for (const auto& dependentId : iter->second) {
11014d6c458bSopenharmony_ci        auto idIter = dependTaskInfos_.find(dependentId);
11024d6c458bSopenharmony_ci        if (idIter == dependTaskInfos_.end()) {
11034d6c458bSopenharmony_ci            continue;
11044d6c458bSopenharmony_ci        }
11054d6c458bSopenharmony_ci        if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
11064d6c458bSopenharmony_ci            return false;
11074d6c458bSopenharmony_ci        }
11084d6c458bSopenharmony_ci    }
11094d6c458bSopenharmony_ci    iter->second.insert(taskIdSet.begin(), taskIdSet.end());
11104d6c458bSopenharmony_ci    return true;
11114d6c458bSopenharmony_ci}
11124d6c458bSopenharmony_ci
11134d6c458bSopenharmony_cibool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
11144d6c458bSopenharmony_ci{
11154d6c458bSopenharmony_ci    for (const auto& id : idSet) {
11164d6c458bSopenharmony_ci        if (id == taskId) {
11174d6c458bSopenharmony_ci            return false;
11184d6c458bSopenharmony_ci        }
11194d6c458bSopenharmony_ci        auto iter = dependentIdSet.find(id);
11204d6c458bSopenharmony_ci        if (iter != dependentIdSet.end()) {
11214d6c458bSopenharmony_ci            continue;
11224d6c458bSopenharmony_ci        }
11234d6c458bSopenharmony_ci        auto dIter = dependTaskInfos_.find(id);
11244d6c458bSopenharmony_ci        if (dIter == dependTaskInfos_.end()) {
11254d6c458bSopenharmony_ci            continue;
11264d6c458bSopenharmony_ci        }
11274d6c458bSopenharmony_ci        if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
11284d6c458bSopenharmony_ci            return false;
11294d6c458bSopenharmony_ci        }
11304d6c458bSopenharmony_ci    }
11314d6c458bSopenharmony_ci    return true;
11324d6c458bSopenharmony_ci}
11334d6c458bSopenharmony_ci
11344d6c458bSopenharmony_cibool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
11354d6c458bSopenharmony_ci{
11364d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
11374d6c458bSopenharmony_ci    RemoveDependentTaskInfo(dependentId, taskId);
11384d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
11394d6c458bSopenharmony_ci    auto iter = dependTaskInfos_.find(taskId);
11404d6c458bSopenharmony_ci    if (iter == dependTaskInfos_.end()) {
11414d6c458bSopenharmony_ci        return false;
11424d6c458bSopenharmony_ci    }
11434d6c458bSopenharmony_ci    auto dependIter = iter->second.find(dependentId);
11444d6c458bSopenharmony_ci    if (dependIter ==  iter->second.end()) {
11454d6c458bSopenharmony_ci        return false;
11464d6c458bSopenharmony_ci    }
11474d6c458bSopenharmony_ci    iter->second.erase(dependIter);
11484d6c458bSopenharmony_ci    return true;
11494d6c458bSopenharmony_ci}
11504d6c458bSopenharmony_ci
11514d6c458bSopenharmony_civoid TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
11524d6c458bSopenharmony_ci{
11534d6c458bSopenharmony_ci    if (taskId == 0) {
11544d6c458bSopenharmony_ci        return;
11554d6c458bSopenharmony_ci    }
11564d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
11574d6c458bSopenharmony_ci    pendingTaskInfos_.emplace(taskId, priority);
11584d6c458bSopenharmony_ci}
11594d6c458bSopenharmony_ci
11604d6c458bSopenharmony_cistd::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
11614d6c458bSopenharmony_ci{
11624d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
11634d6c458bSopenharmony_ci    if (pendingTaskInfos_.empty()) {
11644d6c458bSopenharmony_ci        return std::make_pair(0, Priority::DEFAULT);
11654d6c458bSopenharmony_ci    }
11664d6c458bSopenharmony_ci    std::pair<uint64_t, Priority> result;
11674d6c458bSopenharmony_ci    for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
11684d6c458bSopenharmony_ci        if (it->first == taskId) {
11694d6c458bSopenharmony_ci            result = std::make_pair(it->first, it->second);
11704d6c458bSopenharmony_ci            it = pendingTaskInfos_.erase(it);
11714d6c458bSopenharmony_ci            break;
11724d6c458bSopenharmony_ci        }
11734d6c458bSopenharmony_ci    }
11744d6c458bSopenharmony_ci    return result;
11754d6c458bSopenharmony_ci}
11764d6c458bSopenharmony_ci
11774d6c458bSopenharmony_civoid TaskManager::RemovePendingTaskInfo(uint64_t taskId)
11784d6c458bSopenharmony_ci{
11794d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
11804d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
11814d6c458bSopenharmony_ci    pendingTaskInfos_.erase(taskId);
11824d6c458bSopenharmony_ci}
11834d6c458bSopenharmony_ci
11844d6c458bSopenharmony_civoid TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
11854d6c458bSopenharmony_ci{
11864d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
11874d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
11884d6c458bSopenharmony_ci    for (const auto& id : dependentTaskIdSet) {
11894d6c458bSopenharmony_ci        auto iter = dependentTaskInfos_.find(id);
11904d6c458bSopenharmony_ci        if (iter == dependentTaskInfos_.end()) {
11914d6c458bSopenharmony_ci            std::set<uint64_t> set{taskId};
11924d6c458bSopenharmony_ci            dependentTaskInfos_.emplace(id, std::move(set));
11934d6c458bSopenharmony_ci        } else {
11944d6c458bSopenharmony_ci            iter->second.emplace(taskId);
11954d6c458bSopenharmony_ci        }
11964d6c458bSopenharmony_ci    }
11974d6c458bSopenharmony_ci}
11984d6c458bSopenharmony_ci
11994d6c458bSopenharmony_civoid TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
12004d6c458bSopenharmony_ci{
12014d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
12024d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
12034d6c458bSopenharmony_ci    auto iter = dependentTaskInfos_.find(dependentTaskId);
12044d6c458bSopenharmony_ci    if (iter == dependentTaskInfos_.end()) {
12054d6c458bSopenharmony_ci        return;
12064d6c458bSopenharmony_ci    }
12074d6c458bSopenharmony_ci    auto taskIter = iter->second.find(taskId);
12084d6c458bSopenharmony_ci    if (taskIter == iter->second.end()) {
12094d6c458bSopenharmony_ci        return;
12104d6c458bSopenharmony_ci    }
12114d6c458bSopenharmony_ci    iter->second.erase(taskIter);
12124d6c458bSopenharmony_ci}
12134d6c458bSopenharmony_ci
12144d6c458bSopenharmony_cistd::string TaskManager::GetTaskDependInfoToString(uint64_t taskId)
12154d6c458bSopenharmony_ci{
12164d6c458bSopenharmony_ci    std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
12174d6c458bSopenharmony_ci    std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
12184d6c458bSopenharmony_ci    auto iter = dependTaskInfos_.find(taskId);
12194d6c458bSopenharmony_ci    if (iter != dependTaskInfos_.end()) {
12204d6c458bSopenharmony_ci        for (const auto& id : iter->second) {
12214d6c458bSopenharmony_ci            str += " " + std::to_string(id);
12224d6c458bSopenharmony_ci        }
12234d6c458bSopenharmony_ci    }
12244d6c458bSopenharmony_ci    return str;
12254d6c458bSopenharmony_ci}
12264d6c458bSopenharmony_ci
12274d6c458bSopenharmony_civoid TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
12284d6c458bSopenharmony_ci{
12294d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
12304d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
12314d6c458bSopenharmony_ci    auto iter = taskDurationInfos_.find(taskId);
12324d6c458bSopenharmony_ci    if (iter == taskDurationInfos_.end()) {
12334d6c458bSopenharmony_ci        std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
12344d6c458bSopenharmony_ci        taskDurationInfos_.emplace(taskId, std::move(durationData));
12354d6c458bSopenharmony_ci    } else {
12364d6c458bSopenharmony_ci        if (totalDuration != 0) {
12374d6c458bSopenharmony_ci            iter->second.first = totalDuration;
12384d6c458bSopenharmony_ci        }
12394d6c458bSopenharmony_ci        if (cpuDuration != 0) {
12404d6c458bSopenharmony_ci            iter->second.second = cpuDuration;
12414d6c458bSopenharmony_ci        }
12424d6c458bSopenharmony_ci    }
12434d6c458bSopenharmony_ci}
12444d6c458bSopenharmony_ci
12454d6c458bSopenharmony_ciuint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
12464d6c458bSopenharmony_ci{
12474d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
12484d6c458bSopenharmony_ci    auto iter = taskDurationInfos_.find(taskId);
12494d6c458bSopenharmony_ci    if (iter == taskDurationInfos_.end()) {
12504d6c458bSopenharmony_ci        return 0;
12514d6c458bSopenharmony_ci    }
12524d6c458bSopenharmony_ci    if (durationType == TASK_TOTAL_TIME) {
12534d6c458bSopenharmony_ci        return iter->second.first;
12544d6c458bSopenharmony_ci    } else if (durationType == TASK_CPU_TIME) {
12554d6c458bSopenharmony_ci        return iter->second.second;
12564d6c458bSopenharmony_ci    } else if (iter->second.first == 0) {
12574d6c458bSopenharmony_ci        return 0;
12584d6c458bSopenharmony_ci    }
12594d6c458bSopenharmony_ci    return iter->second.first - iter->second.second;
12604d6c458bSopenharmony_ci}
12614d6c458bSopenharmony_ci
12624d6c458bSopenharmony_civoid TaskManager::RemoveTaskDuration(uint64_t taskId)
12634d6c458bSopenharmony_ci{
12644d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
12654d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
12664d6c458bSopenharmony_ci    auto iter = taskDurationInfos_.find(taskId);
12674d6c458bSopenharmony_ci    if (iter != taskDurationInfos_.end()) {
12684d6c458bSopenharmony_ci        taskDurationInfos_.erase(iter);
12694d6c458bSopenharmony_ci    }
12704d6c458bSopenharmony_ci}
12714d6c458bSopenharmony_ci
12724d6c458bSopenharmony_civoid TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker)
12734d6c458bSopenharmony_ci{
12744d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
12754d6c458bSopenharmony_ci    longTasksMap_.emplace(taskId, worker);
12764d6c458bSopenharmony_ci}
12774d6c458bSopenharmony_ci
12784d6c458bSopenharmony_civoid TaskManager::RemoveLongTaskInfo(uint64_t taskId)
12794d6c458bSopenharmony_ci{
12804d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
12814d6c458bSopenharmony_ci    longTasksMap_.erase(taskId);
12824d6c458bSopenharmony_ci}
12834d6c458bSopenharmony_ci
12844d6c458bSopenharmony_ciWorker* TaskManager::GetLongTaskInfo(uint64_t taskId)
12854d6c458bSopenharmony_ci{
12864d6c458bSopenharmony_ci    std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
12874d6c458bSopenharmony_ci    auto iter = longTasksMap_.find(taskId);
12884d6c458bSopenharmony_ci    return iter != longTasksMap_.end() ? iter->second : nullptr;
12894d6c458bSopenharmony_ci}
12904d6c458bSopenharmony_ci
12914d6c458bSopenharmony_civoid TaskManager::TerminateTask(uint64_t taskId)
12924d6c458bSopenharmony_ci{
12934d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
12944d6c458bSopenharmony_ci    auto worker = GetLongTaskInfo(taskId);
12954d6c458bSopenharmony_ci    if (UNLIKELY(worker == nullptr)) {
12964d6c458bSopenharmony_ci        return;
12974d6c458bSopenharmony_ci    }
12984d6c458bSopenharmony_ci    worker->TerminateTask(taskId);
12994d6c458bSopenharmony_ci    RemoveLongTaskInfo(taskId);
13004d6c458bSopenharmony_ci}
13014d6c458bSopenharmony_ci
13024d6c458bSopenharmony_civoid TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
13034d6c458bSopenharmony_ci{
13044d6c458bSopenharmony_ci    uint64_t taskId = task->taskId_;
13054d6c458bSopenharmony_ci    if (shouldDeleteTask) {
13064d6c458bSopenharmony_ci        RemoveTask(taskId);
13074d6c458bSopenharmony_ci    }
13084d6c458bSopenharmony_ci    if (task->onResultSignal_ != nullptr) {
13094d6c458bSopenharmony_ci        if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) {
13104d6c458bSopenharmony_ci            ConcurrentHelper::UvHandleClose(task->onResultSignal_);
13114d6c458bSopenharmony_ci        } else {
13124d6c458bSopenharmony_ci            delete task->onResultSignal_;
13134d6c458bSopenharmony_ci        }
13144d6c458bSopenharmony_ci        task->onResultSignal_ = nullptr;
13154d6c458bSopenharmony_ci    }
13164d6c458bSopenharmony_ci
13174d6c458bSopenharmony_ci    if (task->currentTaskInfo_ != nullptr) {
13184d6c458bSopenharmony_ci        delete task->currentTaskInfo_;
13194d6c458bSopenharmony_ci        task->currentTaskInfo_ = nullptr;
13204d6c458bSopenharmony_ci    }
13214d6c458bSopenharmony_ci
13224d6c458bSopenharmony_ci    task->CancelPendingTask(env);
13234d6c458bSopenharmony_ci
13244d6c458bSopenharmony_ci    task->ClearDelayedTimers();
13254d6c458bSopenharmony_ci
13264d6c458bSopenharmony_ci    if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
13274d6c458bSopenharmony_ci        return;
13284d6c458bSopenharmony_ci    }
13294d6c458bSopenharmony_ci    DecreaseRefCount(env, taskId);
13304d6c458bSopenharmony_ci    RemoveTaskDuration(taskId);
13314d6c458bSopenharmony_ci    RemovePendingTaskInfo(taskId);
13324d6c458bSopenharmony_ci    ReleaseCallBackInfo(task);
13334d6c458bSopenharmony_ci    {
13344d6c458bSopenharmony_ci        std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
13354d6c458bSopenharmony_ci        for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
13364d6c458bSopenharmony_ci            if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
13374d6c458bSopenharmony_ci                dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
13384d6c458bSopenharmony_ci            } else {
13394d6c458bSopenharmony_ci                ++dependentTaskIter;
13404d6c458bSopenharmony_ci            }
13414d6c458bSopenharmony_ci        }
13424d6c458bSopenharmony_ci    }
13434d6c458bSopenharmony_ci    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
13444d6c458bSopenharmony_ci    auto dependTaskIter = dependTaskInfos_.find(taskId);
13454d6c458bSopenharmony_ci    if (dependTaskIter != dependTaskInfos_.end()) {
13464d6c458bSopenharmony_ci        dependTaskInfos_.erase(dependTaskIter);
13474d6c458bSopenharmony_ci    }
13484d6c458bSopenharmony_ci}
13494d6c458bSopenharmony_ci
13504d6c458bSopenharmony_civoid TaskManager::ReleaseCallBackInfo(Task* task)
13514d6c458bSopenharmony_ci{
13524d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
13534d6c458bSopenharmony_ci    if (task->onEnqueuedCallBackInfo_ != nullptr) {
13544d6c458bSopenharmony_ci        delete task->onEnqueuedCallBackInfo_;
13554d6c458bSopenharmony_ci        task->onEnqueuedCallBackInfo_ = nullptr;
13564d6c458bSopenharmony_ci    }
13574d6c458bSopenharmony_ci
13584d6c458bSopenharmony_ci    if (task->onStartExecutionCallBackInfo_ != nullptr) {
13594d6c458bSopenharmony_ci        delete task->onStartExecutionCallBackInfo_;
13604d6c458bSopenharmony_ci        task->onStartExecutionCallBackInfo_ = nullptr;
13614d6c458bSopenharmony_ci    }
13624d6c458bSopenharmony_ci
13634d6c458bSopenharmony_ci    if (task->onExecutionFailedCallBackInfo_ != nullptr) {
13644d6c458bSopenharmony_ci        delete task->onExecutionFailedCallBackInfo_;
13654d6c458bSopenharmony_ci        task->onExecutionFailedCallBackInfo_ = nullptr;
13664d6c458bSopenharmony_ci    }
13674d6c458bSopenharmony_ci
13684d6c458bSopenharmony_ci    if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
13694d6c458bSopenharmony_ci        delete task->onExecutionSucceededCallBackInfo_;
13704d6c458bSopenharmony_ci        task->onExecutionSucceededCallBackInfo_ = nullptr;
13714d6c458bSopenharmony_ci    }
13724d6c458bSopenharmony_ci
13734d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
13744d6c458bSopenharmony_ci    if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
13754d6c458bSopenharmony_ci        if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
13764d6c458bSopenharmony_ci            ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
13774d6c458bSopenharmony_ci        } else {
13784d6c458bSopenharmony_ci            delete task->onStartExecutionSignal_;
13794d6c458bSopenharmony_ci        }
13804d6c458bSopenharmony_ci        task->onStartExecutionSignal_ = nullptr;
13814d6c458bSopenharmony_ci    }
13824d6c458bSopenharmony_ci#else
13834d6c458bSopenharmony_ci    if (task->onStartExecutionSignal_ != nullptr) {
13844d6c458bSopenharmony_ci        if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
13854d6c458bSopenharmony_ci            ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
13864d6c458bSopenharmony_ci        } else {
13874d6c458bSopenharmony_ci            delete task->onStartExecutionSignal_;
13884d6c458bSopenharmony_ci        }
13894d6c458bSopenharmony_ci        task->onStartExecutionSignal_ = nullptr;
13904d6c458bSopenharmony_ci    }
13914d6c458bSopenharmony_ci#endif
13924d6c458bSopenharmony_ci}
13934d6c458bSopenharmony_ci
13944d6c458bSopenharmony_civoid TaskManager::StoreTask(uint64_t taskId, Task* task)
13954d6c458bSopenharmony_ci{
13964d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
13974d6c458bSopenharmony_ci    tasks_.emplace(taskId, task);
13984d6c458bSopenharmony_ci}
13994d6c458bSopenharmony_ci
14004d6c458bSopenharmony_civoid TaskManager::RemoveTask(uint64_t taskId)
14014d6c458bSopenharmony_ci{
14024d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
14034d6c458bSopenharmony_ci    tasks_.erase(taskId);
14044d6c458bSopenharmony_ci}
14054d6c458bSopenharmony_ci
14064d6c458bSopenharmony_ciTask* TaskManager::GetTask(uint64_t taskId)
14074d6c458bSopenharmony_ci{
14084d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
14094d6c458bSopenharmony_ci    auto iter = tasks_.find(taskId);
14104d6c458bSopenharmony_ci    if (iter == tasks_.end()) {
14114d6c458bSopenharmony_ci        return nullptr;
14124d6c458bSopenharmony_ci    }
14134d6c458bSopenharmony_ci    return iter->second;
14144d6c458bSopenharmony_ci}
14154d6c458bSopenharmony_ci
14164d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_FFRT)
14174d6c458bSopenharmony_civoid TaskManager::UpdateSystemAppFlag()
14184d6c458bSopenharmony_ci{
14194d6c458bSopenharmony_ci    auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
14204d6c458bSopenharmony_ci    if (abilityManager == nullptr) {
14214d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
14224d6c458bSopenharmony_ci        return;
14234d6c458bSopenharmony_ci    }
14244d6c458bSopenharmony_ci    auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
14254d6c458bSopenharmony_ci    if (bundleObj == nullptr) {
14264d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: fail to get bundle manager service.");
14274d6c458bSopenharmony_ci        return;
14284d6c458bSopenharmony_ci    }
14294d6c458bSopenharmony_ci    auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
14304d6c458bSopenharmony_ci    if (bundleMgr == nullptr) {
14314d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
14324d6c458bSopenharmony_ci        return;
14334d6c458bSopenharmony_ci    }
14344d6c458bSopenharmony_ci    OHOS::AppExecFwk::BundleInfo bundleInfo;
14354d6c458bSopenharmony_ci    if (bundleMgr->GetBundleInfoForSelf(
14364d6c458bSopenharmony_ci        static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
14374d6c458bSopenharmony_ci        != OHOS::ERR_OK) {
14384d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
14394d6c458bSopenharmony_ci        return;
14404d6c458bSopenharmony_ci    }
14414d6c458bSopenharmony_ci    isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
14424d6c458bSopenharmony_ci}
14434d6c458bSopenharmony_ci#endif
14444d6c458bSopenharmony_ci
14454d6c458bSopenharmony_ci#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
14464d6c458bSopenharmony_cibool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
14474d6c458bSopenharmony_ci{
14484d6c458bSopenharmony_ci    return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
14494d6c458bSopenharmony_ci}
14504d6c458bSopenharmony_ci#endif
14514d6c458bSopenharmony_ci
14524d6c458bSopenharmony_cibool TaskManager::CheckTask(uint64_t taskId)
14534d6c458bSopenharmony_ci{
14544d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
14554d6c458bSopenharmony_ci    auto item = tasks_.find(taskId);
14564d6c458bSopenharmony_ci    return item != tasks_.end();
14574d6c458bSopenharmony_ci}
14584d6c458bSopenharmony_ci
14594d6c458bSopenharmony_ci// ----------------------------------- TaskGroupManager ----------------------------------------
14604d6c458bSopenharmony_ciTaskGroupManager& TaskGroupManager::GetInstance()
14614d6c458bSopenharmony_ci{
14624d6c458bSopenharmony_ci    static TaskGroupManager groupManager;
14634d6c458bSopenharmony_ci    return groupManager;
14644d6c458bSopenharmony_ci}
14654d6c458bSopenharmony_ci
14664d6c458bSopenharmony_civoid TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
14674d6c458bSopenharmony_ci{
14684d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
14694d6c458bSopenharmony_ci    auto groupIter = taskGroups_.find(groupId);
14704d6c458bSopenharmony_ci    if (groupIter == taskGroups_.end()) {
14714d6c458bSopenharmony_ci        HILOG_DEBUG("taskpool:: taskGroup has been released");
14724d6c458bSopenharmony_ci        return;
14734d6c458bSopenharmony_ci    }
14744d6c458bSopenharmony_ci    auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
14754d6c458bSopenharmony_ci    if (taskGroup == nullptr) {
14764d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: taskGroup is null");
14774d6c458bSopenharmony_ci        return;
14784d6c458bSopenharmony_ci    }
14794d6c458bSopenharmony_ci    taskGroup->taskRefs_.push_back(taskRef);
14804d6c458bSopenharmony_ci    taskGroup->taskNum_++;
14814d6c458bSopenharmony_ci    taskGroup->taskIds_.push_back(taskId);
14824d6c458bSopenharmony_ci}
14834d6c458bSopenharmony_ci
14844d6c458bSopenharmony_civoid TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
14854d6c458bSopenharmony_ci{
14864d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
14874d6c458bSopenharmony_ci    TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
14884d6c458bSopenharmony_ci    for (uint64_t taskId : group->taskIds_) {
14894d6c458bSopenharmony_ci        Task* task = TaskManager::GetInstance().GetTask(taskId);
14904d6c458bSopenharmony_ci        if (task == nullptr || !task->IsValid()) {
14914d6c458bSopenharmony_ci            continue;
14924d6c458bSopenharmony_ci        }
14934d6c458bSopenharmony_ci        napi_reference_unref(task->env_, task->taskRef_, nullptr);
14944d6c458bSopenharmony_ci    }
14954d6c458bSopenharmony_ci
14964d6c458bSopenharmony_ci    if (group->currentGroupInfo_ != nullptr) {
14974d6c458bSopenharmony_ci        delete group->currentGroupInfo_;
14984d6c458bSopenharmony_ci    }
14994d6c458bSopenharmony_ci
15004d6c458bSopenharmony_ci    group->CancelPendingGroup(env);
15014d6c458bSopenharmony_ci}
15024d6c458bSopenharmony_ci
15034d6c458bSopenharmony_civoid TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
15044d6c458bSopenharmony_ci{
15054d6c458bSopenharmony_ci    std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
15064d6c458bSopenharmony_ci    HITRACE_HELPER_METER_NAME(strTrace);
15074d6c458bSopenharmony_ci    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
15084d6c458bSopenharmony_ci    TaskGroup* taskGroup = GetTaskGroup(groupId);
15094d6c458bSopenharmony_ci    if (taskGroup == nullptr) {
15104d6c458bSopenharmony_ci        HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
15114d6c458bSopenharmony_ci        return;
15124d6c458bSopenharmony_ci    }
15134d6c458bSopenharmony_ci    if (taskGroup->groupState_ == ExecuteState::CANCELED) {
15144d6c458bSopenharmony_ci        return;
15154d6c458bSopenharmony_ci    }
15164d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
15174d6c458bSopenharmony_ci    if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
15184d6c458bSopenharmony_ci        taskGroup->groupState_ == ExecuteState::FINISHED) {
15194d6c458bSopenharmony_ci        std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
15204d6c458bSopenharmony_ci        HILOG_ERROR("%{public}s", errMsg.c_str());
15214d6c458bSopenharmony_ci        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
15224d6c458bSopenharmony_ci        return;
15234d6c458bSopenharmony_ci    }
15244d6c458bSopenharmony_ci    ExecuteState groupState = taskGroup->groupState_;
15254d6c458bSopenharmony_ci    taskGroup->groupState_ = ExecuteState::CANCELED;
15264d6c458bSopenharmony_ci    taskGroup->CancelPendingGroup(env);
15274d6c458bSopenharmony_ci    if (taskGroup->currentGroupInfo_->finishedTask != taskGroup->taskNum_) {
15284d6c458bSopenharmony_ci        for (uint64_t taskId : taskGroup->taskIds_) {
15294d6c458bSopenharmony_ci            CancelGroupTask(env, taskId, taskGroup);
15304d6c458bSopenharmony_ci        }
15314d6c458bSopenharmony_ci    }
15324d6c458bSopenharmony_ci    if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
15334d6c458bSopenharmony_ci        auto engine = reinterpret_cast<NativeEngine*>(env);
15344d6c458bSopenharmony_ci        for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
15354d6c458bSopenharmony_ci            engine->DecreaseSubEnvCounter();
15364d6c458bSopenharmony_ci        }
15374d6c458bSopenharmony_ci        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
15384d6c458bSopenharmony_ci        napi_reject_deferred(env, taskGroup->currentGroupInfo_->deferred, error);
15394d6c458bSopenharmony_ci        napi_delete_reference(env, taskGroup->currentGroupInfo_->resArr);
15404d6c458bSopenharmony_ci        napi_reference_unref(env, taskGroup->groupRef_, nullptr);
15414d6c458bSopenharmony_ci        delete taskGroup->currentGroupInfo_;
15424d6c458bSopenharmony_ci        taskGroup->currentGroupInfo_ = nullptr;
15434d6c458bSopenharmony_ci    }
15444d6c458bSopenharmony_ci}
15454d6c458bSopenharmony_ci
15464d6c458bSopenharmony_civoid TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
15474d6c458bSopenharmony_ci{
15484d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
15494d6c458bSopenharmony_ci    auto task = TaskManager::GetInstance().GetTask(taskId);
15504d6c458bSopenharmony_ci    if (task == nullptr) {
15514d6c458bSopenharmony_ci        HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
15524d6c458bSopenharmony_ci        return;
15534d6c458bSopenharmony_ci    }
15544d6c458bSopenharmony_ci    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
15554d6c458bSopenharmony_ci    if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
15564d6c458bSopenharmony_ci        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
15574d6c458bSopenharmony_ci        task->DecreaseTaskRefCount();
15584d6c458bSopenharmony_ci        TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
15594d6c458bSopenharmony_ci        delete task->currentTaskInfo_;
15604d6c458bSopenharmony_ci        task->currentTaskInfo_ = nullptr;
15614d6c458bSopenharmony_ci    }
15624d6c458bSopenharmony_ci    task->taskState_ = ExecuteState::CANCELED;
15634d6c458bSopenharmony_ci}
15644d6c458bSopenharmony_ci
15654d6c458bSopenharmony_civoid TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
15664d6c458bSopenharmony_ci{
15674d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
15684d6c458bSopenharmony_ci    seqRunners_.emplace(seqRunnerId, seqRunner);
15694d6c458bSopenharmony_ci}
15704d6c458bSopenharmony_ci
15714d6c458bSopenharmony_civoid TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
15724d6c458bSopenharmony_ci{
15734d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
15744d6c458bSopenharmony_ci    seqRunners_.erase(seqRunnerId);
15754d6c458bSopenharmony_ci}
15764d6c458bSopenharmony_ci
15774d6c458bSopenharmony_ciSequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
15784d6c458bSopenharmony_ci{
15794d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
15804d6c458bSopenharmony_ci    auto iter = seqRunners_.find(seqRunnerId);
15814d6c458bSopenharmony_ci    if (iter != seqRunners_.end()) {
15824d6c458bSopenharmony_ci        return iter->second;
15834d6c458bSopenharmony_ci    }
15844d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
15854d6c458bSopenharmony_ci    return nullptr;
15864d6c458bSopenharmony_ci}
15874d6c458bSopenharmony_ci
15884d6c458bSopenharmony_civoid TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
15894d6c458bSopenharmony_ci{
15904d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
15914d6c458bSopenharmony_ci    auto iter = seqRunners_.find(seqRunnerId);
15924d6c458bSopenharmony_ci    if (iter == seqRunners_.end()) {
15934d6c458bSopenharmony_ci        HILOG_ERROR("seqRunner:: seqRunner not found.");
15944d6c458bSopenharmony_ci        return;
15954d6c458bSopenharmony_ci    } else {
15964d6c458bSopenharmony_ci        std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
15974d6c458bSopenharmony_ci        iter->second->seqRunnerTasks_.push(task);
15984d6c458bSopenharmony_ci    }
15994d6c458bSopenharmony_ci}
16004d6c458bSopenharmony_ci
16014d6c458bSopenharmony_cibool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
16024d6c458bSopenharmony_ci{
16034d6c458bSopenharmony_ci    uint64_t seqRunnerId = lastTask->seqRunnerId_;
16044d6c458bSopenharmony_ci    SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
16054d6c458bSopenharmony_ci    if (seqRunner == nullptr) {
16064d6c458bSopenharmony_ci        HILOG_ERROR("seqRunner:: trigger seqRunner not exist.");
16074d6c458bSopenharmony_ci        return false;
16084d6c458bSopenharmony_ci    }
16094d6c458bSopenharmony_ci    if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) {
16104d6c458bSopenharmony_ci        HILOG_ERROR("seqRunner:: trigger globalSeqRunner not exist.");
16114d6c458bSopenharmony_ci        return false;
16124d6c458bSopenharmony_ci    }
16134d6c458bSopenharmony_ci    if (seqRunner->currentTaskId_ != lastTask->taskId_) {
16144d6c458bSopenharmony_ci        HILOG_ERROR("seqRunner:: only front task can trigger seqRunner.");
16154d6c458bSopenharmony_ci        return false;
16164d6c458bSopenharmony_ci    }
16174d6c458bSopenharmony_ci    {
16184d6c458bSopenharmony_ci        std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
16194d6c458bSopenharmony_ci        if (seqRunner->seqRunnerTasks_.empty()) {
16204d6c458bSopenharmony_ci            HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str());
16214d6c458bSopenharmony_ci            seqRunner->currentTaskId_ = 0;
16224d6c458bSopenharmony_ci            return true;
16234d6c458bSopenharmony_ci        }
16244d6c458bSopenharmony_ci        Task* task = seqRunner->seqRunnerTasks_.front();
16254d6c458bSopenharmony_ci        seqRunner->seqRunnerTasks_.pop();
16264d6c458bSopenharmony_ci        while (task->taskState_ == ExecuteState::CANCELED) {
16274d6c458bSopenharmony_ci            DisposeCanceledTask(env, task);
16284d6c458bSopenharmony_ci            if (seqRunner->seqRunnerTasks_.empty()) {
16294d6c458bSopenharmony_ci                HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
16304d6c458bSopenharmony_ci                            std::to_string(seqRunnerId).c_str());
16314d6c458bSopenharmony_ci                seqRunner->currentTaskId_ = 0;
16324d6c458bSopenharmony_ci                return true;
16334d6c458bSopenharmony_ci            }
16344d6c458bSopenharmony_ci            task = seqRunner->seqRunnerTasks_.front();
16354d6c458bSopenharmony_ci            seqRunner->seqRunnerTasks_.pop();
16364d6c458bSopenharmony_ci        }
16374d6c458bSopenharmony_ci        seqRunner->currentTaskId_ = task->taskId_;
16384d6c458bSopenharmony_ci        task->IncreaseRefCount();
16394d6c458bSopenharmony_ci        task->taskState_ = ExecuteState::WAITING;
16404d6c458bSopenharmony_ci        HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
16414d6c458bSopenharmony_ci                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
16424d6c458bSopenharmony_ci        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
16434d6c458bSopenharmony_ci    }
16444d6c458bSopenharmony_ci    return true;
16454d6c458bSopenharmony_ci}
16464d6c458bSopenharmony_ci
16474d6c458bSopenharmony_civoid TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task)
16484d6c458bSopenharmony_ci{
16494d6c458bSopenharmony_ci    napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled");
16504d6c458bSopenharmony_ci    napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
16514d6c458bSopenharmony_ci    reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
16524d6c458bSopenharmony_ci    napi_reference_unref(env, task->taskRef_, nullptr);
16534d6c458bSopenharmony_ci    delete task->currentTaskInfo_;
16544d6c458bSopenharmony_ci    task->currentTaskInfo_ = nullptr;
16554d6c458bSopenharmony_ci}
16564d6c458bSopenharmony_ci
16574d6c458bSopenharmony_civoid TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
16584d6c458bSopenharmony_ci{
16594d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
16604d6c458bSopenharmony_ci    taskGroups_.emplace(groupId, taskGroup);
16614d6c458bSopenharmony_ci}
16624d6c458bSopenharmony_ci
16634d6c458bSopenharmony_civoid TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
16644d6c458bSopenharmony_ci{
16654d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
16664d6c458bSopenharmony_ci    taskGroups_.erase(groupId);
16674d6c458bSopenharmony_ci}
16684d6c458bSopenharmony_ci
16694d6c458bSopenharmony_ciTaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
16704d6c458bSopenharmony_ci{
16714d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
16724d6c458bSopenharmony_ci    auto groupIter = taskGroups_.find(groupId);
16734d6c458bSopenharmony_ci    if (groupIter == taskGroups_.end()) {
16744d6c458bSopenharmony_ci        return nullptr;
16754d6c458bSopenharmony_ci    }
16764d6c458bSopenharmony_ci    return reinterpret_cast<TaskGroup*>(groupIter->second);
16774d6c458bSopenharmony_ci}
16784d6c458bSopenharmony_ci
16794d6c458bSopenharmony_cibool TaskGroupManager::UpdateGroupState(uint64_t groupId)
16804d6c458bSopenharmony_ci{
16814d6c458bSopenharmony_ci    HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
16824d6c458bSopenharmony_ci    // During the modification process of the group, prevent other sub threads from performing other
16834d6c458bSopenharmony_ci    // operations on the group pointer, which may cause the modification to fail.
16844d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
16854d6c458bSopenharmony_ci    auto groupIter = taskGroups_.find(groupId);
16864d6c458bSopenharmony_ci    if (groupIter == taskGroups_.end()) {
16874d6c458bSopenharmony_ci        return false;
16884d6c458bSopenharmony_ci    }
16894d6c458bSopenharmony_ci    TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
16904d6c458bSopenharmony_ci    if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
16914d6c458bSopenharmony_ci        HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
16924d6c458bSopenharmony_ci        return false;
16934d6c458bSopenharmony_ci    }
16944d6c458bSopenharmony_ci    group->groupState_ = ExecuteState::RUNNING;
16954d6c458bSopenharmony_ci    return true;
16964d6c458bSopenharmony_ci}
16974d6c458bSopenharmony_ci
16984d6c458bSopenharmony_ci// ----------------------------------- SequenceRunnerManager ----------------------------------------
16994d6c458bSopenharmony_ciSequenceRunnerManager& SequenceRunnerManager::GetInstance()
17004d6c458bSopenharmony_ci{
17014d6c458bSopenharmony_ci    static SequenceRunnerManager sequenceRunnerManager;
17024d6c458bSopenharmony_ci    return sequenceRunnerManager;
17034d6c458bSopenharmony_ci}
17044d6c458bSopenharmony_ci
17054d6c458bSopenharmony_ciSequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
17064d6c458bSopenharmony_ci                                                               const std::string &name, uint32_t priority)
17074d6c458bSopenharmony_ci{
17084d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
17094d6c458bSopenharmony_ci    SequenceRunner *seqRunner = nullptr;
17104d6c458bSopenharmony_ci    auto iter = globalSeqRunner_.find(name);
17114d6c458bSopenharmony_ci    if (iter == globalSeqRunner_.end()) {
17124d6c458bSopenharmony_ci        seqRunner = new SequenceRunner();
17134d6c458bSopenharmony_ci        // refresh priority default values on first creation
17144d6c458bSopenharmony_ci        if (argc == 2) { // 2: The number of parameters is 2.
17154d6c458bSopenharmony_ci            seqRunner->priority_ = static_cast<Priority>(priority);
17164d6c458bSopenharmony_ci        }
17174d6c458bSopenharmony_ci        seqRunner->isGlobalRunner_ = true;
17184d6c458bSopenharmony_ci        seqRunner->seqName_ = name;
17194d6c458bSopenharmony_ci        globalSeqRunner_.emplace(name, seqRunner);
17204d6c458bSopenharmony_ci    } else {
17214d6c458bSopenharmony_ci        seqRunner = iter->second;
17224d6c458bSopenharmony_ci        if (priority != seqRunner->priority_) {
17234d6c458bSopenharmony_ci            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority can not changed.");
17244d6c458bSopenharmony_ci            return nullptr;
17254d6c458bSopenharmony_ci        }
17264d6c458bSopenharmony_ci    }
17274d6c458bSopenharmony_ci    seqRunner->count_++;
17284d6c458bSopenharmony_ci    auto tmpIter = seqRunner->globalSeqRunnerRef_.find(env);
17294d6c458bSopenharmony_ci    if (tmpIter == seqRunner->globalSeqRunnerRef_.end()) {
17304d6c458bSopenharmony_ci        napi_ref gloableSeqRunnerRef = nullptr;
17314d6c458bSopenharmony_ci        napi_create_reference(env, thisVar, 0, &gloableSeqRunnerRef);
17324d6c458bSopenharmony_ci        seqRunner->globalSeqRunnerRef_.emplace(env, gloableSeqRunnerRef);
17334d6c458bSopenharmony_ci    }
17344d6c458bSopenharmony_ci
17354d6c458bSopenharmony_ci    return seqRunner;
17364d6c458bSopenharmony_ci}
17374d6c458bSopenharmony_ci
17384d6c458bSopenharmony_cibool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
17394d6c458bSopenharmony_ci{
17404d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
17414d6c458bSopenharmony_ci    if (seqRunner->isGlobalRunner_) {
17424d6c458bSopenharmony_ci        auto iter = seqRunner->globalSeqRunnerRef_.find(env);
17434d6c458bSopenharmony_ci        if (iter == seqRunner->globalSeqRunnerRef_.end()) {
17444d6c458bSopenharmony_ci            return false;
17454d6c458bSopenharmony_ci        }
17464d6c458bSopenharmony_ci        napi_reference_unref(env, iter->second, nullptr);
17474d6c458bSopenharmony_ci    } else {
17484d6c458bSopenharmony_ci        napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
17494d6c458bSopenharmony_ci    }
17504d6c458bSopenharmony_ci    return true;
17514d6c458bSopenharmony_ci}
17524d6c458bSopenharmony_ci
17534d6c458bSopenharmony_ciuint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner)
17544d6c458bSopenharmony_ci{
17554d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
17564d6c458bSopenharmony_ci    return --(seqRunner->count_);
17574d6c458bSopenharmony_ci}
17584d6c458bSopenharmony_ci
17594d6c458bSopenharmony_civoid SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner)
17604d6c458bSopenharmony_ci{
17614d6c458bSopenharmony_ci    std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_);
17624d6c458bSopenharmony_ci    auto iter = seqRunner->globalSeqRunnerRef_.find(env);
17634d6c458bSopenharmony_ci    if (iter != seqRunner->globalSeqRunnerRef_.end()) {
17644d6c458bSopenharmony_ci        napi_delete_reference(env, iter->second);
17654d6c458bSopenharmony_ci        seqRunner->globalSeqRunnerRef_.erase(iter);
17664d6c458bSopenharmony_ci    }
17674d6c458bSopenharmony_ci}
17684d6c458bSopenharmony_ci
17694d6c458bSopenharmony_civoid SequenceRunnerManager::RemoveSequenceRunner(const std::string &name)
17704d6c458bSopenharmony_ci{
17714d6c458bSopenharmony_ci    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
17724d6c458bSopenharmony_ci    auto iter = globalSeqRunner_.find(name.c_str());
17734d6c458bSopenharmony_ci    if (iter != globalSeqRunner_.end()) {
17744d6c458bSopenharmony_ci        globalSeqRunner_.erase(iter->first);
17754d6c458bSopenharmony_ci    }
17764d6c458bSopenharmony_ci}
17774d6c458bSopenharmony_ci
17784d6c458bSopenharmony_civoid SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner)
17794d6c458bSopenharmony_ci{
17804d6c458bSopenharmony_ci    RemoveGlobalSeqRunnerRef(env, seqRunner);
17814d6c458bSopenharmony_ci    if (DecreaseSeqCount(seqRunner) == 0) {
17824d6c458bSopenharmony_ci        RemoveSequenceRunner(seqRunner->seqName_);
17834d6c458bSopenharmony_ci        TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_);
17844d6c458bSopenharmony_ci        delete seqRunner;
17854d6c458bSopenharmony_ci    }
17864d6c458bSopenharmony_ci}
17874d6c458bSopenharmony_ci} // namespace Commonlibrary::Concurrent::TaskPoolModule