/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "task_manager.h"

#include <cinttypes>
#include <securec.h>
#include <thread>

#if defined(ENABLE_TASKPOOL_FFRT)
#include "bundle_info.h"
#include "bundle_mgr_interface.h"
#include "bundle_mgr_proxy.h"
#include "c/executor_task.h"
#include "ffrt_inner.h"
#include "iservice_registry.h"
#include "parameters.h"
#include "status_receiver_interface.h"
#include "system_ability_definition.h"
#endif
#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
#include "helper/concurrent_helper.h"
#include "helper/error_helper.h"
#include "helper/hitrace_helper.h"
#include "taskpool.h"
#include "tools/log.h"
#include "worker.h"

namespace Commonlibrary::Concurrent::TaskPoolModule {
using namespace OHOS::JsSysModule;

static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
static constexpr uint32_t STEP_SIZE = 2;
static constexpr uint32_t DEFAULT_THREADS = 3;
static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: release the thread after it stays idle for 2 intervals

#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
    {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
    {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
    {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
    {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
};
#endif

// ----------------------------------- TaskManager ----------------------------------------
TaskManager& TaskManager::GetInstance()
{
    static TaskManager manager;
    return manager;
}

TaskManager::TaskManager()
{
    for (size_t i = 0; i < taskQueues_.size(); i++) {
        std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
        taskQueues_[i] = std::move(taskQueue);
    }
}

TaskManager::~TaskManager()
{
    HILOG_INFO("taskpool:: ~TaskManager");
    if (timer_ == nullptr) {
        HILOG_ERROR("taskpool:: timer_ is nullptr");
    } else {
        uv_timer_stop(timer_);
        ConcurrentHelper::UvHandleClose(timer_);
        ConcurrentHelper::UvHandleClose(expandHandle_);
    }

    if (loop_ != nullptr) {
        uv_stop(loop_);
    }

    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        for (auto& worker : workers_) {
            delete worker;
        }
        workers_.clear();
    }

    {
        std::lock_guard<std::mutex> lock(callbackMutex_);
        for (auto& [_, callbackPtr] : callbackTable_) {
            if (callbackPtr == nullptr) {
                continue;
            }
            callbackPtr.reset();
        }
        callbackTable_.clear();
    }

    {
        std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
        for (auto& [_, task] : tasks_) {
            delete task;
            task = nullptr;
        }
        tasks_.clear();
    }
    CountTraceForWorker();
}

void TaskManager::CountTraceForWorker()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    int64_t threadNum = static_cast<int64_t>(workers_.size());
    int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
    int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
    HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
    HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
    HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
    HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
}

napi_value TaskManager::GetThreadInfos(napi_env env)
{
    napi_value threadInfos = nullptr;
    napi_create_array(env, &threadInfos);
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        int32_t i = 0;
        for (auto& worker : workers_) {
            if (worker->workerEnv_ == nullptr) {
                continue;
            }
            napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
            napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));

            napi_value taskId = nullptr;
            napi_create_array(env, &taskId);
            int32_t j = 0;
            {
                std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
                for (auto& currentId : worker->currentTaskId_) {
                    napi_value id = NapiHelper::CreateUint32(env, currentId);
                    napi_set_element(env, taskId, j, id);
                    j++;
                }
            }
            napi_value threadInfo = nullptr;
            napi_create_object(env, &threadInfo);
            napi_set_named_property(env, threadInfo, "tid", tid);
            napi_set_named_property(env, threadInfo, "priority", priority);
            napi_set_named_property(env, threadInfo, "taskIds", taskId);
            napi_set_element(env, threadInfos, i, threadInfo);
            i++;
        }
    }
    return threadInfos;
}

napi_value TaskManager::GetTaskInfos(napi_env env)
{
    napi_value taskInfos = nullptr;
    napi_create_array(env, &taskInfos);
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
        int32_t i = 0;
        for (const auto& [_, task] : tasks_) {
            if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
                task->taskState_ == ExecuteState::FINISHED) {
                continue;
            }
            napi_value taskInfoValue = NapiHelper::CreateObject(env);
            std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
            napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
            napi_value name = nullptr;
            napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
            napi_set_named_property(env, taskInfoValue, "name", name);
            ExecuteState state = task->taskState_;
            uint64_t duration = 0;
            if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
                duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
            }
            napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
            napi_set_named_property(env, taskInfoValue, "taskId", taskId);
            napi_set_named_property(env, taskInfoValue, "state", stateValue);
            napi_value durationValue = NapiHelper::CreateUint32(env, duration);
            napi_set_named_property(env, taskInfoValue, "duration", durationValue);
            napi_set_element(env, taskInfos, i, taskInfoValue);
            i++;
        }
    }
    return taskInfos;
}

void TaskManager::UpdateExecutedInfo(uint64_t duration)
{
    totalExecTime_ += duration;
    totalExecCount_++;
}

uint32_t TaskManager::ComputeSuitableThreadNum()
{
    uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
    return targetNum;
}

uint32_t TaskManager::ComputeSuitableIdleNum()
{
    uint32_t targetNum = 0;
    if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
        // this branch avoids time-consuming tasks blocking the taskpool before any execution statistics exist
        targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
    } else if (totalExecCount_ != 0) {
        auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
        uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
        targetNum = std::min(result, GetNonIdleTaskNum());
    }
    return targetNum;
}
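
// Illustrative sizing walk-through (hypothetical numbers, not part of the
// build): suppose the pool has executed 50 tasks for 2000 ms in total while
// 8 non-idle tasks are waiting. Then durationPerTask = 2000 / 50 = 40 ms and
// result = ceil(40 * 8 / MAX_TASK_DURATION) = ceil(320 / 100) = 4, capped at
// GetNonIdleTaskNum() = 8, so 4 idle workers are requested. With no execution
// history yet (totalExecCount_ == 0), the pool instead probes with
// min(STEP_SIZE, 8) = 2 workers.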

void TaskManager::CheckForBlockedWorkers()
{
    // The threshold is chosen dynamically to provide more flexibility in detecting exceptions:
    // if the thread num has reached the limit and no idle worker is available, the short
    // threshold is used; otherwise the longer one is chosen.
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    bool needChecking = false;
    bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
    uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
    for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
        auto worker = *iter;
        // If the worker thread is idle, just skip it; only a worker in the running state can be
        // marked as timed out. If the worker is executing a longTask, we do not check it either.
        if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
            (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
            !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
            continue;
        }
        // When executing a promise task, the worker state may not be updated and the worker would be
        // marked as 'BLOCKED', so we should exclude this situation.
        // Besides, if the worker is not executing sync tasks or micro tasks, it may be handling
        // work such as I/O in uv threads, so we should also exclude that situation.
        auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
        if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
            if (!workerEngine->HasWaitingRequest()) {
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
            } else {
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
                worker->startTime_ = ConcurrentHelper::GetMilliseconds();
            }
            continue;
        }

        HILOG_INFO("taskpool:: The worker has been marked as timeout.");
        // If the current worker has a longTask and is not executing it, we only interrupt it
        // instead of moving it to the timeout set.
        if (worker->HasLongTask()) {
            continue;
        }
        needChecking = true;
        idleWorkers_.erase(worker);
        timeoutWorkers_.insert(worker);
    }
    // should trigger the check when we have marked and removed workers
    if (UNLIKELY(needChecking)) {
        TryExpand();
    }
}

void TaskManager::TryTriggerExpand()
{
    // post the signal to notify the monitor thread to expand
    if (UNLIKELY(!isHandleInited_)) {
        NotifyExecuteTask();
        needChecking_ = true;
        HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
        return;
    }
    uv_async_send(expandHandle_);
}
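
// Note on the signal above (general libuv semantics, not specific to this
// file): uv_async_send may be called from any thread, and several sends issued
// before the loop wakes can be coalesced into a single NotifyExpand callback.
// That is harmless here, since TryExpand recomputes its target from the
// current queue and worker counts on every invocation.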

#if defined(OHOS_PLATFORM)
// Read /proc/[pid]/task/[tid]/stat to obtain a worker thread's state; used below to count idle threads.
bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
{
    char path[128]; // 128: buffer for path
    pid_t pid = getpid();
    ssize_t bytesLen = -1;
    int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
    if (ret < 0) {
        HILOG_ERROR("snprintf_s failed");
        return false;
    }
    int fd = open(path, O_RDONLY | O_NONBLOCK);
    if (UNLIKELY(fd == -1)) {
        return false;
    }
    bytesLen = read(fd, buf, size - 1);
    close(fd);
    if (bytesLen <= 0) {
        HILOG_ERROR("taskpool:: failed to read %{public}s", path);
        return false;
    }
    buf[bytesLen] = '\0';
    return true;
}
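
// Shape of the data ReadThreadInfo fills in, per proc(5) (the tid and thread
// name below are hypothetical):
//   1234 (OS_TaskWorker) S 1 ...
// Field 3 is the thread state: 'S' (sleeping) is what GetIdleWorkers counts
// as idle, and field 14 (utime) is what GetIdleWorkersList compares across
// intervals to detect workers that consumed no CPU time.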

uint32_t TaskManager::GetIdleWorkers()
{
    char buf[4096]; // 4096: buffer for thread info
    uint32_t idleCount = 0;
    std::unordered_set<pid_t> tids {};
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        for (auto& worker : idleWorkers_) {
#if defined(ENABLE_TASKPOOL_FFRT)
            if (worker->ffrtTaskHandle_ != nullptr) {
                if (worker->GetWaitTime() > 0) {
                    idleCount++;
                }
                continue;
            }
#endif
            tids.emplace(worker->tid_);
        }
    }
    // ffrt threads are counted above and do not read thread info
    for (auto tid : tids) {
        if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
            continue;
        }
        char state;
        if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
            HILOG_ERROR("taskpool:: sscanf_s of state failed for %{public}d", tid);
            return 0;
        }
        if (state == 'S') {
            idleCount++;
        }
    }
    return idleCount;
}

void TaskManager::GetIdleWorkersList(uint32_t step)
{
    char buf[4096]; // 4096: buffer for thread info
    for (auto& worker : idleWorkers_) {
#if defined(ENABLE_TASKPOOL_FFRT)
        if (worker->ffrtTaskHandle_ != nullptr) {
            uint64_t workerWaitTime = worker->GetWaitTime();
            bool isWorkerLoopActive = worker->IsLoopActive();
            if (workerWaitTime == 0) {
                continue;
            }
            uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
                std::chrono::steady_clock::now().time_since_epoch()).count());
            if (!isWorkerLoopActive) {
                freeList_.emplace_back(worker);
            } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) {
                freeList_.emplace_back(worker);
                HILOG_INFO("taskpool:: worker has waited in ffrt epoll for more than 2 intervals, force to free.");
            } else {
                // the worker's uv loop is alive; it will be freed within 2 intervals if it is not woken
                HILOG_INFO("taskpool:: worker will be freed if not woken.");
            }
            continue;
        }
#endif
        if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
            continue;
        }
        char state;
        uint64_t utime;
        if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
            &state, sizeof(state), &utime) != 2) { // 2: state and utime
            HILOG_ERROR("taskpool:: sscanf_s of state failed for %{public}d", worker->tid_);
            return;
        }
        if (state != 'S' || utime != worker->lastCpuTime_) {
            worker->idleCount_ = 0;
            worker->lastCpuTime_ = utime;
            continue;
        }
        if (++worker->idleCount_ >= IDLE_THRESHOLD) {
            freeList_.emplace_back(worker);
        }
    }
}

void TaskManager::TriggerShrink(uint32_t step)
{
    GetIdleWorkersList(step);
    step = std::min(step, static_cast<uint32_t>(freeList_.size()));
    uint32_t count = 0;
    for (size_t i = 0; i < freeList_.size(); i++) {
        auto worker = freeList_[i];
        if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
            continue;
        }
        auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
        if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) {
            continue;
        }
        idleWorkers_.erase(worker);
        HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
        worker->PostReleaseSignal();
        if (++count == step) {
            break;
        }
    }
    freeList_.clear();
}
#else
uint32_t TaskManager::GetIdleWorkers()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return idleWorkers_.size();
}

void TaskManager::TriggerShrink(uint32_t step)
{
    for (uint32_t i = 0; i < step; i++) {
        // try to free a worker whose idle time meets the requirement
        auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
            auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
            return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask();
        });
        // remove it from all sets
        if (iter != idleWorkers_.end()) {
            auto worker = *iter;
            idleWorkers_.erase(worker);
            HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
            worker->PostReleaseSignal();
        }
    }
}
#endif
void TaskManager::NotifyShrink(uint32_t targetNum)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    uint32_t workerCount = workers_.size();
    uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
    if (minThread == 0) {
        HILOG_INFO("taskpool:: the system is now under low memory");
    }
    if (workerCount > minThread && workerCount > targetNum) {
        targetNum = std::max(minThread, targetNum);
        uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
        TriggerShrink(step);
    }
    // remove all timeout workers
    for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
        auto worker = *iter;
        if (workers_.find(worker) == workers_.end()) {
            HILOG_WARN("taskpool:: current worker may have been released");
            iter = timeoutWorkers_.erase(iter);
        } else if (!worker->HasRunningTasks()) {
            HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_);
            worker->PostReleaseSignal();
            timeoutWorkers_.erase(iter);
            return;
        } else {
            iter++;
        }
    }
    uint32_t idleNum = idleWorkers_.size();
    // If system memory pressure is moderate and the worker has executed tasks, try to release it
    if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
        auto worker = *(idleWorkers_.begin());
        // a worker that has a longTask should not be released
        if (worker == nullptr || worker->HasLongTask()) {
            return;
        }
        if (worker->hasExecuted_) { // a worker that hasn't executed any tasks should not be released
            TriggerShrink(DEFAULT_MIN_THREADS);
            return;
        }
    }

    // Create a worker in advance for performance
    if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
        CreateWorkers(hostEnv_);
    }
    // stop the timer
    if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
        suspend_ = true;
        uv_timer_stop(timer_);
        HILOG_DEBUG("taskpool:: timer will be suspended");
    }
}
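
// Illustrative shrink round (hypothetical numbers): with 8 live workers, a
// computed targetNum of 2 and normal memory pressure (minThread == 1),
// step = min(8 - 2, SHRINK_STEP) = 4, so at most 4 idle workers whose idle
// time exceeds MAX_IDLE_TIME are asked to release themselves in this tick;
// timeout workers without running tasks are released independently of step.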

void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
{
    TaskManager& taskManager = TaskManager::GetInstance();
    taskManager.CheckForBlockedWorkers();
    uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
    taskManager.NotifyShrink(targetNum);
    taskManager.CountTraceForWorker();
}

void TaskManager::TryExpand()
{
    // dispatch task in the TaskPoolManager thread
    NotifyExecuteTask();
    // do not trigger when there are more idleWorkers than tasks
    uint32_t idleNum = GetIdleWorkers();
    if (idleNum > GetNonIdleTaskNum()) {
        return;
    }
    needChecking_ = false; // do not need to check
    uint32_t targetNum = ComputeSuitableIdleNum();
    uint32_t workerCount = 0;
    uint32_t idleCount = 0;
    uint32_t timeoutWorkers = 0;
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        idleCount = idleWorkers_.size();
        workerCount = workers_.size();
        timeoutWorkers = timeoutWorkers_.size();
    }
    uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
    maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
    if (workerCount < maxThreads && idleCount < targetNum) {
        uint32_t step = std::min(maxThreads, targetNum) - idleCount;
        // Prevent the total number of expanded threads from exceeding maxThreads
        if (step + workerCount > maxThreads) {
            step = maxThreads - workerCount;
        }
        CreateWorkers(hostEnv_, step);
        HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
            maxThreads, step, GetThreadNum());
    }
    if (UNLIKELY(suspend_)) {
        suspend_ = false;
        uv_timer_again(timer_);
    }
}
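
// Illustrative expansion round (hypothetical numbers): on a device where
// ConcurrentHelper::GetMaxThreads() returns 8, with 3 live workers (1 idle),
// no timeout workers and a computed targetNum of 6:
//   step = min(8, 6) - 1 = 5, and step + workerCount = 8 <= maxThreads,
// so 5 new workers are created, bringing the pool to the 8-thread cap.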

void TaskManager::NotifyExpand(const uv_async_t* req)
{
    TaskManager& taskManager = TaskManager::GetInstance();
    taskManager.TryExpand();
}

void TaskManager::RunTaskManager()
{
    loop_ = uv_loop_new();
    if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
        HILOG_FATAL("taskpool:: new loop failed.");
        return;
    }
    ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand);
    timer_ = new uv_timer_t;
    uv_timer_init(loop_, timer_);
    uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
    isHandleInited_ = true;
#if defined IOS_PLATFORM || defined MAC_PLATFORM
    pthread_setname_np("OS_TaskManager");
#else
    pthread_setname_np(pthread_self(), "OS_TaskManager");
#endif
    if (UNLIKELY(needChecking_)) {
        needChecking_ = false;
        uv_async_send(expandHandle_);
    }
    uv_run(loop_, UV_RUN_DEFAULT);
    if (loop_ != nullptr) {
        uv_loop_delete(loop_);
    }
}

void TaskManager::CancelTask(napi_env env, uint64_t taskId)
{
    // 1. Cannot find taskInfo by executeId, throw error
    // 2. Find executing taskInfo, skip it
    // 3. Find waiting taskInfo, cancel it
    // 4. Find canceled taskInfo, skip it
    std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
    HITRACE_HELPER_METER_NAME(strTrace);
    Task* task = GetTask(taskId);
    if (task == nullptr) {
        std::string errMsg = "taskpool:: the task may not exist";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
        return;
    }
    if (task->taskState_ == ExecuteState::CANCELED) {
        HILOG_DEBUG("taskpool:: task has been canceled");
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
    if (task->IsPeriodicTask()) {
        task->CancelPendingTask(env);
        uv_timer_stop(task->timer_);
        uv_close(reinterpret_cast<uv_handle_t*>(task->timer_), [](uv_handle_t* handle) {
            delete (uv_timer_t*)handle;
            handle = nullptr;
        });
        return;
    } else if (task->IsSeqRunnerTask()) {
        CancelSeqRunnerTask(env, task);
        return;
    }
    if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
        task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
        task->taskState_ == ExecuteState::ENDING) {
        std::string errMsg = "taskpool:: task is not executed or has been executed";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
        return;
    }

    task->ClearDelayedTimers();
    ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED);
    task->CancelPendingTask(env);
    if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
        task->DecreaseTaskRefCount();
        EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled");
        napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
        napi_reference_unref(env, task->taskRef_, nullptr);
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
    }
}

void TaskManager::CancelSeqRunnerTask(napi_env env, Task *task)
{
    if (task->taskState_ == ExecuteState::FINISHED) {
        std::string errMsg = "taskpool:: sequenceRunner task has been executed";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
    } else {
        task->taskState_ = ExecuteState::CANCELED;
    }
}

void TaskManager::NotifyWorkerIdle(Worker* worker)
{
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
        if (worker->state_ == WorkerState::BLOCKED) {
            return;
        }
        idleWorkers_.insert(worker);
    }
    if (GetTaskNum() != 0) {
        NotifyExecuteTask();
    }
    CountTraceForWorker();
}

void TaskManager::NotifyWorkerCreated(Worker* worker)
{
    NotifyWorkerIdle(worker);
}

void TaskManager::NotifyWorkerAdded(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    workers_.insert(worker);
    HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
}

void TaskManager::NotifyWorkerRunning(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    idleWorkers_.erase(worker);
    CountTraceForWorker();
}

uint32_t TaskManager::GetRunningWorkers()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
        return worker->HasRunningTasks();
    });
}

uint32_t TaskManager::GetTimeoutWorkers()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return timeoutWorkers_.size();
}

uint32_t TaskManager::GetTaskNum()
{
    std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
    uint32_t sum = 0;
    for (const auto& taskQueue : taskQueues_) {
        sum += taskQueue->GetTaskNum();
    }
    return sum;
}

uint32_t TaskManager::GetNonIdleTaskNum()
{
    return nonIdleTaskNum_;
}

void TaskManager::IncreaseNumIfNoIdle(Priority priority)
{
    if (priority != Priority::IDLE) {
        ++nonIdleTaskNum_;
    }
}

void TaskManager::DecreaseNumIfNoIdle(Priority priority)
{
    if (priority != Priority::IDLE) {
        --nonIdleTaskNum_;
    }
}

uint32_t TaskManager::GetThreadNum()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    return workers_.size();
}

void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
{
    {
        std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
        IncreaseNumIfNoIdle(priority);
        taskQueues_[priority]->EnqueueTaskId(taskId);
    }
    TryTriggerExpand();
    Task* task = GetTask(taskId);
    if (task == nullptr) {
        HILOG_FATAL("taskpool:: task is nullptr");
        return;
    }
    task->IncreaseTaskRefCount();
    if (task->onEnqueuedCallBackInfo_ != nullptr) {
        task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
    }
}

void TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority)
{
    std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
    if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
        HILOG_WARN("taskpool:: taskId is not in the execute queue when canceling");
    }
}

std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
{
    std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_);
    auto& highTaskQueue = taskQueues_[Priority::HIGH];
    if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
        highPrioExecuteCount_++;
        return GetTaskByPriority(highTaskQueue, Priority::HIGH);
    }
    highPrioExecuteCount_ = 0;

    auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
    if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
        mediumPrioExecuteCount_++;
        return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
    }
    mediumPrioExecuteCount_ = 0;

    auto& lowTaskQueue = taskQueues_[Priority::LOW];
    if (!lowTaskQueue->IsEmpty()) {
        return GetTaskByPriority(lowTaskQueue, Priority::LOW);
    }

    auto& idleTaskQueue = taskQueues_[Priority::IDLE];
    if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
        return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
    }
    return std::make_pair(0, Priority::LOW);
}
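
// Illustrative dispatch pattern (assuming all three non-idle queues stay
// non-empty): one MEDIUM task is dequeued each time highPrioExecuteCount_
// reaches HIGH_PRIORITY_TASK_COUNT, and one LOW task only once
// mediumPrioExecuteCount_ has also reached MEDIUM_PRIORITY_TASK_COUNT, so a
// full steady-state cycle dispatches 30 HIGH, 5 MEDIUM and 1 LOW task. IDLE
// tasks are dequeued only when every other queue is empty and all workers are
// free (see IsChooseIdle below).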

bool TaskManager::IsChooseIdle()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    for (auto& worker : workers_) {
        if (worker->state_ == WorkerState::IDLE) {
            // WorkerState::IDLE means that the worker is free
            continue;
        }
        // If any worker is running a task, do not take an idle task.
        return false;
    }
    // Only when all workers are free will an idle task be taken.
    return true;
}

std::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
    Priority priority)
{
    uint64_t taskId = taskQueue->DequeueTaskId();
    if (IsDependendByTaskId(taskId)) {
        EnqueuePendingTaskInfo(taskId, priority);
        return std::make_pair(0, priority);
    }
    DecreaseNumIfNoIdle(priority);
    return std::make_pair(taskId, priority);
}

void TaskManager::NotifyExecuteTask()
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
        // Do not trigger when only idle-priority tasks remain and some workers are still busy executing them
        return;
    }
    for (auto& worker : idleWorkers_) {
        worker->NotifyExecuteTask();
    }
}

void TaskManager::InitTaskManager(napi_env env)
{
    HITRACE_HELPER_METER_NAME("InitTaskManager");
    if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
#if defined(ENABLE_TASKPOOL_FFRT)
        globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
        if (!globalEnableFfrtFlag_) {
            UpdateSystemAppFlag();
            if (IsSystemApp()) {
                disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
            }
        }
        if (EnableFfrt()) {
            HILOG_INFO("taskpool:: apps use ffrt");
        } else {
            HILOG_INFO("taskpool:: apps do not use ffrt");
        }
#endif
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
        mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
            OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
#endif
        auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
        if (mainThreadEngine == nullptr) {
            HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
            return;
        }
        hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
        // Add a reserved thread for taskpool
        CreateWorkers(hostEnv_);
        // Create a timer to manage worker threads
        std::thread workerManager([this] { this->RunTaskManager(); });
        workerManager.detach();
    }
}

void TaskManager::CreateWorkers(napi_env env, uint32_t num)
{
    HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
    for (uint32_t i = 0; i < num; i++) {
        auto worker = Worker::WorkerConstructor(env);
        NotifyWorkerAdded(worker);
    }
    CountTraceForWorker();
}

void TaskManager::RemoveWorker(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    idleWorkers_.erase(worker);
    timeoutWorkers_.erase(worker);
    workers_.erase(worker);
}

void TaskManager::RestoreWorker(Worker* worker)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
    if (UNLIKELY(suspend_)) {
        suspend_ = false;
        uv_timer_again(timer_);
    }
    if (worker->state_ == WorkerState::BLOCKED) {
        // since the worker is blocked, we should add it to the timeout set
        timeoutWorkers_.insert(worker);
        return;
    }
    // Since the worker may be executing some tasks in the I/O thread, we should add it back to the
    // idle worker set; it can still execute tasks in its own thread.
    HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
    idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
    if (GetTaskNum() != 0) {
        NotifyExecuteTask();
    }
}

// ---------------------------------- SendData ---------------------------------------
void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    callbackTable_[taskId] = callbackInfo;
}

std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        HILOG_ERROR("taskpool:: the callback does not exist");
        return nullptr;
    }
    return iter->second;
}

void TaskManager::IncreaseRefCount(uint64_t taskId)
{
    if (taskId == 0) { // do not support func
        return;
    }
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        return;
    }
    iter->second->refCount++;
}

void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
{
    if (taskId == 0) { // do not support func
        return;
    }
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        return;
    }

    auto task = reinterpret_cast<Task*>(taskId);
    if (!task->IsValid()) {
        callbackTable_.erase(iter);
        return;
    }

    iter->second->refCount--;
    if (iter->second->refCount == 0) {
        callbackTable_.erase(iter);
    }
}

napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
{
    HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(task->taskId_);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
        delete resultInfo;
        return nullptr;
    }
    Worker* worker = static_cast<Worker*>(task->worker_);
    worker->Enqueue(task->env_, resultInfo);
    auto callbackInfo = iter->second;
    callbackInfo->refCount++;
    callbackInfo->worker = worker;
    auto workerEngine = reinterpret_cast<NativeEngine*>(env);
    workerEngine->IncreaseListeningCounter();
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
    if (task->IsMainThreadTask()) {
        HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
        auto onCallbackTask = [callbackInfo]() {
            TaskPool::ExecuteCallbackTask(callbackInfo.get());
        };
        TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
    } else {
        callbackInfo->onCallbackSignal->data = callbackInfo.get();
        uv_async_send(callbackInfo->onCallbackSignal);
    }
#else
    callbackInfo->onCallbackSignal->data = callbackInfo.get();
    uv_async_send(callbackInfo->onCallbackSignal);
#endif
    return nullptr;
}

MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto info = static_cast<CallbackInfo*>(req->data);
    if (info == nullptr || info->worker == nullptr) {
        HILOG_ERROR("taskpool:: info or worker is nullptr");
        return nullptr;
    }
    auto worker = info->worker;
    MsgQueue* queue = nullptr;
    worker->Dequeue(info->hostEnv, queue);
    return queue;
}

MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
        HILOG_ERROR("taskpool:: callbackInfo or worker is nullptr");
        return nullptr;
    }
    auto worker = callbackInfo->worker;
    MsgQueue* queue = nullptr;
    worker->Dequeue(callbackInfo->hostEnv, queue);
    return queue;
}
// ---------------------------------- SendData ---------------------------------------

void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
    HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    auto iter = dependentTaskInfos_.find(taskId);
    if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
        HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
        return;
    }
    for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
        auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
        RemoveDependencyById(taskId, *taskIdIter);
        taskIdIter = iter->second.erase(taskIdIter);
        if (taskInfo.first != 0) {
            EnqueueTaskId(taskInfo.first, taskInfo.second);
        }
    }
}

void TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
    // remove the dependency after the task executes
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto dependTaskIter = dependTaskInfos_.find(taskId);
    if (dependTaskIter != dependTaskInfos_.end()) {
        auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
        if (dependTaskInnerIter != dependTaskIter->second.end()) {
            dependTaskIter->second.erase(dependTaskInnerIter);
        }
    }
}

bool TaskManager::IsDependendByTaskId(uint64_t taskId)
{
    std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end() || iter->second.empty()) {
        return false;
    }
    return true;
}

bool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId)
{
    std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    auto iter = dependentTaskInfos_.find(dependentTaskId);
    if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
        return false;
    }
    return true;
}

bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
{
    HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
    StoreDependentTaskInfo(taskIdSet, taskId);
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end()) {
        for (const auto& dependentId : taskIdSet) {
            auto idIter = dependTaskInfos_.find(dependentId);
            if (idIter == dependTaskInfos_.end()) {
                continue;
            }
            if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
                return false;
            }
        }
        dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
        return true;
    }

    for (const auto& dependentId : iter->second) {
        auto idIter = dependTaskInfos_.find(dependentId);
        if (idIter == dependTaskInfos_.end()) {
            continue;
        }
        if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
            return false;
        }
    }
    iter->second.insert(taskIdSet.begin(), taskIdSet.end());
    return true;
}

bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
{
    for (const auto& id : idSet) {
        if (id == taskId) {
            return false;
        }
        auto iter = dependentIdSet.find(id);
        if (iter != dependentIdSet.end()) {
            continue;
        }
        auto dIter = dependTaskInfos_.find(id);
        if (dIter == dependTaskInfos_.end()) {
            continue;
        }
        if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
            return false;
        }
    }
    return true;
}
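
// Illustrative cycle check (hypothetical task ids): suppose task 1 already
// depends on task 2 and task 3 depends on task 1, and StoreTaskDependency is
// now asked to make task 2 depend on {3}. Walking dependTaskInfos_ from the
// new set reaches 3 -> {1} -> {2}; since 2 is the task being stored, the
// recursion returns false and the circular dependency is rejected.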

bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
    RemoveDependentTaskInfo(dependentId, taskId);
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end()) {
        return false;
    }
    auto dependIter = iter->second.find(dependentId);
    if (dependIter == iter->second.end()) {
        return false;
    }
    iter->second.erase(dependIter);
    return true;
}

void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
{
    if (taskId == 0) {
        return;
    }
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    pendingTaskInfos_.emplace(taskId, priority);
}

std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    if (pendingTaskInfos_.empty()) {
        return std::make_pair(0, Priority::DEFAULT);
    }
    std::pair<uint64_t, Priority> result;
    for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
        if (it->first == taskId) {
            result = std::make_pair(it->first, it->second);
            it = pendingTaskInfos_.erase(it);
            break;
        }
    }
    return result;
}

void TaskManager::RemovePendingTaskInfo(uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    pendingTaskInfos_.erase(taskId);
}

void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    for (const auto& id : dependentTaskIdSet) {
        auto iter = dependentTaskInfos_.find(id);
        if (iter == dependentTaskInfos_.end()) {
            std::set<uint64_t> set{taskId};
            dependentTaskInfos_.emplace(id, std::move(set));
        } else {
            iter->second.emplace(taskId);
        }
    }
}

void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
{
    HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
    auto iter = dependentTaskInfos_.find(dependentTaskId);
    if (iter == dependentTaskInfos_.end()) {
        return;
    }
    auto taskIter = iter->second.find(taskId);
    if (taskIter == iter->second.end()) {
        return;
    }
    iter->second.erase(taskIter);
}

std::string TaskManager::GetTaskDependInfoToString(uint64_t taskId)
{
    std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
    auto iter = dependTaskInfos_.find(taskId);
    if (iter != dependTaskInfos_.end()) {
        for (const auto& id : iter->second) {
            str += " " + std::to_string(id);
        }
    }
    return str;
}

void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
{
    HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter == taskDurationInfos_.end()) {
        std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
        taskDurationInfos_.emplace(taskId, std::move(durationData));
    } else {
        if (totalDuration != 0) {
            iter->second.first = totalDuration;
        }
        if (cpuDuration != 0) {
            iter->second.second = cpuDuration;
        }
    }
}

uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
{
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter == taskDurationInfos_.end()) {
        return 0;
    }
    if (durationType == TASK_TOTAL_TIME) {
        return iter->second.first;
    } else if (durationType == TASK_CPU_TIME) {
        return iter->second.second;
    } else if (iter->second.first == 0) {
        return 0;
    }
    return iter->second.first - iter->second.second;
}
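
// Illustrative lookup (hypothetical values): after
// StoreTaskDuration(taskId, 120, 80), GetTaskDuration returns 120 for
// TASK_TOTAL_TIME, 80 for TASK_CPU_TIME, and 120 - 80 = 40 for any other
// durationType (the non-CPU share, e.g. time spent waiting on I/O); if no
// duration has been stored yet, it returns 0.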
1261
1262void TaskManager::RemoveTaskDuration(uint64_t taskId)
1263{
1264    HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1265    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1266    auto iter = taskDurationInfos_.find(taskId);
1267    if (iter != taskDurationInfos_.end()) {
1268        taskDurationInfos_.erase(iter);
1269    }
1270}
1271
1272void TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker)
1273{
1274    std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1275    longTasksMap_.emplace(taskId, worker);
1276}
1277
1278void TaskManager::RemoveLongTaskInfo(uint64_t taskId)
1279{
1280    std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1281    longTasksMap_.erase(taskId);
1282}
1283
1284Worker* TaskManager::GetLongTaskInfo(uint64_t taskId)
1285{
1286    std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1287    auto iter = longTasksMap_.find(taskId);
1288    return iter != longTasksMap_.end() ? iter->second : nullptr;
1289}
1290
1291void TaskManager::TerminateTask(uint64_t taskId)
1292{
1293    HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1294    auto worker = GetLongTaskInfo(taskId);
1295    if (UNLIKELY(worker == nullptr)) {
1296        return;
1297    }
1298    worker->TerminateTask(taskId);
1299    RemoveLongTaskInfo(taskId);
1300}
1301
1302void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
1303{
1304    uint64_t taskId = task->taskId_;
1305    if (shouldDeleteTask) {
1306        RemoveTask(taskId);
1307    }
1308    if (task->onResultSignal_ != nullptr) {
1309        if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) {
1310            ConcurrentHelper::UvHandleClose(task->onResultSignal_);
1311        } else {
1312            delete task->onResultSignal_;
1313        }
1314        task->onResultSignal_ = nullptr;
1315    }
1316
1317    if (task->currentTaskInfo_ != nullptr) {
1318        delete task->currentTaskInfo_;
1319        task->currentTaskInfo_ = nullptr;
1320    }
1321
1322    task->CancelPendingTask(env);
1323
1324    task->ClearDelayedTimers();
1325
1326    if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
1327        return;
1328    }
1329    DecreaseRefCount(env, taskId);
1330    RemoveTaskDuration(taskId);
1331    RemovePendingTaskInfo(taskId);
1332    ReleaseCallBackInfo(task);
1333    {
1334        std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1335        for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
1336            if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
1337                dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
1338            } else {
1339                ++dependentTaskIter;
1340            }
1341        }
1342    }
1343    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1344    auto dependTaskIter = dependTaskInfos_.find(taskId);
1345    if (dependTaskIter != dependTaskInfos_.end()) {
1346        dependTaskInfos_.erase(dependTaskIter);
1347    }
1348}
1349
1350void TaskManager::ReleaseCallBackInfo(Task* task)
1351{
1352    HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
1353    if (task->onEnqueuedCallBackInfo_ != nullptr) {
1354        delete task->onEnqueuedCallBackInfo_;
1355        task->onEnqueuedCallBackInfo_ = nullptr;
1356    }
1357
1358    if (task->onStartExecutionCallBackInfo_ != nullptr) {
1359        delete task->onStartExecutionCallBackInfo_;
1360        task->onStartExecutionCallBackInfo_ = nullptr;
1361    }
1362
1363    if (task->onExecutionFailedCallBackInfo_ != nullptr) {
1364        delete task->onExecutionFailedCallBackInfo_;
1365        task->onExecutionFailedCallBackInfo_ = nullptr;
1366    }
1367
1368    if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
1369        delete task->onExecutionSucceededCallBackInfo_;
1370        task->onExecutionSucceededCallBackInfo_ = nullptr;
1371    }
1372
1373#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1374    if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
1375        if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
1376            ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1377        } else {
1378            delete task->onStartExecutionSignal_;
1379        }
1380        task->onStartExecutionSignal_ = nullptr;
1381    }
1382#else
1383    if (task->onStartExecutionSignal_ != nullptr) {
1384        if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
1385            ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1386        } else {
1387            delete task->onStartExecutionSignal_;
1388        }
1389        task->onStartExecutionSignal_ = nullptr;
1390    }
1391#endif
1392}
1393
1394void TaskManager::StoreTask(uint64_t taskId, Task* task)
1395{
1396    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1397    tasks_.emplace(taskId, task);
1398}
1399
1400void TaskManager::RemoveTask(uint64_t taskId)
1401{
1402    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1403    tasks_.erase(taskId);
1404}
1405
1406Task* TaskManager::GetTask(uint64_t taskId)
1407{
1408    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1409    auto iter = tasks_.find(taskId);
1410    if (iter == tasks_.end()) {
1411        return nullptr;
1412    }
1413    return iter->second;
1414}
1415
1416#if defined(ENABLE_TASKPOOL_FFRT)
1417void TaskManager::UpdateSystemAppFlag()
1418{
1419    auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
1420    if (abilityManager == nullptr) {
1421        HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
1422        return;
1423    }
1424    auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
1425    if (bundleObj == nullptr) {
1426        HILOG_ERROR("taskpool:: fail to get bundle manager service.");
1427        return;
1428    }
1429    auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
1430    if (bundleMgr == nullptr) {
1431        HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
1432        return;
1433    }
1434    OHOS::AppExecFwk::BundleInfo bundleInfo;
1435    if (bundleMgr->GetBundleInfoForSelf(
1436        static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
1437        != OHOS::ERR_OK) {
1438        HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
1439        return;
1440    }
1441    isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
1442}
1443#endif
1444
1445#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1446bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
1447{
1448    return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
1449}
1450#endif
1451
1452bool TaskManager::CheckTask(uint64_t taskId)
1453{
1454    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1455    auto item = tasks_.find(taskId);
1456    return item != tasks_.end();
1457}
1458
// ----------------------------------- TaskGroupManager ----------------------------------------
TaskGroupManager& TaskGroupManager::GetInstance()
{
    static TaskGroupManager groupManager;
    return groupManager;
}

void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        HILOG_DEBUG("taskpool:: taskGroup has been released");
        return;
    }
    auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: taskGroup is null");
        return;
    }
    taskGroup->taskRefs_.push_back(taskRef);
    taskGroup->taskNum_++;
    taskGroup->taskIds_.push_back(taskId);
}

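// Removes the group from the manager, drops one reference on every task that
// is still valid, frees the current group info and cancels pending executions.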
void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
{
    HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
    TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
    for (uint64_t taskId : group->taskIds_) {
        Task* task = TaskManager::GetInstance().GetTask(taskId);
        if (task == nullptr || !task->IsValid()) {
            continue;
        }
        napi_reference_unref(task->env_, task->taskRef_, nullptr);
    }

    if (group->currentGroupInfo_ != nullptr) {
        delete group->currentGroupInfo_;
        group->currentGroupInfo_ = nullptr;
    }

    group->CancelPendingGroup(env);
}

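// Cancels a whole group: marks it CANCELED, cancels its pending executions and
// unfinished tasks, and, if the group was still waiting, rejects its promise
// and releases the group info.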
void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
{
    std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
    HITRACE_HELPER_METER_NAME(strTrace);
    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
    TaskGroup* taskGroup = GetTaskGroup(groupId);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
    // Read the state under the lock so a concurrent cancel cannot slip through.
    if (taskGroup->groupState_ == ExecuteState::CANCELED) {
        return;
    }
    if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
        taskGroup->groupState_ == ExecuteState::FINISHED) {
        std::string errMsg = "taskpool:: taskGroup is not executing or has already finished";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
        return;
    }
    ExecuteState groupState = taskGroup->groupState_;
    taskGroup->groupState_ = ExecuteState::CANCELED;
    taskGroup->CancelPendingGroup(env);
    if (taskGroup->currentGroupInfo_->finishedTask != taskGroup->taskNum_) {
        for (uint64_t taskId : taskGroup->taskIds_) {
            CancelGroupTask(env, taskId, taskGroup);
        }
    }
    if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
        auto engine = reinterpret_cast<NativeEngine*>(env);
        for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
            engine->DecreaseSubEnvCounter();
        }
        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
        napi_reject_deferred(env, taskGroup->currentGroupInfo_->deferred, error);
        napi_delete_reference(env, taskGroup->currentGroupInfo_->resArr);
        napi_reference_unref(env, taskGroup->groupRef_, nullptr);
        delete taskGroup->currentGroupInfo_;
        taskGroup->currentGroupInfo_ = nullptr;
    }
}

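// Cancels a single task of a group: a task that is still waiting is removed
// from the execution queue and its task info released before the state is set
// to CANCELED.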
void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
{
    HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
    auto task = TaskManager::GetInstance().GetTask(taskId);
    if (task == nullptr) {
        HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
    if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
        task->DecreaseTaskRefCount();
        TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
    }
    task->taskState_ = ExecuteState::CANCELED;
}

void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.emplace(seqRunnerId, seqRunner);
}

void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.erase(seqRunnerId);
}

SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter != seqRunners_.end()) {
        return iter->second;
    }
    HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
    return nullptr;
}

void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter == seqRunners_.end()) {
        HILOG_ERROR("seqRunner:: seqRunner not found.");
        return;
    }
    std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
    iter->second->seqRunnerTasks_.push(task);
}

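// Called when lastTask finishes: dequeues the next task of the sequence runner
// (discarding canceled ones) and enqueues it for execution, so the tasks of
// one runner always run strictly one after another.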
bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
{
    uint64_t seqRunnerId = lastTask->seqRunnerId_;
    SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
    if (seqRunner == nullptr) {
        HILOG_ERROR("seqRunner:: seqRunner to trigger does not exist.");
        return false;
    }
    if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) {
        HILOG_ERROR("seqRunner:: failed to trigger global seqRunner for this env.");
        return false;
    }
    if (seqRunner->currentTaskId_ != lastTask->taskId_) {
        HILOG_ERROR("seqRunner:: only the front task can trigger the seqRunner.");
        return false;
    }
    {
        std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
        if (seqRunner->seqRunnerTasks_.empty()) {
            HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str());
            seqRunner->currentTaskId_ = 0;
            return true;
        }
        Task* task = seqRunner->seqRunnerTasks_.front();
        seqRunner->seqRunnerTasks_.pop();
        while (task->taskState_ == ExecuteState::CANCELED) {
            DisposeCanceledTask(env, task);
            if (seqRunner->seqRunnerTasks_.empty()) {
                HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
                            std::to_string(seqRunnerId).c_str());
                seqRunner->currentTaskId_ = 0;
                return true;
            }
            task = seqRunner->seqRunnerTasks_.front();
            seqRunner->seqRunnerTasks_.pop();
        }
        seqRunner->currentTaskId_ = task->taskId_;
        task->IncreaseRefCount();
        task->taskState_ = ExecuteState::WAITING;
        HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
    }
    return true;
}

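// Rejects the promise of a canceled sequence-runner task and releases the
// resources held for its pending execution.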
void TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task)
{
    if (task->currentTaskInfo_ == nullptr) { // defensive: the info may already be released
        return;
    }
    napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled");
    napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
    reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
    napi_reference_unref(env, task->taskRef_, nullptr);
    delete task->currentTaskInfo_;
    task->currentTaskInfo_ = nullptr;
}

void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.emplace(groupId, taskGroup);
}

void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.erase(groupId);
}

TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return nullptr;
    }
    return reinterpret_cast<TaskGroup*>(groupIter->second);
}

bool TaskGroupManager::UpdateGroupState(uint64_t groupId)
{
    HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
    // Hold taskGroupsMutex_ while updating the state so that other threads cannot
    // release or modify the group pointer during the update.
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return false;
    }
    TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
    if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
        HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
        return false;
    }
    group->groupState_ = ExecuteState::RUNNING;
    return true;
}

// ----------------------------------- SequenceRunnerManager ----------------------------------------
SequenceRunnerManager& SequenceRunnerManager::GetInstance()
{
    static SequenceRunnerManager sequenceRunnerManager;
    return sequenceRunnerManager;
}

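// Global sequence runners are shared by name across environments: the first
// caller creates the runner, later callers reuse it (the priority must match),
// and count_ tracks how many users still reference it.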
SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
                                                               const std::string &name, uint32_t priority)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    SequenceRunner *seqRunner = nullptr;
    auto iter = globalSeqRunner_.find(name);
    if (iter == globalSeqRunner_.end()) {
        seqRunner = new SequenceRunner();
        // override the default priority only when it was passed explicitly
        if (argc == 2) { // 2: name and priority were both provided
            seqRunner->priority_ = static_cast<Priority>(priority);
        }
        seqRunner->isGlobalRunner_ = true;
        seqRunner->seqName_ = name;
        globalSeqRunner_.emplace(name, seqRunner);
    } else {
        seqRunner = iter->second;
        if (priority != seqRunner->priority_) {
            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority cannot be changed.");
            return nullptr;
        }
    }
    seqRunner->count_++;
    auto tmpIter = seqRunner->globalSeqRunnerRef_.find(env);
    if (tmpIter == seqRunner->globalSeqRunnerRef_.end()) {
        napi_ref globalSeqRunnerRef = nullptr;
        napi_create_reference(env, thisVar, 0, &globalSeqRunnerRef);
        seqRunner->globalSeqRunnerRef_.emplace(env, globalSeqRunnerRef);
    }

    return seqRunner;
}

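// Drops one reference on the runner object held for the given env; returns
// false only when a global runner has no reference registered for this env.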
bool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    if (seqRunner->isGlobalRunner_) {
        auto iter = seqRunner->globalSeqRunnerRef_.find(env);
        if (iter == seqRunner->globalSeqRunnerRef_.end()) {
            return false;
        }
        napi_reference_unref(env, iter->second, nullptr);
    } else {
        napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
    }
    return true;
}

uint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    return --(seqRunner->count_);
}

void SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner)
{
    std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = seqRunner->globalSeqRunnerRef_.find(env);
    if (iter != seqRunner->globalSeqRunnerRef_.end()) {
        napi_delete_reference(env, iter->second);
        seqRunner->globalSeqRunnerRef_.erase(iter);
    }
}

void SequenceRunnerManager::RemoveSequenceRunner(const std::string &name)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = globalSeqRunner_.find(name);
    if (iter != globalSeqRunner_.end()) {
        globalSeqRunner_.erase(iter);
    }
}

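// Destructor hook for a global sequence runner: removes the per-env reference
// and, once the last user is gone, removes the runner from both managers and
// deletes it.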
void SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner)
{
    RemoveGlobalSeqRunnerRef(env, seqRunner);
    if (DecreaseSeqCount(seqRunner) == 0) {
        RemoveSequenceRunner(seqRunner->seqName_);
        TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_);
        delete seqRunner;
    }
}
} // namespace Commonlibrary::Concurrent::TaskPoolModule