1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "worker.h"
17
18 #if defined(ENABLE_TASKPOOL_FFRT)
19 #include "c/executor_task.h"
20 #include "ffrt_inner.h"
21 #endif
22 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
23 #include "helper/hitrace_helper.h"
24 #include "process_helper.h"
25 #include "task_group.h"
26 #include "task_manager.h"
27 #include "taskpool.h"
28 #include "tools/log.h"
29
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32 using namespace Commonlibrary::Platform;
33
PriorityScope(Worker* worker, Priority taskPriority)34 Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker)
35 {
36 if (taskPriority != worker->priority_) {
37 HILOG_DEBUG("taskpool:: reset worker priority to match task priority");
38 if (TaskManager::GetInstance().EnableFfrt()) {
39 #if defined(ENABLE_TASKPOOL_FFRT)
40 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) {
41 SetWorkerPriority(taskPriority);
42 }
43 #endif
44 } else {
45 SetWorkerPriority(taskPriority);
46 }
47 worker->priority_ = taskPriority;
48 }
49 }
50
~RunningScope()51 Worker::RunningScope::~RunningScope()
52 {
53 HILOG_DEBUG("taskpool:: RunningScope destruction");
54 if (scope_ != nullptr) {
55 napi_close_handle_scope(worker_->workerEnv_, scope_);
56 }
57 worker_->NotifyIdle();
58 worker_->idleState_ = true;
59 }
60
WorkerConstructor(napi_env env)61 Worker* Worker::WorkerConstructor(napi_env env)
62 {
63 HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
64 Worker* worker = new Worker(env);
65 worker->StartExecuteInThread();
66 return worker;
67 }
68
CloseHandles()69 void Worker::CloseHandles()
70 {
71 // set all handles to nullptr so that they can not be used even when the loop is re-running
72 ConcurrentHelper::UvHandleClose(performTaskSignal_);
73 performTaskSignal_ = nullptr;
74 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
75 ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_);
76 debuggerOnPostTaskSignal_ = nullptr;
77 #endif
78 ConcurrentHelper::UvHandleClose(clearWorkerSignal_);
79 clearWorkerSignal_ = nullptr;
80 ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_);
81 triggerGCCheckSignal_ = nullptr;
82 }
83
ReleaseWorkerHandles(const uv_async_t* req)84 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
85 {
86 auto worker = static_cast<Worker*>(req->data);
87 HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
88 if (!worker->CheckFreeConditions()) {
89 return;
90 }
91
92 TaskManager::GetInstance().RemoveWorker(worker);
93 HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
94 HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now",
95 TaskManager::GetInstance().GetThreadNum());
96 // when there is no active handle, worker loop will stop automatically.
97 worker->CloseHandles();
98
99 uv_loop_t* loop = worker->GetWorkerLoop();
100 if (loop != nullptr) {
101 uv_stop(loop);
102 }
103 }
104
CheckFreeConditions()105 bool Worker::CheckFreeConditions()
106 {
107 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
108 // only when all conditions are met can the worker be freed
109 if (HasRunningTasks()) {
110 HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit");
111 } else if (workerEngine->HasListeningCounter()) {
112 HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit");
113 } else if (Timer::HasTimer(workerEnv_)) {
114 HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit");
115 } else if (workerEngine->HasWaitingRequest()) {
116 HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit");
117 } else if (workerEngine->HasSubEnv()) {
118 HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit");
119 } else if (workerEngine->HasPendingJob()) {
120 HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit");
121 } else if (workerEngine->IsProfiling()) {
122 HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
123 } else {
124 return true;
125 }
126 HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
127 TaskManager& taskManager = TaskManager::GetInstance();
128 taskManager.RestoreWorker(this);
129 taskManager.CountTraceForWorker();
130 return false;
131 }
132
StartExecuteInThread()133 void Worker::StartExecuteInThread()
134 {
135 if (!runner_) {
136 runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
137 }
138 if (runner_) {
139 runner_->Execute(); // start a new thread
140 } else {
141 HILOG_ERROR("taskpool:: runner_ is nullptr");
142 }
143 }
144
145 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t* req)146 void Worker::HandleDebuggerTask(const uv_async_t* req)
147 {
148 Worker* worker = reinterpret_cast<Worker*>(req->data);
149 if (worker == nullptr) {
150 HILOG_ERROR("taskpool:: worker is null");
151 return;
152 }
153 worker->debuggerMutex_.lock();
154 auto task = std::move(worker->debuggerQueue_.front());
155 worker->debuggerQueue_.pop();
156 worker->debuggerMutex_.unlock();
157 task();
158 }
159
DebuggerOnPostTask(std::function<void()>&& task)160 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
161 {
162 if (debuggerOnPostTaskSignal_ != nullptr && !uv_is_closing(
163 reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
164 std::lock_guard<std::mutex> lock(debuggerMutex_);
165 debuggerQueue_.push(std::move(task));
166 uv_async_send(debuggerOnPostTaskSignal_);
167 }
168 }
169 #endif
170
171 #if defined(ENABLE_TASKPOOL_FFRT)
InitFfrtInfo()172 void Worker::InitFfrtInfo()
173 {
174 if (TaskManager::GetInstance().EnableFfrt()) {
175 static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
176 {ffrt::qos_background, Priority::IDLE},
177 {ffrt::qos_utility, Priority::LOW},
178 {ffrt::qos_default, Priority::DEFAULT},
179 {ffrt::qos_user_initiated, Priority::HIGH},
180 };
181 ffrt_qos_t qos = ffrt_this_task_get_qos();
182 priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos);
183 ffrtTaskHandle_ = ffrt_get_cur_task();
184 }
185 }
186
InitLoopHandleNum()187 void Worker::InitLoopHandleNum()
188 {
189 if (ffrtTaskHandle_ == nullptr) {
190 return;
191 }
192
193 uv_loop_t* loop = GetWorkerLoop();
194 if (loop != nullptr) {
195 initActiveHandleNum_ = loop->active_handles;
196 } else {
197 HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num.");
198 }
199 }
200
IsLoopActive()201 bool Worker::IsLoopActive()
202 {
203 uv_loop_t* loop = GetWorkerLoop();
204 if (loop != nullptr) {
205 return uv_loop_alive_taskpool(loop, initActiveHandleNum_);
206 } else {
207 HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive.");
208 return false;
209 }
210 }
211
GetWaitTime()212 uint64_t Worker::GetWaitTime()
213 {
214 return ffrt_epoll_get_wait_time(ffrtTaskHandle_);
215 }
216 #endif
217
ExecuteInThread(const void* data)218 void Worker::ExecuteInThread(const void* data)
219 {
220 HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
221 auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
222 {
223 napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
224 if (worker->workerEnv_ == nullptr) {
225 HILOG_ERROR("taskpool:: worker create runtime failed");
226 return;
227 }
228 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
229 // mark worker env is taskpoolThread
230 workerEngine->MarkTaskPoolThread();
231 workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
232 }
233 uv_loop_t* loop = worker->GetWorkerLoop();
234 if (loop == nullptr) {
235 HILOG_ERROR("taskpool:: loop is nullptr");
236 return;
237 }
238 // save the worker tid
239 worker->tid_ = GetThreadId();
240
241 // Init worker task execute signal
242 ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker);
243 ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker);
244 ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker);
245
246 HITRACE_HELPER_FINISH_TRACE;
247 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
248 // Init debugger task post signal
249 ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker);
250 #endif
251 if (worker->PrepareForWorkerInstance()) {
252 // Call after uv_async_init
253 worker->NotifyWorkerCreated();
254 #if defined(ENABLE_TASKPOOL_FFRT)
255 worker->InitFfrtInfo();
256 worker->InitLoopHandleNum();
257 #endif
258 worker->RunLoop();
259 } else {
260 HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
261 }
262 TaskManager::GetInstance().RemoveWorker(worker);
263 TaskManager::GetInstance().CountTraceForWorker();
264 worker->ReleaseWorkerThreadContent();
265 delete worker;
266 worker = nullptr;
267 }
268
PrepareForWorkerInstance()269 bool Worker::PrepareForWorkerInstance()
270 {
271 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
272 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
273 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
274 workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) {
275 this->DebuggerOnPostTask(std::move(task));
276 });
277 #endif
278 if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
279 HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
280 return false;
281 }
282 // register timer interface
283 Timer::RegisterTime(workerEnv_);
284
285 // Check exception after worker construction
286 if (NapiHelper::IsExceptionPending(workerEnv_)) {
287 HILOG_ERROR("taskpool:: Worker construction occur exception");
288 return false;
289 }
290 return true;
291 }
292
ReleaseWorkerThreadContent()293 void Worker::ReleaseWorkerThreadContent()
294 {
295 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
296 auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
297 if (workerEngine == nullptr) {
298 HILOG_ERROR("taskpool:: workerEngine is nullptr");
299 return;
300 }
301 if (hostEngine != nullptr) {
302 if (!hostEngine->DeleteWorker(workerEngine)) {
303 HILOG_ERROR("taskpool:: DeleteWorker fail");
304 }
305 }
306 if (state_ == WorkerState::BLOCKED) {
307 HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
308 } else {
309 HITRACE_HELPER_METER_NAME("Thread Exit");
310 }
311
312 Timer::ClearEnvironmentTimer(workerEnv_);
313 // 2. delete NativeEngine created in worker thread
314 if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
315 HILOG_ERROR("worker:: CallOffWorkerFunc error");
316 }
317 delete workerEngine;
318 workerEnv_ = nullptr;
319 }
320
NotifyExecuteTask()321 void Worker::NotifyExecuteTask()
322 {
323 if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
324 uv_async_send(performTaskSignal_);
325 }
326 }
327
NotifyIdle()328 void Worker::NotifyIdle()
329 {
330 TaskManager::GetInstance().NotifyWorkerIdle(this);
331 }
332
NotifyWorkerCreated()333 void Worker::NotifyWorkerCreated()
334 {
335 TaskManager::GetInstance().NotifyWorkerCreated(this);
336 }
337
NotifyTaskBegin()338 void Worker::NotifyTaskBegin()
339 {
340 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
341 workerEngine->NotifyTaskBegin();
342 }
343
TriggerGCCheck(const uv_async_t* req)344 void Worker::TriggerGCCheck(const uv_async_t* req)
345 {
346 if (req == nullptr || req->data == nullptr) {
347 HILOG_ERROR("taskpool:: req handle is invalid");
348 return;
349 }
350 auto worker = reinterpret_cast<Worker*>(req->data);
351 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
352 workerEngine->NotifyTaskFinished();
353 }
354
NotifyTaskFinished()355 void Worker::NotifyTaskFinished()
356 {
357 // trigger gc check by uv and return immediately if the handle is invalid
358 if (UNLIKELY(triggerGCCheckSignal_ == nullptr || uv_is_closing(
359 reinterpret_cast<uv_handle_t*>(triggerGCCheckSignal_)))) {
360 HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed");
361 return;
362 } else {
363 uv_async_send(triggerGCCheckSignal_);
364 }
365
366 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
367 if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
368 // the worker state is still RUNNING and the start time will be updated
369 startTime_ = ConcurrentHelper::GetMilliseconds();
370 } else {
371 UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
372 }
373 idlePoint_ = ConcurrentHelper::GetMilliseconds();
374 }
375
UpdateWorkerState(WorkerState expect, WorkerState desired)376 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
377 {
378 return state_.compare_exchange_strong(expect, desired);
379 }
380
PerformTask(const uv_async_t* req)381 void Worker::PerformTask(const uv_async_t* req)
382 {
383 uint64_t startTime = ConcurrentHelper::GetMilliseconds();
384 auto worker = static_cast<Worker*>(req->data);
385 napi_env env = worker->workerEnv_;
386 TaskManager::GetInstance().NotifyWorkerRunning(worker);
387 auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
388 if (taskInfo.first == 0) {
389 worker->NotifyIdle();
390 return;
391 }
392 RunningScope runningScope(worker);
393 PriorityScope priorityScope(worker, taskInfo.second);
394 Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
395 if (task == nullptr) {
396 HILOG_DEBUG("taskpool:: task has been released");
397 return;
398 } else if (!task->IsValid() && task->ShouldDeleteTask(false)) {
399 HILOG_WARN("taskpool:: task is invalid");
400 delete task;
401 return;
402 }
403 // try to record the memory data for gc
404 worker->NotifyTaskBegin();
405
406 if (!task->UpdateTask(startTime, worker)) {
407 worker->NotifyTaskFinished();
408 return;
409 }
410 if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) {
411 return;
412 }
413 if (task->IsLongTask()) {
414 worker->UpdateLongTaskInfo(task);
415 }
416 worker->StoreTaskId(task->taskId_);
417 // tag for trace parse: Task Perform
418 std::string strTrace = "Task Perform: name : " + task->name_ + ", taskId : " + std::to_string(task->taskId_)
419 + ", priority : " + std::to_string(taskInfo.second);
420 HITRACE_HELPER_METER_NAME(strTrace);
421 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
422
423 napi_value func = nullptr;
424 napi_value args = nullptr;
425 napi_value errorInfo = task->DeserializeValue(env, &func, &args);
426 if (UNLIKELY(func == nullptr || args == nullptr)) {
427 if (errorInfo != nullptr) {
428 worker->NotifyTaskResult(env, task, errorInfo);
429 }
430 return;
431 }
432 if (!worker->InitTaskPoolFunc(env, func, task)) {
433 return;
434 }
435 worker->hasExecuted_ = true;
436 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
437 napi_value argsArray[argsNum];
438 for (size_t i = 0; i < argsNum; i++) {
439 argsArray[i] = NapiHelper::GetElement(env, args, i);
440 }
441
442 if (!task->CheckStartExecution(taskInfo.second)) {
443 if (task->ShouldDeleteTask()) {
444 delete task;
445 }
446 return;
447 }
448 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
449 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
450 workerEngine->ClearCurrentTaskInfo();
451 task->DecreaseRefCount();
452 task->StoreTaskDuration();
453 worker->UpdateExecutedInfo();
454 HandleFunctionException(env, task);
455 }
456
NotifyTaskResult(napi_env env, Task* task, napi_value result)457 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
458 {
459 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
460 HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str());
461 void* resultData = nullptr;
462 napi_value undefined = NapiHelper::GetUndefinedValue(env);
463 bool defaultTransfer = true;
464 bool defaultCloneSendable = false;
465 napi_status status = napi_serialize_inner(env, result, undefined, undefined,
466 defaultTransfer, defaultCloneSendable, &resultData);
467 if ((status != napi_ok || resultData == nullptr) && task->success_) {
468 task->success_ = false;
469 std::string errMessage = "taskpool: failed to serialize result.";
470 HILOG_ERROR("%{public}s", errMessage.c_str());
471 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
472 NotifyTaskResult(env, task, err);
473 return;
474 }
475 task->result_ = resultData;
476 NotifyHandleTaskResult(task);
477 }
478
NotifyHandleTaskResult(Task* task)479 void Worker::NotifyHandleTaskResult(Task* task)
480 {
481 if (!task->IsReadyToHandle()) {
482 return;
483 }
484 Worker* worker = reinterpret_cast<Worker*>(task->worker_);
485 if (worker != nullptr) {
486 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
487 auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_);
488 if (iter != worker->currentTaskId_.end()) {
489 worker->currentTaskId_.erase(iter);
490 }
491 } else {
492 HILOG_FATAL("taskpool:: worker is nullptr");
493 return;
494 }
495 if (!task->VerifyAndPostResult(worker->priority_)) {
496 if (task->ShouldDeleteTask()) {
497 delete task;
498 }
499 }
500 worker->NotifyTaskFinished();
501 }
502
TaskResultCallback(napi_env env, napi_value result, bool success, void* data)503 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
504 {
505 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
506 if (env == nullptr) { // LCOV_EXCL_BR_LINE
507 HILOG_FATAL("taskpool:: TaskResultCallback engine is null");
508 return;
509 }
510 if (data == nullptr) { // LCOV_EXCL_BR_LINE
511 HILOG_FATAL("taskpool:: data is nullptr");
512 return;
513 }
514 Task* task = static_cast<Task*>(data);
515 auto taskId = reinterpret_cast<uint64_t>(task);
516 if (TaskManager::GetInstance().GetTask(taskId) == nullptr) {
517 HILOG_FATAL("taskpool:: task is nullptr");
518 return;
519 }
520 auto worker = static_cast<Worker*>(task->worker_);
521 worker->isExecutingLongTask_ = task->IsLongTask();
522 task->DecreaseRefCount();
523 task->ioTime_ = ConcurrentHelper::GetMilliseconds();
524 if (task->cpuTime_ != 0) {
525 uint64_t ioDuration = task->ioTime_ - task->startTime_;
526 uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
527 TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
528 }
529 task->success_ = success;
530 NotifyTaskResult(env, task, result);
531 }
532
533 // reset qos_user_initiated after perform task
ResetWorkerPriority()534 void Worker::ResetWorkerPriority()
535 {
536 if (priority_ != Priority::HIGH) {
537 if (TaskManager::GetInstance().EnableFfrt()) {
538 #if defined(ENABLE_TASKPOOL_FFRT)
539 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) {
540 SetWorkerPriority(Priority::HIGH);
541 }
542 #endif
543 } else {
544 SetWorkerPriority(Priority::HIGH);
545 }
546 priority_ = Priority::HIGH;
547 }
548 }
549
StoreTaskId(uint64_t taskId)550 void Worker::StoreTaskId(uint64_t taskId)
551 {
552 std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
553 currentTaskId_.emplace_back(taskId);
554 }
555
InitTaskPoolFunc(napi_env env, napi_value func, Task* task)556 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
557 {
558 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
559 bool success = workerEngine->InitTaskPoolFunc(env, func, task);
560 napi_value exception;
561 napi_get_and_clear_last_exception(env, &exception);
562 if (exception != nullptr) {
563 HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
564 task->success_ = false;
565 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
566 NotifyTaskResult(env, task, errorEvent);
567 return false;
568 }
569 if (!success) {
570 HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
571 napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
572 "taskpool:: function may not be concurrent.");
573 task->success_ = false;
574 NotifyTaskResult(env, task, err);
575 return false;
576 }
577 return true;
578 }
579
UpdateExecutedInfo()580 void Worker::UpdateExecutedInfo()
581 {
582 // if the worker is blocked, just skip
583 if (LIKELY(state_ != WorkerState::BLOCKED)) {
584 uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
585 TaskManager::GetInstance().UpdateExecutedInfo(duration);
586 }
587 }
588
589 // Only when the worker has no longTask can it be released.
TerminateTask(uint64_t taskId)590 void Worker::TerminateTask(uint64_t taskId)
591 {
592 HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str());
593 std::lock_guard<std::mutex> lock(longMutex_);
594 longTasksSet_.erase(taskId);
595 if (longTasksSet_.empty()) {
596 hasLongTask_ = false;
597 }
598 }
599
600 // to store longTasks' state
UpdateLongTaskInfo(Task* task)601 void Worker::UpdateLongTaskInfo(Task* task)
602 {
603 HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str());
604 TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this);
605 std::lock_guard<std::mutex> lock(longMutex_);
606 hasLongTask_ = true;
607 isExecutingLongTask_ = true;
608 longTasksSet_.emplace(task->taskId_);
609 }
610
IsExecutingLongTask()611 bool Worker::IsExecutingLongTask()
612 {
613 return isExecutingLongTask_;
614 }
615
HasLongTask()616 bool Worker::HasLongTask()
617 {
618 return hasLongTask_;
619 }
620
HandleFunctionException(napi_env env, Task* task)621 void Worker::HandleFunctionException(napi_env env, Task* task)
622 {
623 napi_value exception;
624 napi_get_and_clear_last_exception(env, &exception);
625 if (exception != nullptr) {
626 HILOG_ERROR("taskpool::PerformTask occur exception");
627 task->DecreaseRefCount();
628 task->success_ = false;
629 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
630 NotifyTaskResult(env, task, errorEvent);
631 return;
632 }
633 NotifyHandleTaskResult(task);
634 }
635
PostReleaseSignal()636 void Worker::PostReleaseSignal()
637 {
638 if (UNLIKELY(clearWorkerSignal_ == nullptr || uv_is_closing(
639 reinterpret_cast<uv_handle_t*>(clearWorkerSignal_)))) {
640 HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed");
641 return;
642 }
643 uv_async_send(clearWorkerSignal_);
644 }
645 } // namespace Commonlibrary::Concurrent::TaskPoolModule