1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "worker.h"
17 
18 #if defined(ENABLE_TASKPOOL_FFRT)
19 #include "c/executor_task.h"
20 #include "ffrt_inner.h"
21 #endif
22 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
23 #include "helper/hitrace_helper.h"
24 #include "process_helper.h"
25 #include "task_group.h"
26 #include "task_manager.h"
27 #include "taskpool.h"
28 #include "tools/log.h"
29 
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32 using namespace Commonlibrary::Platform;
33 
PriorityScope(Worker* worker, Priority taskPriority)34 Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker)
35 {
36     if (taskPriority != worker->priority_) {
37         HILOG_DEBUG("taskpool:: reset worker priority to match task priority");
38         if (TaskManager::GetInstance().EnableFfrt()) {
39 #if defined(ENABLE_TASKPOOL_FFRT)
40             if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) {
41                 SetWorkerPriority(taskPriority);
42             }
43 #endif
44         } else {
45             SetWorkerPriority(taskPriority);
46         }
47         worker->priority_ = taskPriority;
48     }
49 }
50 
~RunningScope()51 Worker::RunningScope::~RunningScope()
52 {
53     HILOG_DEBUG("taskpool:: RunningScope destruction");
54     if (scope_ != nullptr) {
55         napi_close_handle_scope(worker_->workerEnv_, scope_);
56     }
57     worker_->NotifyIdle();
58     worker_->idleState_ = true;
59 }
60 
WorkerConstructor(napi_env env)61 Worker* Worker::WorkerConstructor(napi_env env)
62 {
63     HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
64     Worker* worker = new Worker(env);
65     worker->StartExecuteInThread();
66     return worker;
67 }
68 
CloseHandles()69 void Worker::CloseHandles()
70 {
71     // set all handles to nullptr so that they can not be used even when the loop is re-running
72     ConcurrentHelper::UvHandleClose(performTaskSignal_);
73     performTaskSignal_ = nullptr;
74 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
75     ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_);
76     debuggerOnPostTaskSignal_ = nullptr;
77 #endif
78     ConcurrentHelper::UvHandleClose(clearWorkerSignal_);
79     clearWorkerSignal_ = nullptr;
80     ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_);
81     triggerGCCheckSignal_ = nullptr;
82 }
83 
ReleaseWorkerHandles(const uv_async_t* req)84 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
85 {
86     auto worker = static_cast<Worker*>(req->data);
87     HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
88     if (!worker->CheckFreeConditions()) {
89         return;
90     }
91 
92     TaskManager::GetInstance().RemoveWorker(worker);
93     HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
94     HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now",
95         TaskManager::GetInstance().GetThreadNum());
96     // when there is no active handle, worker loop will stop automatically.
97     worker->CloseHandles();
98 
99     uv_loop_t* loop = worker->GetWorkerLoop();
100     if (loop != nullptr) {
101         uv_stop(loop);
102     }
103 }
104 
CheckFreeConditions()105 bool Worker::CheckFreeConditions()
106 {
107     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
108     // only when all conditions are met can the worker be freed
109     if (HasRunningTasks()) {
110         HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit");
111     } else if (workerEngine->HasListeningCounter()) {
112         HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit");
113     } else if (Timer::HasTimer(workerEnv_)) {
114         HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit");
115     } else if (workerEngine->HasWaitingRequest()) {
116         HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit");
117     } else if (workerEngine->HasSubEnv()) {
118         HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit");
119     } else if (workerEngine->HasPendingJob()) {
120         HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit");
121     } else if (workerEngine->IsProfiling()) {
122         HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
123     } else {
124         return true;
125     }
126     HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
127     TaskManager& taskManager = TaskManager::GetInstance();
128     taskManager.RestoreWorker(this);
129     taskManager.CountTraceForWorker();
130     return false;
131 }
132 
StartExecuteInThread()133 void Worker::StartExecuteInThread()
134 {
135     if (!runner_) {
136         runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
137     }
138     if (runner_) {
139         runner_->Execute(); // start a new thread
140     } else {
141         HILOG_ERROR("taskpool:: runner_ is nullptr");
142     }
143 }
144 
145 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t* req)146 void Worker::HandleDebuggerTask(const uv_async_t* req)
147 {
148     Worker* worker = reinterpret_cast<Worker*>(req->data);
149     if (worker == nullptr) {
150         HILOG_ERROR("taskpool:: worker is null");
151         return;
152     }
153     worker->debuggerMutex_.lock();
154     auto task = std::move(worker->debuggerQueue_.front());
155     worker->debuggerQueue_.pop();
156     worker->debuggerMutex_.unlock();
157     task();
158 }
159 
DebuggerOnPostTask(std::function<void()>&& task)160 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
161 {
162     if (debuggerOnPostTaskSignal_ != nullptr && !uv_is_closing(
163         reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
164         std::lock_guard<std::mutex> lock(debuggerMutex_);
165         debuggerQueue_.push(std::move(task));
166         uv_async_send(debuggerOnPostTaskSignal_);
167     }
168 }
169 #endif
170 
171 #if defined(ENABLE_TASKPOOL_FFRT)
InitFfrtInfo()172 void Worker::InitFfrtInfo()
173 {
174     if (TaskManager::GetInstance().EnableFfrt()) {
175         static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
176             {ffrt::qos_background, Priority::IDLE},
177             {ffrt::qos_utility, Priority::LOW},
178             {ffrt::qos_default, Priority::DEFAULT},
179             {ffrt::qos_user_initiated, Priority::HIGH},
180         };
181         ffrt_qos_t qos = ffrt_this_task_get_qos();
182         priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos);
183         ffrtTaskHandle_ = ffrt_get_cur_task();
184     }
185 }
186 
InitLoopHandleNum()187 void Worker::InitLoopHandleNum()
188 {
189     if (ffrtTaskHandle_ == nullptr) {
190         return;
191     }
192 
193     uv_loop_t* loop = GetWorkerLoop();
194     if (loop != nullptr) {
195         initActiveHandleNum_ = loop->active_handles;
196     } else {
197         HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num.");
198     }
199 }
200 
IsLoopActive()201 bool Worker::IsLoopActive()
202 {
203     uv_loop_t* loop = GetWorkerLoop();
204     if (loop != nullptr) {
205         return uv_loop_alive_taskpool(loop, initActiveHandleNum_);
206     } else {
207         HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive.");
208         return false;
209     }
210 }
211 
GetWaitTime()212 uint64_t Worker::GetWaitTime()
213 {
214     return ffrt_epoll_get_wait_time(ffrtTaskHandle_);
215 }
216 #endif
217 
ExecuteInThread(const void* data)218 void Worker::ExecuteInThread(const void* data)
219 {
220     HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
221     auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
222     {
223         napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
224         if (worker->workerEnv_ == nullptr) {
225             HILOG_ERROR("taskpool:: worker create runtime failed");
226             return;
227         }
228         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
229         // mark worker env is taskpoolThread
230         workerEngine->MarkTaskPoolThread();
231         workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
232     }
233     uv_loop_t* loop = worker->GetWorkerLoop();
234     if (loop == nullptr) {
235         HILOG_ERROR("taskpool:: loop is nullptr");
236         return;
237     }
238     // save the worker tid
239     worker->tid_ = GetThreadId();
240 
241     // Init worker task execute signal
242     ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker);
243     ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker);
244     ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker);
245 
246     HITRACE_HELPER_FINISH_TRACE;
247 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
248     // Init debugger task post signal
249     ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker);
250 #endif
251     if (worker->PrepareForWorkerInstance()) {
252         // Call after uv_async_init
253         worker->NotifyWorkerCreated();
254 #if defined(ENABLE_TASKPOOL_FFRT)
255         worker->InitFfrtInfo();
256         worker->InitLoopHandleNum();
257 #endif
258         worker->RunLoop();
259     } else {
260         HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
261     }
262     TaskManager::GetInstance().RemoveWorker(worker);
263     TaskManager::GetInstance().CountTraceForWorker();
264     worker->ReleaseWorkerThreadContent();
265     delete worker;
266     worker = nullptr;
267 }
268 
PrepareForWorkerInstance()269 bool Worker::PrepareForWorkerInstance()
270 {
271     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
272     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
273 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
274     workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) {
275         this->DebuggerOnPostTask(std::move(task));
276     });
277 #endif
278     if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
279         HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
280         return false;
281     }
282     // register timer interface
283     Timer::RegisterTime(workerEnv_);
284 
285     // Check exception after worker construction
286     if (NapiHelper::IsExceptionPending(workerEnv_)) {
287         HILOG_ERROR("taskpool:: Worker construction occur exception");
288         return false;
289     }
290     return true;
291 }
292 
ReleaseWorkerThreadContent()293 void Worker::ReleaseWorkerThreadContent()
294 {
295     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
296     auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
297     if (workerEngine == nullptr) {
298         HILOG_ERROR("taskpool:: workerEngine is nullptr");
299         return;
300     }
301     if (hostEngine != nullptr) {
302         if (!hostEngine->DeleteWorker(workerEngine)) {
303             HILOG_ERROR("taskpool:: DeleteWorker fail");
304         }
305     }
306     if (state_ == WorkerState::BLOCKED) {
307         HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
308     } else {
309         HITRACE_HELPER_METER_NAME("Thread Exit");
310     }
311 
312     Timer::ClearEnvironmentTimer(workerEnv_);
313     // 2. delete NativeEngine created in worker thread
314     if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
315         HILOG_ERROR("worker:: CallOffWorkerFunc error");
316     }
317     delete workerEngine;
318     workerEnv_ = nullptr;
319 }
320 
NotifyExecuteTask()321 void Worker::NotifyExecuteTask()
322 {
323     if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
324         uv_async_send(performTaskSignal_);
325     }
326 }
327 
NotifyIdle()328 void Worker::NotifyIdle()
329 {
330     TaskManager::GetInstance().NotifyWorkerIdle(this);
331 }
332 
NotifyWorkerCreated()333 void Worker::NotifyWorkerCreated()
334 {
335     TaskManager::GetInstance().NotifyWorkerCreated(this);
336 }
337 
NotifyTaskBegin()338 void Worker::NotifyTaskBegin()
339 {
340     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
341     workerEngine->NotifyTaskBegin();
342 }
343 
TriggerGCCheck(const uv_async_t* req)344 void Worker::TriggerGCCheck(const uv_async_t* req)
345 {
346     if (req == nullptr || req->data == nullptr) {
347         HILOG_ERROR("taskpool:: req handle is invalid");
348         return;
349     }
350     auto worker = reinterpret_cast<Worker*>(req->data);
351     auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
352     workerEngine->NotifyTaskFinished();
353 }
354 
NotifyTaskFinished()355 void Worker::NotifyTaskFinished()
356 {
357     // trigger gc check by uv and return immediately if the handle is invalid
358     if (UNLIKELY(triggerGCCheckSignal_ == nullptr || uv_is_closing(
359         reinterpret_cast<uv_handle_t*>(triggerGCCheckSignal_)))) {
360         HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed");
361         return;
362     } else {
363         uv_async_send(triggerGCCheckSignal_);
364     }
365 
366     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
367     if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
368         // the worker state is still RUNNING and the start time will be updated
369         startTime_ = ConcurrentHelper::GetMilliseconds();
370     } else {
371         UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
372     }
373     idlePoint_ = ConcurrentHelper::GetMilliseconds();
374 }
375 
UpdateWorkerState(WorkerState expect, WorkerState desired)376 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
377 {
378     return state_.compare_exchange_strong(expect, desired);
379 }
380 
PerformTask(const uv_async_t* req)381 void Worker::PerformTask(const uv_async_t* req)
382 {
383     uint64_t startTime = ConcurrentHelper::GetMilliseconds();
384     auto worker = static_cast<Worker*>(req->data);
385     napi_env env = worker->workerEnv_;
386     TaskManager::GetInstance().NotifyWorkerRunning(worker);
387     auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
388     if (taskInfo.first == 0) {
389         worker->NotifyIdle();
390         return;
391     }
392     RunningScope runningScope(worker);
393     PriorityScope priorityScope(worker, taskInfo.second);
394     Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
395     if (task == nullptr) {
396         HILOG_DEBUG("taskpool:: task has been released");
397         return;
398     } else if (!task->IsValid() && task->ShouldDeleteTask(false)) {
399         HILOG_WARN("taskpool:: task is invalid");
400         delete task;
401         return;
402     }
403     // try to record the memory data for gc
404     worker->NotifyTaskBegin();
405 
406     if (!task->UpdateTask(startTime, worker)) {
407         worker->NotifyTaskFinished();
408         return;
409     }
410     if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) {
411         return;
412     }
413     if (task->IsLongTask()) {
414         worker->UpdateLongTaskInfo(task);
415     }
416     worker->StoreTaskId(task->taskId_);
417     // tag for trace parse: Task Perform
418     std::string strTrace = "Task Perform: name : "  + task->name_ + ", taskId : " + std::to_string(task->taskId_)
419                             + ", priority : " + std::to_string(taskInfo.second);
420     HITRACE_HELPER_METER_NAME(strTrace);
421     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
422 
423     napi_value func = nullptr;
424     napi_value args = nullptr;
425     napi_value errorInfo = task->DeserializeValue(env, &func, &args);
426     if (UNLIKELY(func == nullptr || args == nullptr)) {
427         if (errorInfo != nullptr) {
428             worker->NotifyTaskResult(env, task, errorInfo);
429         }
430         return;
431     }
432     if (!worker->InitTaskPoolFunc(env, func, task)) {
433         return;
434     }
435     worker->hasExecuted_ = true;
436     uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
437     napi_value argsArray[argsNum];
438     for (size_t i = 0; i < argsNum; i++) {
439         argsArray[i] = NapiHelper::GetElement(env, args, i);
440     }
441 
442     if (!task->CheckStartExecution(taskInfo.second)) {
443         if (task->ShouldDeleteTask()) {
444             delete task;
445         }
446         return;
447     }
448     napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
449     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
450     workerEngine->ClearCurrentTaskInfo();
451     task->DecreaseRefCount();
452     task->StoreTaskDuration();
453     worker->UpdateExecutedInfo();
454     HandleFunctionException(env, task);
455 }
456 
NotifyTaskResult(napi_env env, Task* task, napi_value result)457 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
458 {
459     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
460     HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str());
461     void* resultData = nullptr;
462     napi_value undefined = NapiHelper::GetUndefinedValue(env);
463     bool defaultTransfer = true;
464     bool defaultCloneSendable = false;
465     napi_status status = napi_serialize_inner(env, result, undefined, undefined,
466                                               defaultTransfer, defaultCloneSendable, &resultData);
467     if ((status != napi_ok || resultData == nullptr) && task->success_) {
468         task->success_ = false;
469         std::string errMessage = "taskpool: failed to serialize result.";
470         HILOG_ERROR("%{public}s", errMessage.c_str());
471         napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
472         NotifyTaskResult(env, task, err);
473         return;
474     }
475     task->result_ = resultData;
476     NotifyHandleTaskResult(task);
477 }
478 
NotifyHandleTaskResult(Task* task)479 void Worker::NotifyHandleTaskResult(Task* task)
480 {
481     if (!task->IsReadyToHandle()) {
482         return;
483     }
484     Worker* worker = reinterpret_cast<Worker*>(task->worker_);
485     if (worker != nullptr) {
486         std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
487         auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_);
488         if (iter != worker->currentTaskId_.end()) {
489             worker->currentTaskId_.erase(iter);
490         }
491     } else {
492         HILOG_FATAL("taskpool:: worker is nullptr");
493         return;
494     }
495     if (!task->VerifyAndPostResult(worker->priority_)) {
496         if (task->ShouldDeleteTask()) {
497             delete task;
498         }
499     }
500     worker->NotifyTaskFinished();
501 }
502 
TaskResultCallback(napi_env env, napi_value result, bool success, void* data)503 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
504 {
505     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
506     if (env == nullptr) { // LCOV_EXCL_BR_LINE
507         HILOG_FATAL("taskpool:: TaskResultCallback engine is null");
508         return;
509     }
510     if (data == nullptr) { // LCOV_EXCL_BR_LINE
511         HILOG_FATAL("taskpool:: data is nullptr");
512         return;
513     }
514     Task* task = static_cast<Task*>(data);
515     auto taskId = reinterpret_cast<uint64_t>(task);
516     if (TaskManager::GetInstance().GetTask(taskId) == nullptr) {
517         HILOG_FATAL("taskpool:: task is nullptr");
518         return;
519     }
520     auto worker = static_cast<Worker*>(task->worker_);
521     worker->isExecutingLongTask_ = task->IsLongTask();
522     task->DecreaseRefCount();
523     task->ioTime_ = ConcurrentHelper::GetMilliseconds();
524     if (task->cpuTime_ != 0) {
525         uint64_t ioDuration = task->ioTime_ - task->startTime_;
526         uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
527         TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
528     }
529     task->success_ = success;
530     NotifyTaskResult(env, task, result);
531 }
532 
533 // reset qos_user_initiated after perform task
ResetWorkerPriority()534 void Worker::ResetWorkerPriority()
535 {
536     if (priority_ != Priority::HIGH) {
537         if (TaskManager::GetInstance().EnableFfrt()) {
538 #if defined(ENABLE_TASKPOOL_FFRT)
539             if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) {
540                 SetWorkerPriority(Priority::HIGH);
541             }
542 #endif
543         } else {
544             SetWorkerPriority(Priority::HIGH);
545         }
546         priority_ = Priority::HIGH;
547     }
548 }
549 
StoreTaskId(uint64_t taskId)550 void Worker::StoreTaskId(uint64_t taskId)
551 {
552     std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
553     currentTaskId_.emplace_back(taskId);
554 }
555 
InitTaskPoolFunc(napi_env env, napi_value func, Task* task)556 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
557 {
558     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
559     bool success = workerEngine->InitTaskPoolFunc(env, func, task);
560     napi_value exception;
561     napi_get_and_clear_last_exception(env, &exception);
562     if (exception != nullptr) {
563         HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
564         task->success_ = false;
565         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
566         NotifyTaskResult(env, task, errorEvent);
567         return false;
568     }
569     if (!success) {
570         HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
571         napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
572                                                "taskpool:: function may not be concurrent.");
573         task->success_ = false;
574         NotifyTaskResult(env, task, err);
575         return false;
576     }
577     return true;
578 }
579 
UpdateExecutedInfo()580 void Worker::UpdateExecutedInfo()
581 {
582     // if the worker is blocked, just skip
583     if (LIKELY(state_ != WorkerState::BLOCKED)) {
584         uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
585         TaskManager::GetInstance().UpdateExecutedInfo(duration);
586     }
587 }
588 
589 // Only when the worker has no longTask can it be released.
TerminateTask(uint64_t taskId)590 void Worker::TerminateTask(uint64_t taskId)
591 {
592     HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str());
593     std::lock_guard<std::mutex> lock(longMutex_);
594     longTasksSet_.erase(taskId);
595     if (longTasksSet_.empty()) {
596         hasLongTask_ = false;
597     }
598 }
599 
600 // to store longTasks' state
UpdateLongTaskInfo(Task* task)601 void Worker::UpdateLongTaskInfo(Task* task)
602 {
603     HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str());
604     TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this);
605     std::lock_guard<std::mutex> lock(longMutex_);
606     hasLongTask_ = true;
607     isExecutingLongTask_ = true;
608     longTasksSet_.emplace(task->taskId_);
609 }
610 
IsExecutingLongTask()611 bool Worker::IsExecutingLongTask()
612 {
613     return isExecutingLongTask_;
614 }
615 
HasLongTask()616 bool Worker::HasLongTask()
617 {
618     return hasLongTask_;
619 }
620 
HandleFunctionException(napi_env env, Task* task)621 void Worker::HandleFunctionException(napi_env env, Task* task)
622 {
623     napi_value exception;
624     napi_get_and_clear_last_exception(env, &exception);
625     if (exception != nullptr) {
626         HILOG_ERROR("taskpool::PerformTask occur exception");
627         task->DecreaseRefCount();
628         task->success_ = false;
629         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
630         NotifyTaskResult(env, task, errorEvent);
631         return;
632     }
633     NotifyHandleTaskResult(task);
634 }
635 
PostReleaseSignal()636 void Worker::PostReleaseSignal()
637 {
638     if (UNLIKELY(clearWorkerSignal_ == nullptr || uv_is_closing(
639         reinterpret_cast<uv_handle_t*>(clearWorkerSignal_)))) {
640         HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed");
641         return;
642     }
643     uv_async_send(clearWorkerSignal_);
644 }
645 } // namespace Commonlibrary::Concurrent::TaskPoolModule