1/* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "task_manager.h" 17 18#include <cinttypes> 19#include <securec.h> 20#include <thread> 21 22#if defined(ENABLE_TASKPOOL_FFRT) 23#include "bundle_info.h" 24#include "bundle_mgr_interface.h" 25#include "bundle_mgr_proxy.h" 26#include "c/executor_task.h" 27#include "ffrt_inner.h" 28#include "iservice_registry.h" 29#include "parameters.h" 30#include "status_receiver_interface.h" 31#include "system_ability_definition.h" 32#endif 33#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h" 34#include "helper/concurrent_helper.h" 35#include "helper/error_helper.h" 36#include "helper/hitrace_helper.h" 37#include "taskpool.h" 38#include "tools/log.h" 39#include "worker.h" 40 41namespace Commonlibrary::Concurrent::TaskPoolModule { 42using namespace OHOS::JsSysModule; 43 44static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5; 45static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5; 46static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms 47static constexpr uint32_t STEP_SIZE = 2; 48static constexpr uint32_t DEFAULT_THREADS = 3; 49static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle 50static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min 51static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min 52static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s 53static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s 54static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time 55[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread 56 57#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 58static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = { 59 {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE}, 60 {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW}, 61 {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH}, 62 {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE}, 63}; 64#endif 65 66// ----------------------------------- TaskManager ---------------------------------------- 67TaskManager& TaskManager::GetInstance() 68{ 69 static TaskManager manager; 70 return manager; 71} 72 73TaskManager::TaskManager() 74{ 75 for (size_t i = 0; i < taskQueues_.size(); i++) { 76 std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>(); 77 taskQueues_[i] = std::move(taskQueue); 78 } 79} 80 81TaskManager::~TaskManager() 82{ 83 HILOG_INFO("taskpool:: ~TaskManager"); 84 if (timer_ == nullptr) { 85 HILOG_ERROR("taskpool:: timer_ is nullptr"); 86 } else { 87 uv_timer_stop(timer_); 88 ConcurrentHelper::UvHandleClose(timer_); 89 ConcurrentHelper::UvHandleClose(expandHandle_); 90 } 91 92 if (loop_ != nullptr) { 93 uv_stop(loop_); 94 } 95 96 { 97 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 98 for (auto& worker : workers_) { 99 delete worker; 100 } 101 workers_.clear(); 102 } 103 104 { 105 std::lock_guard<std::mutex> lock(callbackMutex_); 106 for (auto& [_, callbackPtr] : callbackTable_) { 107 if (callbackPtr == nullptr) { 108 continue; 109 } 110 callbackPtr.reset(); 111 } 112 callbackTable_.clear(); 113 } 114 115 { 116 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 117 for (auto& [_, task] : tasks_) { 118 delete task; 119 task = nullptr; 120 } 121 tasks_.clear(); 122 } 123 CountTraceForWorker(); 124} 125 126void TaskManager::CountTraceForWorker() 127{ 128 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 129 int64_t threadNum = static_cast<int64_t>(workers_.size()); 130 int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size()); 131 int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size()); 132 HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers); 133 HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum); 134 HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers); 135 HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers); 136} 137 138napi_value TaskManager::GetThreadInfos(napi_env env) 139{ 140 napi_value threadInfos = nullptr; 141 napi_create_array(env, &threadInfos); 142 { 143 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 144 int32_t i = 0; 145 for (auto& worker : workers_) { 146 if (worker->workerEnv_ == nullptr) { 147 continue; 148 } 149 napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_)); 150 napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_)); 151 152 napi_value taskId = nullptr; 153 napi_create_array(env, &taskId); 154 int32_t j = 0; 155 { 156 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_); 157 for (auto& currentId : worker->currentTaskId_) { 158 napi_value id = NapiHelper::CreateUint32(env, currentId); 159 napi_set_element(env, taskId, j, id); 160 j++; 161 } 162 } 163 napi_value threadInfo = nullptr; 164 napi_create_object(env, &threadInfo); 165 napi_set_named_property(env, threadInfo, "tid", tid); 166 napi_set_named_property(env, threadInfo, "priority", priority); 167 napi_set_named_property(env, threadInfo, "taskIds", taskId); 168 napi_set_element(env, threadInfos, i, threadInfo); 169 i++; 170 } 171 } 172 return threadInfos; 173} 174 175napi_value TaskManager::GetTaskInfos(napi_env env) 176{ 177 napi_value taskInfos = nullptr; 178 napi_create_array(env, &taskInfos); 179 { 180 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 181 int32_t i = 0; 182 for (const auto& [_, task] : tasks_) { 183 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED || 184 task->taskState_ == ExecuteState::FINISHED) { 185 continue; 186 } 187 napi_value taskInfoValue = NapiHelper::CreateObject(env); 188 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 189 napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_); 190 napi_value name = nullptr; 191 napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name); 192 napi_set_named_property(env, taskInfoValue, "name", name); 193 ExecuteState state = task->taskState_; 194 uint64_t duration = 0; 195 if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) { 196 duration = ConcurrentHelper::GetMilliseconds() - task->startTime_; 197 } 198 napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state)); 199 napi_set_named_property(env, taskInfoValue, "taskId", taskId); 200 napi_set_named_property(env, taskInfoValue, "state", stateValue); 201 napi_value durationValue = NapiHelper::CreateUint32(env, duration); 202 napi_set_named_property(env, taskInfoValue, "duration", durationValue); 203 napi_set_element(env, taskInfos, i, taskInfoValue); 204 i++; 205 } 206 } 207 return taskInfos; 208} 209 210void TaskManager::UpdateExecutedInfo(uint64_t duration) 211{ 212 totalExecTime_ += duration; 213 totalExecCount_++; 214} 215 216uint32_t TaskManager::ComputeSuitableThreadNum() 217{ 218 uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers(); 219 return targetNum; 220} 221 222uint32_t TaskManager::ComputeSuitableIdleNum() 223{ 224 uint32_t targetNum = 0; 225 if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) { 226 // this branch is used for avoiding time-consuming tasks that may block the taskpool 227 targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum()); 228 } else if (totalExecCount_ != 0) { 229 auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_; 230 uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION); 231 targetNum = std::min(result, GetNonIdleTaskNum()); 232 } 233 return targetNum; 234} 235 236void TaskManager::CheckForBlockedWorkers() 237{ 238 // the threshold will be dynamically modified to provide more flexibility in detecting exceptions 239 // if the thread num has reached the limit and the idle worker is not available, a short time will be used, 240 // else we will choose the longer one 241 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 242 bool needChecking = false; 243 bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0); 244 uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME; 245 for (auto iter = workers_.begin(); iter != workers_.end(); iter++) { 246 auto worker = *iter; 247 // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout 248 // if the worker is executing the longTask, we will not do the check 249 if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) || 250 (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) || 251 !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) { 252 continue; 253 } 254 // When executing the promise task, the worker state may not be updated and will be 255 // marked as 'BLOCKED', so we should exclude this situation. 256 // Besides, if the worker is not executing sync tasks or micro tasks, it may handle 257 // the task like I/O in uv threads, we should also exclude this situation. 258 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_); 259 if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) { 260 if (!workerEngine->HasWaitingRequest()) { 261 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE); 262 } else { 263 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING); 264 worker->startTime_ = ConcurrentHelper::GetMilliseconds(); 265 } 266 continue; 267 } 268 269 HILOG_INFO("taskpool:: The worker has been marked as timeout."); 270 // If the current worker has a longTask and is not executing, we will only interrupt it. 271 if (worker->HasLongTask()) { 272 continue; 273 } 274 needChecking = true; 275 idleWorkers_.erase(worker); 276 timeoutWorkers_.insert(worker); 277 } 278 // should trigger the check when we have marked and removed workers 279 if (UNLIKELY(needChecking)) { 280 TryExpand(); 281 } 282} 283 284void TaskManager::TryTriggerExpand() 285{ 286 // post the signal to notify the monitor thread to expand 287 if (UNLIKELY(!isHandleInited_)) { 288 NotifyExecuteTask(); 289 needChecking_ = true; 290 HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr"); 291 return; 292 } 293 uv_async_send(expandHandle_); 294} 295 296#if defined(OHOS_PLATFORM) 297// read /proc/[pid]/task/[tid]/stat to get the number of idle threads. 298bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size) 299{ 300 char path[128]; // 128: buffer for path 301 pid_t pid = getpid(); 302 ssize_t bytesLen = -1; 303 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid); 304 if (ret < 0) { 305 HILOG_ERROR("snprintf_s failed"); 306 return false; 307 } 308 int fd = open(path, O_RDONLY | O_NONBLOCK); 309 if (UNLIKELY(fd == -1)) { 310 return false; 311 } 312 bytesLen = read(fd, buf, size - 1); 313 close(fd); 314 if (bytesLen <= 0) { 315 HILOG_ERROR("taskpool:: failed to read %{public}s", path); 316 return false; 317 } 318 buf[bytesLen] = '\0'; 319 return true; 320} 321 322uint32_t TaskManager::GetIdleWorkers() 323{ 324 char buf[4096]; // 4096: buffer for thread info 325 uint32_t idleCount = 0; 326 std::unordered_set<pid_t> tids {}; 327 { 328 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 329 for (auto& worker : idleWorkers_) { 330#if defined(ENABLE_TASKPOOL_FFRT) 331 if (worker->ffrtTaskHandle_ != nullptr) { 332 if (worker->GetWaitTime() > 0) { 333 idleCount++; 334 } 335 continue; 336 } 337#endif 338 tids.emplace(worker->tid_); 339 } 340 } 341 // The ffrt thread does not read thread info 342 for (auto tid : tids) { 343 if (!ReadThreadInfo(tid, buf, sizeof(buf))) { 344 continue; 345 } 346 char state; 347 if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state 348 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state); 349 return 0; 350 } 351 if (state == 'S') { 352 idleCount++; 353 } 354 } 355 return idleCount; 356} 357 358void TaskManager::GetIdleWorkersList(uint32_t step) 359{ 360 char buf[4096]; // 4096: buffer for thread info 361 for (auto& worker : idleWorkers_) { 362#if defined(ENABLE_TASKPOOL_FFRT) 363 if (worker->ffrtTaskHandle_ != nullptr) { 364 uint64_t workerWaitTime = worker->GetWaitTime(); 365 bool isWorkerLoopActive = worker->IsLoopActive(); 366 if (workerWaitTime == 0) { 367 continue; 368 } 369 uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>( 370 std::chrono::steady_clock::now().time_since_epoch()).count()); 371 if (!isWorkerLoopActive) { 372 freeList_.emplace_back(worker); 373 } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) { 374 freeList_.emplace_back(worker); 375 HILOG_INFO("taskpool:: worker in ffrt epoll wait more than 2 intervals, force to free."); 376 } else { 377 // worker uv alive, and will be free in 2 intervals if not wake 378 HILOG_INFO("taskpool:: worker will be free if not wake."); 379 } 380 continue; 381 } 382#endif 383 if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) { 384 continue; 385 } 386 char state; 387 uint64_t utime; 388 if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu", 389 &state, sizeof(state), &utime) != 2) { // 2: state and utime 390 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_); 391 return; 392 } 393 if (state != 'S' || utime != worker->lastCpuTime_) { 394 worker->idleCount_ = 0; 395 worker->lastCpuTime_ = utime; 396 continue; 397 } 398 if (++worker->idleCount_ >= IDLE_THRESHOLD) { 399 freeList_.emplace_back(worker); 400 } 401 } 402} 403 404void TaskManager::TriggerShrink(uint32_t step) 405{ 406 GetIdleWorkersList(step); 407 step = std::min(step, static_cast<uint32_t>(freeList_.size())); 408 uint32_t count = 0; 409 for (size_t i = 0; i < freeList_.size(); i++) { 410 auto worker = freeList_[i]; 411 if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) { 412 continue; 413 } 414 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_; 415 if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) { 416 continue; 417 } 418 idleWorkers_.erase(worker); 419 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_); 420 worker->PostReleaseSignal(); 421 if (++count == step) { 422 break; 423 } 424 } 425 freeList_.clear(); 426} 427#else 428uint32_t TaskManager::GetIdleWorkers() 429{ 430 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 431 return idleWorkers_.size(); 432} 433 434void TaskManager::TriggerShrink(uint32_t step) 435{ 436 for (uint32_t i = 0; i < step; i++) { 437 // try to free the worker that idle time meets the requirement 438 auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) { 439 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_; 440 return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask(); 441 }); 442 // remove it from all sets 443 if (iter != idleWorkers_.end()) { 444 auto worker = *iter; 445 idleWorkers_.erase(worker); 446 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_); 447 worker->PostReleaseSignal(); 448 } 449 } 450} 451#endif 452 453void TaskManager::NotifyShrink(uint32_t targetNum) 454{ 455 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 456 uint32_t workerCount = workers_.size(); 457 uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS; 458 if (minThread == 0) { 459 HILOG_INFO("taskpool:: the system now is under low memory"); 460 } 461 if (workerCount > minThread && workerCount > targetNum) { 462 targetNum = std::max(minThread, targetNum); 463 uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP); 464 TriggerShrink(step); 465 } 466 // remove all timeout workers 467 for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) { 468 auto worker = *iter; 469 if (workers_.find(worker) == workers_.end()) { 470 HILOG_WARN("taskpool:: current worker maybe release"); 471 iter = timeoutWorkers_.erase(iter); 472 } else if (!worker->HasRunningTasks()) { 473 HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_); 474 worker->PostReleaseSignal(); 475 timeoutWorkers_.erase(iter++); 476 return; 477 } else { 478 iter++; 479 } 480 } 481 uint32_t idleNum = idleWorkers_.size(); 482 // System memory state is moderate and the worker has exeuted tasks, we will try to release it 483 if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) { 484 auto worker = *(idleWorkers_.begin()); 485 // worker that has longTask should not be released 486 if (worker == nullptr || worker->HasLongTask()) { 487 return; 488 } 489 if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released 490 TriggerShrink(DEFAULT_MIN_THREADS); 491 return; 492 } 493 } 494 495 // Create a worker for performance 496 if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) { 497 CreateWorkers(hostEnv_); 498 } 499 // stop the timer 500 if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) { 501 suspend_ = true; 502 uv_timer_stop(timer_); 503 HILOG_DEBUG("taskpool:: timer will be suspended"); 504 } 505} 506 507void TaskManager::TriggerLoadBalance(const uv_timer_t* req) 508{ 509 TaskManager& taskManager = TaskManager::GetInstance(); 510 taskManager.CheckForBlockedWorkers(); 511 uint32_t targetNum = taskManager.ComputeSuitableThreadNum(); 512 taskManager.NotifyShrink(targetNum); 513 taskManager.CountTraceForWorker(); 514} 515 516void TaskManager::TryExpand() 517{ 518 // dispatch task in the TaskPoolManager thread 519 NotifyExecuteTask(); 520 // do not trigger when there are more idleWorkers than tasks 521 uint32_t idleNum = GetIdleWorkers(); 522 if (idleNum > GetNonIdleTaskNum()) { 523 return; 524 } 525 needChecking_ = false; // do not need to check 526 uint32_t targetNum = ComputeSuitableIdleNum(); 527 uint32_t workerCount = 0; 528 uint32_t idleCount = 0; 529 uint32_t timeoutWorkers = 0; 530 { 531 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 532 idleCount = idleWorkers_.size(); 533 workerCount = workers_.size(); 534 timeoutWorkers = timeoutWorkers_.size(); 535 } 536 uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS); 537 maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads 538 if (workerCount < maxThreads && idleCount < targetNum) { 539 uint32_t step = std::min(maxThreads, targetNum) - idleCount; 540 // Prevent the total number of expanded threads from exceeding maxThreads 541 if (step + workerCount > maxThreads) { 542 step = maxThreads - workerCount; 543 } 544 CreateWorkers(hostEnv_, step); 545 HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u", 546 maxThreads, step, GetThreadNum()); 547 } 548 if (UNLIKELY(suspend_)) { 549 suspend_ = false; 550 uv_timer_again(timer_); 551 } 552} 553 554void TaskManager::NotifyExpand(const uv_async_t* req) 555{ 556 TaskManager& taskManager = TaskManager::GetInstance(); 557 taskManager.TryExpand(); 558} 559 560void TaskManager::RunTaskManager() 561{ 562 loop_ = uv_loop_new(); 563 if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE 564 HILOG_FATAL("taskpool:: new loop failed."); 565 return; 566 } 567 ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand); 568 timer_ = new uv_timer_t; 569 uv_timer_init(loop_, timer_); 570 uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL); 571 isHandleInited_ = true; 572#if defined IOS_PLATFORM || defined MAC_PLATFORM 573 pthread_setname_np("OS_TaskManager"); 574#else 575 pthread_setname_np(pthread_self(), "OS_TaskManager"); 576#endif 577 if (UNLIKELY(needChecking_)) { 578 needChecking_ = false; 579 uv_async_send(expandHandle_); 580 } 581 uv_run(loop_, UV_RUN_DEFAULT); 582 if (loop_ != nullptr) { 583 uv_loop_delete(loop_); 584 } 585} 586 587void TaskManager::CancelTask(napi_env env, uint64_t taskId) 588{ 589 // 1. Cannot find taskInfo by executeId, throw error 590 // 2. Find executing taskInfo, skip it 591 // 3. Find waiting taskInfo, cancel it 592 // 4. Find canceled taskInfo, skip it 593 std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId); 594 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 595 HITRACE_HELPER_METER_NAME(strTrace); 596 Task* task = GetTask(taskId); 597 if (task == nullptr) { 598 std::string errMsg = "taskpool:: the task may not exist"; 599 HILOG_ERROR("%{public}s", errMsg.c_str()); 600 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); 601 return; 602 } 603 if (task->taskState_ == ExecuteState::CANCELED) { 604 HILOG_DEBUG("taskpool:: task has been canceled"); 605 return; 606 } 607 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 608 if (task->IsPeriodicTask()) { 609 task->CancelPendingTask(env); 610 uv_timer_stop(task->timer_); 611 uv_close(reinterpret_cast<uv_handle_t*>(task->timer_), [](uv_handle_t* handle) { 612 delete (uv_timer_t*)handle; 613 handle = nullptr; 614 }); 615 return; 616 } else if (task->IsSeqRunnerTask()) { 617 CancelSeqRunnerTask(env, task); 618 return; 619 } 620 if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) || 621 task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED || 622 task->taskState_ == ExecuteState::ENDING) { 623 std::string errMsg = "taskpool:: task is not executed or has been executed"; 624 HILOG_ERROR("%{public}s", errMsg.c_str()); 625 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); 626 return; 627 } 628 629 task->ClearDelayedTimers(); 630 ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED); 631 task->CancelPendingTask(env); 632 if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) { 633 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter(); 634 task->DecreaseTaskRefCount(); 635 EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority); 636 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled"); 637 napi_reject_deferred(env, task->currentTaskInfo_->deferred, error); 638 napi_reference_unref(env, task->taskRef_, nullptr); 639 delete task->currentTaskInfo_; 640 task->currentTaskInfo_ = nullptr; 641 } 642} 643 644void TaskManager::CancelSeqRunnerTask(napi_env env, Task *task) 645{ 646 if (task->taskState_ == ExecuteState::FINISHED) { 647 std::string errMsg = "taskpool:: sequenceRunner task has been executed"; 648 HILOG_ERROR("%{public}s", errMsg.c_str()); 649 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); 650 } else { 651 task->taskState_ = ExecuteState::CANCELED; 652 } 653} 654 655void TaskManager::NotifyWorkerIdle(Worker* worker) 656{ 657 { 658 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 659 if (worker->state_ == WorkerState::BLOCKED) { 660 return; 661 } 662 idleWorkers_.insert(worker); 663 } 664 if (GetTaskNum() != 0) { 665 NotifyExecuteTask(); 666 } 667 CountTraceForWorker(); 668} 669 670void TaskManager::NotifyWorkerCreated(Worker* worker) 671{ 672 NotifyWorkerIdle(worker); 673} 674 675void TaskManager::NotifyWorkerAdded(Worker* worker) 676{ 677 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 678 workers_.insert(worker); 679 HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size()); 680} 681 682void TaskManager::NotifyWorkerRunning(Worker* worker) 683{ 684 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 685 idleWorkers_.erase(worker); 686 CountTraceForWorker(); 687} 688 689uint32_t TaskManager::GetRunningWorkers() 690{ 691 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 692 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) { 693 return worker->HasRunningTasks(); 694 }); 695} 696 697uint32_t TaskManager::GetTimeoutWorkers() 698{ 699 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 700 return timeoutWorkers_.size(); 701} 702 703uint32_t TaskManager::GetTaskNum() 704{ 705 std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 706 uint32_t sum = 0; 707 for (const auto& elements : taskQueues_) { 708 sum += elements->GetTaskNum(); 709 } 710 return sum; 711} 712 713uint32_t TaskManager::GetNonIdleTaskNum() 714{ 715 return nonIdleTaskNum_; 716} 717 718void TaskManager::IncreaseNumIfNoIdle(Priority priority) 719{ 720 if (priority != Priority::IDLE) { 721 ++nonIdleTaskNum_; 722 } 723} 724 725void TaskManager::DecreaseNumIfNoIdle(Priority priority) 726{ 727 if (priority != Priority::IDLE) { 728 --nonIdleTaskNum_; 729 } 730} 731 732uint32_t TaskManager::GetThreadNum() 733{ 734 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 735 return workers_.size(); 736} 737 738void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority) 739{ 740 { 741 std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 742 IncreaseNumIfNoIdle(priority); 743 taskQueues_[priority]->EnqueueTaskId(taskId); 744 } 745 TryTriggerExpand(); 746 Task* task = GetTask(taskId); 747 if (task == nullptr) { 748 HILOG_FATAL("taskpool:: task is nullptr"); 749 return; 750 } 751 task->IncreaseTaskRefCount(); 752 if (task->onEnqueuedCallBackInfo_ != nullptr) { 753 task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_); 754 } 755} 756 757void TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority) 758{ 759 std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 760 if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) { 761 HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel"); 762 } 763} 764 765std::pair<uint64_t, Priority> TaskManager::DequeueTaskId() 766{ 767 std::lock_guard<FFRT_MUTEX> lock(taskQueuesMutex_); 768 auto& highTaskQueue = taskQueues_[Priority::HIGH]; 769 if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) { 770 highPrioExecuteCount_++; 771 return GetTaskByPriority(highTaskQueue, Priority::HIGH); 772 } 773 highPrioExecuteCount_ = 0; 774 775 auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM]; 776 if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) { 777 mediumPrioExecuteCount_++; 778 return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM); 779 } 780 mediumPrioExecuteCount_ = 0; 781 782 auto& lowTaskQueue = taskQueues_[Priority::LOW]; 783 if (!lowTaskQueue->IsEmpty()) { 784 return GetTaskByPriority(lowTaskQueue, Priority::LOW); 785 } 786 787 auto& idleTaskQueue = taskQueues_[Priority::IDLE]; 788 if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) { 789 return GetTaskByPriority(idleTaskQueue, Priority::IDLE); 790 } 791 return std::make_pair(0, Priority::LOW); 792} 793 794bool TaskManager::IsChooseIdle() 795{ 796 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 797 for (auto& worker : workers_) { 798 if (worker->state_ == WorkerState::IDLE) { 799 // If worker->state_ is WorkerState::IDLE, it means that the worker is free 800 continue; 801 } 802 // If there is a worker running a task, do not take the idle task. 803 return false; 804 } 805 // Only when all workers are free, will idle task be taken. 806 return true; 807} 808 809std::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue, 810 Priority priority) 811{ 812 uint64_t taskId = taskQueue->DequeueTaskId(); 813 if (IsDependendByTaskId(taskId)) { 814 EnqueuePendingTaskInfo(taskId, priority); 815 return std::make_pair(0, priority); 816 } 817 DecreaseNumIfNoIdle(priority); 818 return std::make_pair(taskId, priority); 819} 820 821void TaskManager::NotifyExecuteTask() 822{ 823 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 824 if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) { 825 // When there are only idle tasks and workers executing them, it is not triggered 826 return; 827 } 828 for (auto& worker : idleWorkers_) { 829 worker->NotifyExecuteTask(); 830 } 831} 832 833void TaskManager::InitTaskManager(napi_env env) 834{ 835 HITRACE_HELPER_METER_NAME("InitTaskManager"); 836 if (!isInitialized_.exchange(true, std::memory_order_relaxed)) { 837#if defined(ENABLE_TASKPOOL_FFRT) 838 globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0); 839 if (!globalEnableFfrtFlag_) { 840 UpdateSystemAppFlag(); 841 if (IsSystemApp()) { 842 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0); 843 } 844 } 845 if (EnableFfrt()) { 846 HILOG_INFO("taskpool:: apps use ffrt"); 847 } else { 848 HILOG_INFO("taskpool:: apps do not use ffrt"); 849 } 850#endif 851#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 852 mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>( 853 OHOS::AppExecFwk::EventRunner::GetMainEventRunner()); 854#endif 855 auto mainThreadEngine = NativeEngine::GetMainThreadEngine(); 856 if (mainThreadEngine == nullptr) { 857 HILOG_FATAL("taskpool:: mainThreadEngine is nullptr"); 858 return; 859 } 860 hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine); 861 // Add a reserved thread for taskpool 862 CreateWorkers(hostEnv_); 863 // Create a timer to manage worker threads 864 std::thread workerManager([this] {this->RunTaskManager();}); 865 workerManager.detach(); 866 } 867} 868 869void TaskManager::CreateWorkers(napi_env env, uint32_t num) 870{ 871 HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num); 872 for (uint32_t i = 0; i < num; i++) { 873 auto worker = Worker::WorkerConstructor(env); 874 NotifyWorkerAdded(worker); 875 } 876 CountTraceForWorker(); 877} 878 879void TaskManager::RemoveWorker(Worker* worker) 880{ 881 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 882 idleWorkers_.erase(worker); 883 timeoutWorkers_.erase(worker); 884 workers_.erase(worker); 885} 886 887void TaskManager::RestoreWorker(Worker* worker) 888{ 889 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_); 890 if (UNLIKELY(suspend_)) { 891 suspend_ = false; 892 uv_timer_again(timer_); 893 } 894 if (worker->state_ == WorkerState::BLOCKED) { 895 // since the worker is blocked, we should add it to the timeout set 896 timeoutWorkers_.insert(worker); 897 return; 898 } 899 // Since the worker may be executing some tasks in IO thread, we should add it to the 900 // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread. 901 HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size()); 902 idleWorkers_.emplace_hint(idleWorkers_.end(), worker); 903 if (GetTaskNum() != 0) { 904 NotifyExecuteTask(); 905 } 906} 907 908// ---------------------------------- SendData --------------------------------------- 909void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo) 910{ 911 std::lock_guard<std::mutex> lock(callbackMutex_); 912 callbackTable_[taskId] = callbackInfo; 913} 914 915std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId) 916{ 917 std::lock_guard<std::mutex> lock(callbackMutex_); 918 auto iter = callbackTable_.find(taskId); 919 if (iter == callbackTable_.end() || iter->second == nullptr) { 920 HILOG_ERROR("taskpool:: the callback does not exist"); 921 return nullptr; 922 } 923 return iter->second; 924} 925 926void TaskManager::IncreaseRefCount(uint64_t taskId) 927{ 928 if (taskId == 0) { // do not support func 929 return; 930 } 931 std::lock_guard<std::mutex> lock(callbackMutex_); 932 auto iter = callbackTable_.find(taskId); 933 if (iter == callbackTable_.end() || iter->second == nullptr) { 934 return; 935 } 936 iter->second->refCount++; 937} 938 939void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId) 940{ 941 if (taskId == 0) { // do not support func 942 return; 943 } 944 std::lock_guard<std::mutex> lock(callbackMutex_); 945 auto iter = callbackTable_.find(taskId); 946 if (iter == callbackTable_.end() || iter->second == nullptr) { 947 return; 948 } 949 950 auto task = reinterpret_cast<Task*>(taskId); 951 if (!task->IsValid()) { 952 callbackTable_.erase(iter); 953 return; 954 } 955 956 iter->second->refCount--; 957 if (iter->second->refCount == 0) { 958 callbackTable_.erase(iter); 959 } 960} 961 962napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task) 963{ 964 HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str()); 965 std::lock_guard<std::mutex> lock(callbackMutex_); 966 auto iter = callbackTable_.find(task->taskId_); 967 if (iter == callbackTable_.end() || iter->second == nullptr) { 968 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side"); 969 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED); 970 delete resultInfo; 971 return nullptr; 972 } 973 Worker* worker = static_cast<Worker*>(task->worker_); 974 worker->Enqueue(task->env_, resultInfo); 975 auto callbackInfo = iter->second; 976 callbackInfo->refCount++; 977 callbackInfo->worker = worker; 978 auto workerEngine = reinterpret_cast<NativeEngine*>(env); 979 workerEngine->IncreaseListeningCounter(); 980#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 981 if (task->IsMainThreadTask()) { 982 HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask"); 983 auto onCallbackTask = [callbackInfo]() { 984 TaskPool::ExecuteCallbackTask(callbackInfo.get()); 985 }; 986 TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_); 987 } else { 988 callbackInfo->onCallbackSignal->data = callbackInfo.get(); 989 uv_async_send(callbackInfo->onCallbackSignal); 990 } 991#else 992 callbackInfo->onCallbackSignal->data = callbackInfo.get(); 993 uv_async_send(callbackInfo->onCallbackSignal); 994#endif 995 return nullptr; 996} 997 998MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req) 999{ 1000 std::lock_guard<std::mutex> lock(callbackMutex_); 1001 auto info = static_cast<CallbackInfo*>(req->data); 1002 if (info == nullptr || info->worker == nullptr) { 1003 HILOG_ERROR("taskpool:: info or worker is nullptr"); 1004 return nullptr; 1005 } 1006 auto worker = info->worker; 1007 MsgQueue* queue = nullptr; 1008 worker->Dequeue(info->hostEnv, queue); 1009 return queue; 1010} 1011 1012MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo) 1013{ 1014 std::lock_guard<std::mutex> lock(callbackMutex_); 1015 if (callbackInfo == nullptr || callbackInfo->worker == nullptr) { 1016 HILOG_ERROR("taskpool:: callbackInfo or worker is nullptr"); 1017 return nullptr; 1018 } 1019 auto worker = callbackInfo->worker; 1020 MsgQueue* queue = nullptr; 1021 worker->Dequeue(callbackInfo->hostEnv, queue); 1022 return queue; 1023} 1024// ---------------------------------- SendData --------------------------------------- 1025 1026void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId) 1027{ 1028 HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str()); 1029 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); 1030 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 1031 auto iter = dependentTaskInfos_.find(taskId); 1032 if (iter == dependentTaskInfos_.end() || iter->second.empty()) { 1033 HILOG_DEBUG("taskpool:: dependentTaskInfo empty"); 1034 return; 1035 } 1036 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) { 1037 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter); 1038 RemoveDependencyById(taskId, *taskIdIter); 1039 taskIdIter = iter->second.erase(taskIdIter); 1040 if (taskInfo.first != 0) { 1041 EnqueueTaskId(taskInfo.first, taskInfo.second); 1042 } 1043 } 1044} 1045 1046void TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId) 1047{ 1048 HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str()); 1049 // remove dependency after task execute 1050 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 1051 auto dependTaskIter = dependTaskInfos_.find(taskId); 1052 if (dependTaskIter != dependTaskInfos_.end()) { 1053 auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId); 1054 if (dependTaskInnerIter != dependTaskIter->second.end()) { 1055 dependTaskIter->second.erase(dependTaskInnerIter); 1056 } 1057 } 1058} 1059 1060bool TaskManager::IsDependendByTaskId(uint64_t taskId) 1061{ 1062 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 1063 auto iter = dependTaskInfos_.find(taskId); 1064 if (iter == dependTaskInfos_.end() || iter->second.empty()) { 1065 return false; 1066 } 1067 return true; 1068} 1069 1070bool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId) 1071{ 1072 std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 1073 auto iter = dependentTaskInfos_.find(dependentTaskId); 1074 if (iter == dependentTaskInfos_.end() || iter->second.empty()) { 1075 return false; 1076 } 1077 return true; 1078} 1079 1080bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet) 1081{ 1082 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str()); 1083 StoreDependentTaskInfo(taskIdSet, taskId); 1084 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 1085 auto iter = dependTaskInfos_.find(taskId); 1086 if (iter == dependTaskInfos_.end()) { 1087 for (const auto& dependentId : taskIdSet) { 1088 auto idIter = dependTaskInfos_.find(dependentId); 1089 if (idIter == dependTaskInfos_.end()) { 1090 continue; 1091 } 1092 if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) { 1093 return false; 1094 } 1095 } 1096 dependTaskInfos_.emplace(taskId, std::move(taskIdSet)); 1097 return true; 1098 } 1099 1100 for (const auto& dependentId : iter->second) { 1101 auto idIter = dependTaskInfos_.find(dependentId); 1102 if (idIter == dependTaskInfos_.end()) { 1103 continue; 1104 } 1105 if (!CheckCircularDependency(iter->second, idIter->second, taskId)) { 1106 return false; 1107 } 1108 } 1109 iter->second.insert(taskIdSet.begin(), taskIdSet.end()); 1110 return true; 1111} 1112 1113bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId) 1114{ 1115 for (const auto& id : idSet) { 1116 if (id == taskId) { 1117 return false; 1118 } 1119 auto iter = dependentIdSet.find(id); 1120 if (iter != dependentIdSet.end()) { 1121 continue; 1122 } 1123 auto dIter = dependTaskInfos_.find(id); 1124 if (dIter == dependTaskInfos_.end()) { 1125 continue; 1126 } 1127 if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) { 1128 return false; 1129 } 1130 } 1131 return true; 1132} 1133 1134bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId) 1135{ 1136 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str()); 1137 RemoveDependentTaskInfo(dependentId, taskId); 1138 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 1139 auto iter = dependTaskInfos_.find(taskId); 1140 if (iter == dependTaskInfos_.end()) { 1141 return false; 1142 } 1143 auto dependIter = iter->second.find(dependentId); 1144 if (dependIter == iter->second.end()) { 1145 return false; 1146 } 1147 iter->second.erase(dependIter); 1148 return true; 1149} 1150 1151void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority) 1152{ 1153 if (taskId == 0) { 1154 return; 1155 } 1156 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_); 1157 pendingTaskInfos_.emplace(taskId, priority); 1158} 1159 1160std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId) 1161{ 1162 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_); 1163 if (pendingTaskInfos_.empty()) { 1164 return std::make_pair(0, Priority::DEFAULT); 1165 } 1166 std::pair<uint64_t, Priority> result; 1167 for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) { 1168 if (it->first == taskId) { 1169 result = std::make_pair(it->first, it->second); 1170 it = pendingTaskInfos_.erase(it); 1171 break; 1172 } 1173 } 1174 return result; 1175} 1176 1177void TaskManager::RemovePendingTaskInfo(uint64_t taskId) 1178{ 1179 HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str()); 1180 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_); 1181 pendingTaskInfos_.erase(taskId); 1182} 1183 1184void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId) 1185{ 1186 HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str()); 1187 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 1188 for (const auto& id : dependentTaskIdSet) { 1189 auto iter = dependentTaskInfos_.find(id); 1190 if (iter == dependentTaskInfos_.end()) { 1191 std::set<uint64_t> set{taskId}; 1192 dependentTaskInfos_.emplace(id, std::move(set)); 1193 } else { 1194 iter->second.emplace(taskId); 1195 } 1196 } 1197} 1198 1199void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId) 1200{ 1201 HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str()); 1202 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 1203 auto iter = dependentTaskInfos_.find(dependentTaskId); 1204 if (iter == dependentTaskInfos_.end()) { 1205 return; 1206 } 1207 auto taskIter = iter->second.find(taskId); 1208 if (taskIter == iter->second.end()) { 1209 return; 1210 } 1211 iter->second.erase(taskIter); 1212} 1213 1214std::string TaskManager::GetTaskDependInfoToString(uint64_t taskId) 1215{ 1216 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 1217 std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:"; 1218 auto iter = dependTaskInfos_.find(taskId); 1219 if (iter != dependTaskInfos_.end()) { 1220 for (const auto& id : iter->second) { 1221 str += " " + std::to_string(id); 1222 } 1223 } 1224 return str; 1225} 1226 1227void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration) 1228{ 1229 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str()); 1230 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_); 1231 auto iter = taskDurationInfos_.find(taskId); 1232 if (iter == taskDurationInfos_.end()) { 1233 std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration); 1234 taskDurationInfos_.emplace(taskId, std::move(durationData)); 1235 } else { 1236 if (totalDuration != 0) { 1237 iter->second.first = totalDuration; 1238 } 1239 if (cpuDuration != 0) { 1240 iter->second.second = cpuDuration; 1241 } 1242 } 1243} 1244 1245uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType) 1246{ 1247 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_); 1248 auto iter = taskDurationInfos_.find(taskId); 1249 if (iter == taskDurationInfos_.end()) { 1250 return 0; 1251 } 1252 if (durationType == TASK_TOTAL_TIME) { 1253 return iter->second.first; 1254 } else if (durationType == TASK_CPU_TIME) { 1255 return iter->second.second; 1256 } else if (iter->second.first == 0) { 1257 return 0; 1258 } 1259 return iter->second.first - iter->second.second; 1260} 1261 1262void TaskManager::RemoveTaskDuration(uint64_t taskId) 1263{ 1264 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str()); 1265 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_); 1266 auto iter = taskDurationInfos_.find(taskId); 1267 if (iter != taskDurationInfos_.end()) { 1268 taskDurationInfos_.erase(iter); 1269 } 1270} 1271 1272void TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker) 1273{ 1274 std::unique_lock<std::shared_mutex> lock(longTasksMutex_); 1275 longTasksMap_.emplace(taskId, worker); 1276} 1277 1278void TaskManager::RemoveLongTaskInfo(uint64_t taskId) 1279{ 1280 std::unique_lock<std::shared_mutex> lock(longTasksMutex_); 1281 longTasksMap_.erase(taskId); 1282} 1283 1284Worker* TaskManager::GetLongTaskInfo(uint64_t taskId) 1285{ 1286 std::shared_lock<std::shared_mutex> lock(longTasksMutex_); 1287 auto iter = longTasksMap_.find(taskId); 1288 return iter != longTasksMap_.end() ? iter->second : nullptr; 1289} 1290 1291void TaskManager::TerminateTask(uint64_t taskId) 1292{ 1293 HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str()); 1294 auto worker = GetLongTaskInfo(taskId); 1295 if (UNLIKELY(worker == nullptr)) { 1296 return; 1297 } 1298 worker->TerminateTask(taskId); 1299 RemoveLongTaskInfo(taskId); 1300} 1301 1302void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask) 1303{ 1304 uint64_t taskId = task->taskId_; 1305 if (shouldDeleteTask) { 1306 RemoveTask(taskId); 1307 } 1308 if (task->onResultSignal_ != nullptr) { 1309 if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) { 1310 ConcurrentHelper::UvHandleClose(task->onResultSignal_); 1311 } else { 1312 delete task->onResultSignal_; 1313 } 1314 task->onResultSignal_ = nullptr; 1315 } 1316 1317 if (task->currentTaskInfo_ != nullptr) { 1318 delete task->currentTaskInfo_; 1319 task->currentTaskInfo_ = nullptr; 1320 } 1321 1322 task->CancelPendingTask(env); 1323 1324 task->ClearDelayedTimers(); 1325 1326 if (task->IsFunctionTask() || task->IsGroupFunctionTask()) { 1327 return; 1328 } 1329 DecreaseRefCount(env, taskId); 1330 RemoveTaskDuration(taskId); 1331 RemovePendingTaskInfo(taskId); 1332 ReleaseCallBackInfo(task); 1333 { 1334 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_); 1335 for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) { 1336 if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) { 1337 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter); 1338 } else { 1339 ++dependentTaskIter; 1340 } 1341 } 1342 } 1343 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_); 1344 auto dependTaskIter = dependTaskInfos_.find(taskId); 1345 if (dependTaskIter != dependTaskInfos_.end()) { 1346 dependTaskInfos_.erase(dependTaskIter); 1347 } 1348} 1349 1350void TaskManager::ReleaseCallBackInfo(Task* task) 1351{ 1352 HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str()); 1353 if (task->onEnqueuedCallBackInfo_ != nullptr) { 1354 delete task->onEnqueuedCallBackInfo_; 1355 task->onEnqueuedCallBackInfo_ = nullptr; 1356 } 1357 1358 if (task->onStartExecutionCallBackInfo_ != nullptr) { 1359 delete task->onStartExecutionCallBackInfo_; 1360 task->onStartExecutionCallBackInfo_ = nullptr; 1361 } 1362 1363 if (task->onExecutionFailedCallBackInfo_ != nullptr) { 1364 delete task->onExecutionFailedCallBackInfo_; 1365 task->onExecutionFailedCallBackInfo_ = nullptr; 1366 } 1367 1368 if (task->onExecutionSucceededCallBackInfo_ != nullptr) { 1369 delete task->onExecutionSucceededCallBackInfo_; 1370 task->onExecutionSucceededCallBackInfo_ = nullptr; 1371 } 1372 1373#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 1374 if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) { 1375 if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) { 1376 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); 1377 } else { 1378 delete task->onStartExecutionSignal_; 1379 } 1380 task->onStartExecutionSignal_ = nullptr; 1381 } 1382#else 1383 if (task->onStartExecutionSignal_ != nullptr) { 1384 if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) { 1385 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); 1386 } else { 1387 delete task->onStartExecutionSignal_; 1388 } 1389 task->onStartExecutionSignal_ = nullptr; 1390 } 1391#endif 1392} 1393 1394void TaskManager::StoreTask(uint64_t taskId, Task* task) 1395{ 1396 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 1397 tasks_.emplace(taskId, task); 1398} 1399 1400void TaskManager::RemoveTask(uint64_t taskId) 1401{ 1402 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 1403 tasks_.erase(taskId); 1404} 1405 1406Task* TaskManager::GetTask(uint64_t taskId) 1407{ 1408 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 1409 auto iter = tasks_.find(taskId); 1410 if (iter == tasks_.end()) { 1411 return nullptr; 1412 } 1413 return iter->second; 1414} 1415 1416#if defined(ENABLE_TASKPOOL_FFRT) 1417void TaskManager::UpdateSystemAppFlag() 1418{ 1419 auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager(); 1420 if (abilityManager == nullptr) { 1421 HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr."); 1422 return; 1423 } 1424 auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID); 1425 if (bundleObj == nullptr) { 1426 HILOG_ERROR("taskpool:: fail to get bundle manager service."); 1427 return; 1428 } 1429 auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj); 1430 if (bundleMgr == nullptr) { 1431 HILOG_ERROR("taskpool:: Bundle manager is nullptr."); 1432 return; 1433 } 1434 OHOS::AppExecFwk::BundleInfo bundleInfo; 1435 if (bundleMgr->GetBundleInfoForSelf( 1436 static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo) 1437 != OHOS::ERR_OK) { 1438 HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf"); 1439 return; 1440 } 1441 isSystemApp_ = bundleInfo.applicationInfo.isSystemApp; 1442} 1443#endif 1444 1445#if defined(ENABLE_TASKPOOL_EVENTHANDLER) 1446bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority) 1447{ 1448 return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority)); 1449} 1450#endif 1451 1452bool TaskManager::CheckTask(uint64_t taskId) 1453{ 1454 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_); 1455 auto item = tasks_.find(taskId); 1456 return item != tasks_.end(); 1457} 1458 1459// ----------------------------------- TaskGroupManager ---------------------------------------- 1460TaskGroupManager& TaskGroupManager::GetInstance() 1461{ 1462 static TaskGroupManager groupManager; 1463 return groupManager; 1464} 1465 1466void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId) 1467{ 1468 std::lock_guard<std::mutex> lock(taskGroupsMutex_); 1469 auto groupIter = taskGroups_.find(groupId); 1470 if (groupIter == taskGroups_.end()) { 1471 HILOG_DEBUG("taskpool:: taskGroup has been released"); 1472 return; 1473 } 1474 auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second); 1475 if (taskGroup == nullptr) { 1476 HILOG_ERROR("taskpool:: taskGroup is null"); 1477 return; 1478 } 1479 taskGroup->taskRefs_.push_back(taskRef); 1480 taskGroup->taskNum_++; 1481 taskGroup->taskIds_.push_back(taskId); 1482} 1483 1484void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group) 1485{ 1486 HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group"); 1487 TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_); 1488 for (uint64_t taskId : group->taskIds_) { 1489 Task* task = TaskManager::GetInstance().GetTask(taskId); 1490 if (task == nullptr || !task->IsValid()) { 1491 continue; 1492 } 1493 napi_reference_unref(task->env_, task->taskRef_, nullptr); 1494 } 1495 1496 if (group->currentGroupInfo_ != nullptr) { 1497 delete group->currentGroupInfo_; 1498 } 1499 1500 group->CancelPendingGroup(env); 1501} 1502 1503void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId) 1504{ 1505 std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId); 1506 HITRACE_HELPER_METER_NAME(strTrace); 1507 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); 1508 TaskGroup* taskGroup = GetTaskGroup(groupId); 1509 if (taskGroup == nullptr) { 1510 HILOG_ERROR("taskpool:: CancelGroup group is nullptr"); 1511 return; 1512 } 1513 if (taskGroup->groupState_ == ExecuteState::CANCELED) { 1514 return; 1515 } 1516 std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_); 1517 if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND || 1518 taskGroup->groupState_ == ExecuteState::FINISHED) { 1519 std::string errMsg = "taskpool:: taskGroup is not executed or has been executed"; 1520 HILOG_ERROR("%{public}s", errMsg.c_str()); 1521 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str()); 1522 return; 1523 } 1524 ExecuteState groupState = taskGroup->groupState_; 1525 taskGroup->groupState_ = ExecuteState::CANCELED; 1526 taskGroup->CancelPendingGroup(env); 1527 if (taskGroup->currentGroupInfo_->finishedTask != taskGroup->taskNum_) { 1528 for (uint64_t taskId : taskGroup->taskIds_) { 1529 CancelGroupTask(env, taskId, taskGroup); 1530 } 1531 } 1532 if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) { 1533 auto engine = reinterpret_cast<NativeEngine*>(env); 1534 for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) { 1535 engine->DecreaseSubEnvCounter(); 1536 } 1537 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled"); 1538 napi_reject_deferred(env, taskGroup->currentGroupInfo_->deferred, error); 1539 napi_delete_reference(env, taskGroup->currentGroupInfo_->resArr); 1540 napi_reference_unref(env, taskGroup->groupRef_, nullptr); 1541 delete taskGroup->currentGroupInfo_; 1542 taskGroup->currentGroupInfo_ = nullptr; 1543 } 1544} 1545 1546void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group) 1547{ 1548 HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str()); 1549 auto task = TaskManager::GetInstance().GetTask(taskId); 1550 if (task == nullptr) { 1551 HILOG_INFO("taskpool:: CancelGroupTask task is nullptr"); 1552 return; 1553 } 1554 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_); 1555 if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) { 1556 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter(); 1557 task->DecreaseTaskRefCount(); 1558 TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority); 1559 delete task->currentTaskInfo_; 1560 task->currentTaskInfo_ = nullptr; 1561 } 1562 task->taskState_ = ExecuteState::CANCELED; 1563} 1564 1565void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner) 1566{ 1567 std::unique_lock<std::mutex> lock(seqRunnersMutex_); 1568 seqRunners_.emplace(seqRunnerId, seqRunner); 1569} 1570 1571void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId) 1572{ 1573 std::unique_lock<std::mutex> lock(seqRunnersMutex_); 1574 seqRunners_.erase(seqRunnerId); 1575} 1576 1577SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId) 1578{ 1579 std::unique_lock<std::mutex> lock(seqRunnersMutex_); 1580 auto iter = seqRunners_.find(seqRunnerId); 1581 if (iter != seqRunners_.end()) { 1582 return iter->second; 1583 } 1584 HILOG_DEBUG("taskpool:: sequenceRunner has been released."); 1585 return nullptr; 1586} 1587 1588void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task) 1589{ 1590 std::unique_lock<std::mutex> lock(seqRunnersMutex_); 1591 auto iter = seqRunners_.find(seqRunnerId); 1592 if (iter == seqRunners_.end()) { 1593 HILOG_ERROR("seqRunner:: seqRunner not found."); 1594 return; 1595 } else { 1596 std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_); 1597 iter->second->seqRunnerTasks_.push(task); 1598 } 1599} 1600 1601bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask) 1602{ 1603 uint64_t seqRunnerId = lastTask->seqRunnerId_; 1604 SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId); 1605 if (seqRunner == nullptr) { 1606 HILOG_ERROR("seqRunner:: trigger seqRunner not exist."); 1607 return false; 1608 } 1609 if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) { 1610 HILOG_ERROR("seqRunner:: trigger globalSeqRunner not exist."); 1611 return false; 1612 } 1613 if (seqRunner->currentTaskId_ != lastTask->taskId_) { 1614 HILOG_ERROR("seqRunner:: only front task can trigger seqRunner."); 1615 return false; 1616 } 1617 { 1618 std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_); 1619 if (seqRunner->seqRunnerTasks_.empty()) { 1620 HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str()); 1621 seqRunner->currentTaskId_ = 0; 1622 return true; 1623 } 1624 Task* task = seqRunner->seqRunnerTasks_.front(); 1625 seqRunner->seqRunnerTasks_.pop(); 1626 while (task->taskState_ == ExecuteState::CANCELED) { 1627 DisposeCanceledTask(env, task); 1628 if (seqRunner->seqRunnerTasks_.empty()) { 1629 HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.", 1630 std::to_string(seqRunnerId).c_str()); 1631 seqRunner->currentTaskId_ = 0; 1632 return true; 1633 } 1634 task = seqRunner->seqRunnerTasks_.front(); 1635 seqRunner->seqRunnerTasks_.pop(); 1636 } 1637 seqRunner->currentTaskId_ = task->taskId_; 1638 task->IncreaseRefCount(); 1639 task->taskState_ = ExecuteState::WAITING; 1640 HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.", 1641 std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str()); 1642 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_); 1643 } 1644 return true; 1645} 1646 1647void TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task) 1648{ 1649 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled"); 1650 napi_reject_deferred(env, task->currentTaskInfo_->deferred, error); 1651 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter(); 1652 napi_reference_unref(env, task->taskRef_, nullptr); 1653 delete task->currentTaskInfo_; 1654 task->currentTaskInfo_ = nullptr; 1655} 1656 1657void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup) 1658{ 1659 std::lock_guard<std::mutex> lock(taskGroupsMutex_); 1660 taskGroups_.emplace(groupId, taskGroup); 1661} 1662 1663void TaskGroupManager::RemoveTaskGroup(uint64_t groupId) 1664{ 1665 std::lock_guard<std::mutex> lock(taskGroupsMutex_); 1666 taskGroups_.erase(groupId); 1667} 1668 1669TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId) 1670{ 1671 std::lock_guard<std::mutex> lock(taskGroupsMutex_); 1672 auto groupIter = taskGroups_.find(groupId); 1673 if (groupIter == taskGroups_.end()) { 1674 return nullptr; 1675 } 1676 return reinterpret_cast<TaskGroup*>(groupIter->second); 1677} 1678 1679bool TaskGroupManager::UpdateGroupState(uint64_t groupId) 1680{ 1681 HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str()); 1682 // During the modification process of the group, prevent other sub threads from performing other 1683 // operations on the group pointer, which may cause the modification to fail. 1684 std::lock_guard<std::mutex> lock(taskGroupsMutex_); 1685 auto groupIter = taskGroups_.find(groupId); 1686 if (groupIter == taskGroups_.end()) { 1687 return false; 1688 } 1689 TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second); 1690 if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) { 1691 HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled"); 1692 return false; 1693 } 1694 group->groupState_ = ExecuteState::RUNNING; 1695 return true; 1696} 1697 1698// ----------------------------------- SequenceRunnerManager ---------------------------------------- 1699SequenceRunnerManager& SequenceRunnerManager::GetInstance() 1700{ 1701 static SequenceRunnerManager sequenceRunnerManager; 1702 return sequenceRunnerManager; 1703} 1704 1705SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc, 1706 const std::string &name, uint32_t priority) 1707{ 1708 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 1709 SequenceRunner *seqRunner = nullptr; 1710 auto iter = globalSeqRunner_.find(name); 1711 if (iter == globalSeqRunner_.end()) { 1712 seqRunner = new SequenceRunner(); 1713 // refresh priority default values on first creation 1714 if (argc == 2) { // 2: The number of parameters is 2. 1715 seqRunner->priority_ = static_cast<Priority>(priority); 1716 } 1717 seqRunner->isGlobalRunner_ = true; 1718 seqRunner->seqName_ = name; 1719 globalSeqRunner_.emplace(name, seqRunner); 1720 } else { 1721 seqRunner = iter->second; 1722 if (priority != seqRunner->priority_) { 1723 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority can not changed."); 1724 return nullptr; 1725 } 1726 } 1727 seqRunner->count_++; 1728 auto tmpIter = seqRunner->globalSeqRunnerRef_.find(env); 1729 if (tmpIter == seqRunner->globalSeqRunnerRef_.end()) { 1730 napi_ref gloableSeqRunnerRef = nullptr; 1731 napi_create_reference(env, thisVar, 0, &gloableSeqRunnerRef); 1732 seqRunner->globalSeqRunnerRef_.emplace(env, gloableSeqRunnerRef); 1733 } 1734 1735 return seqRunner; 1736} 1737 1738bool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner) 1739{ 1740 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 1741 if (seqRunner->isGlobalRunner_) { 1742 auto iter = seqRunner->globalSeqRunnerRef_.find(env); 1743 if (iter == seqRunner->globalSeqRunnerRef_.end()) { 1744 return false; 1745 } 1746 napi_reference_unref(env, iter->second, nullptr); 1747 } else { 1748 napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr); 1749 } 1750 return true; 1751} 1752 1753uint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner) 1754{ 1755 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 1756 return --(seqRunner->count_); 1757} 1758 1759void SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner) 1760{ 1761 std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_); 1762 auto iter = seqRunner->globalSeqRunnerRef_.find(env); 1763 if (iter != seqRunner->globalSeqRunnerRef_.end()) { 1764 napi_delete_reference(env, iter->second); 1765 seqRunner->globalSeqRunnerRef_.erase(iter); 1766 } 1767} 1768 1769void SequenceRunnerManager::RemoveSequenceRunner(const std::string &name) 1770{ 1771 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_); 1772 auto iter = globalSeqRunner_.find(name.c_str()); 1773 if (iter != globalSeqRunner_.end()) { 1774 globalSeqRunner_.erase(iter->first); 1775 } 1776} 1777 1778void SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner) 1779{ 1780 RemoveGlobalSeqRunnerRef(env, seqRunner); 1781 if (DecreaseSeqCount(seqRunner) == 0) { 1782 RemoveSequenceRunner(seqRunner->seqName_); 1783 TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_); 1784 delete seqRunner; 1785 } 1786} 1787} // namespace Commonlibrary::Concurrent::TaskPoolModule