Lines Matching refs:worker
39 #include "worker.h"
98 for (auto& worker : workers_) {
99 delete worker;
145 for (auto& worker : workers_) {
146 if (worker->workerEnv_ == nullptr) {
149 napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
150 napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));
156 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
157 for (auto& currentId : worker->currentTaskId_) {
239 // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
246 auto worker = *iter;
247 // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
248 // if the worker is executing the longTask, we will not do the check
249 if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
250 (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
251 !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
254 // When executing the promise task, the worker state may not be updated and will be
256 // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
258 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
259 if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
261 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
263 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
264 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
269 HILOG_INFO("taskpool:: The worker has been marked as timeout.");
270 // If the current worker has a longTask and is not executing, we will only interrupt it.
271 if (worker->HasLongTask()) {
275 idleWorkers_.erase(worker);
276 timeoutWorkers_.insert(worker);
329 for (auto& worker : idleWorkers_) {
331 if (worker->ffrtTaskHandle_ != nullptr) {
332 if (worker->GetWaitTime() > 0) {
338 tids.emplace(worker->tid_);
361 for (auto& worker : idleWorkers_) {
363 if (worker->ffrtTaskHandle_ != nullptr) {
364 uint64_t workerWaitTime = worker->GetWaitTime();
365 bool isWorkerLoopActive = worker->IsLoopActive();
372 freeList_.emplace_back(worker);
374 freeList_.emplace_back(worker);
375 HILOG_INFO("taskpool:: worker in ffrt epoll wait more than 2 intervals, force to free.");
377 // worker uv is alive, and it will be freed in 2 intervals if not woken
378 HILOG_INFO("taskpool:: worker will be free if not wake.");
383 if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
390 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
393 if (state != 'S' || utime != worker->lastCpuTime_) {
394 worker->idleCount_ = 0;
395 worker->lastCpuTime_ = utime;
398 if (++worker->idleCount_ >= IDLE_THRESHOLD) {
399 freeList_.emplace_back(worker);
410 auto worker = freeList_[i];
411 if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
414 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
415 if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) {
418 idleWorkers_.erase(worker);
419 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
420 worker->PostReleaseSignal();
437 // try to free a worker whose idle time meets the requirement
438 auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
439 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
440 return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask();
444 auto worker = *iter;
445 idleWorkers_.erase(worker);
446 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
447 worker->PostReleaseSignal();
468 auto worker = *iter;
469 if (workers_.find(worker) == workers_.end()) {
470 HILOG_WARN("taskpool:: current worker maybe release");
472 } else if (!worker->HasRunningTasks()) {
473 HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_);
474 worker->PostReleaseSignal();
482 // System memory state is moderate and the worker has executed tasks, so we will try to release it
484 auto worker = *(idleWorkers_.begin());
485 // worker that has longTask should not be released
486 if (worker == nullptr || worker->HasLongTask()) {
489 if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released
495 // Create a worker for performance
655 void TaskManager::NotifyWorkerIdle(Worker* worker)
659 if (worker->state_ == WorkerState::BLOCKED) {
662 idleWorkers_.insert(worker);
670 void TaskManager::NotifyWorkerCreated(Worker* worker)
672 NotifyWorkerIdle(worker);
675 void TaskManager::NotifyWorkerAdded(Worker* worker)
678 workers_.insert(worker);
679 HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
682 void TaskManager::NotifyWorkerRunning(Worker* worker)
685 idleWorkers_.erase(worker);
692 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
693 return worker->HasRunningTasks();
797 for (auto& worker : workers_) {
798 if (worker->state_ == WorkerState::IDLE) {
799 // If worker->state_ is WorkerState::IDLE, it means that the worker is free
802 // If there is a worker running a task, do not take the idle task.
828 for (auto& worker : idleWorkers_) {
829 worker->NotifyExecuteTask();
863 // Create a timer to manage worker threads
873 auto worker = Worker::WorkerConstructor(env);
874 NotifyWorkerAdded(worker);
879 void TaskManager::RemoveWorker(Worker* worker)
882 idleWorkers_.erase(worker);
883 timeoutWorkers_.erase(worker);
884 workers_.erase(worker);
887 void TaskManager::RestoreWorker(Worker* worker)
894 if (worker->state_ == WorkerState::BLOCKED) {
895 // since the worker is blocked, we should add it to the timeout set
896 timeoutWorkers_.insert(worker);
899 // Since the worker may be executing some tasks in IO thread, we should add it to the
900 // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
901 HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
902 idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
973 Worker* worker = static_cast<Worker*>(task->worker_);
974 worker->Enqueue(task->env_, resultInfo);
977 callbackInfo->worker = worker;
986 TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
1002 if (info == nullptr || info->worker == nullptr) {
1003 HILOG_ERROR("taskpool:: info or worker is nullptr");
1006 auto worker = info->worker;
1008 worker->Dequeue(info->hostEnv, queue);
1015 if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
1016 HILOG_ERROR("taskpool:: callbackInfo or worker is nullptr");
1019 auto worker = callbackInfo->worker;
1021 worker->Dequeue(callbackInfo->hostEnv, queue);
1272 void TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker)
1275 longTasksMap_.emplace(taskId, worker);
1294 auto worker = GetLongTaskInfo(taskId);
1295 if (UNLIKELY(worker == nullptr)) {
1298 worker->TerminateTask(taskId);