1/*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <cinttypes>
17
18#include "native_safe_async_work.h"
19
20#include "ecmascript/napi/include/jsnapi.h"
21#include "napi/native_api.h"
22#include "native_api_internal.h"
23#include "native_async_work.h"
24#include "native_engine.h"
25#include "native_value.h"
26#include "securec.h"
27#include "utils/log.h"
28
29#ifdef ENABLE_CONTAINER_SCOPE
30#include "core/common/container_scope.h"
31#endif
32
33#ifdef ENABLE_CONTAINER_SCOPE
34using OHOS::Ace::ContainerScope;
35#endif
36
37#if defined(ENABLE_EVENT_HANDLER)
38#include "event_handler.h"
39using namespace OHOS::AppExecFwk;
40#endif
41
42// static methods start
43void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
44{
45    HILOG_DEBUG("NativeSafeAsyncWork::AsyncCallback called");
46    NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
47    if (that == nullptr) {
48        HILOG_ERROR("NativeSafeAsyncWork:: DereferenceOf failed!");
49        return;
50    }
51    that->ProcessAsyncHandle();
52}
53
54void NativeSafeAsyncWork::CallJs(NativeEngine* engine, napi_value js_call_func, void* context, void* data)
55{
56    if (engine == nullptr || js_call_func == nullptr) {
57        HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
58        return;
59    }
60    napi_value value = nullptr;
61    napi_get_undefined(reinterpret_cast<napi_env>(engine), &value);
62    if (value == nullptr) {
63        HILOG_ERROR("CreateUndefined failed");
64        return;
65    }
66
67    auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
68    if (resultValue == nullptr) {
69        HILOG_ERROR("CallFunction failed");
70    }
71}
72
73NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
74                                         napi_value func,
75                                         napi_value asyncResource,
76                                         napi_value asyncResourceName,
77                                         size_t maxQueueSize,
78                                         size_t threadCount,
79                                         void* finalizeData,
80                                         NativeFinalize finalizeCallback,
81                                         void* context,
82                                         NativeThreadSafeFunctionCallJs callJsCallback)
83    :engine_(engine), engineId_(engine->GetId()), maxQueueSize_(maxQueueSize),
84    threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
85    context_(context), callJsCallback_(callJsCallback)
86{
87    asyncContext_.napiAsyncResource = asyncResource;
88    asyncContext_.napiAsyncResourceName = asyncResourceName;
89
90    errno_t err = EOK;
91    err = memset_s(&asyncHandler_, sizeof(asyncHandler_), 0, sizeof(asyncHandler_));
92    if (err != EOK) {
93        HILOG_ERROR("faild to init asyncHandler_");
94        return;
95    }
96
97    if (func != nullptr) {
98        uint32_t initialRefcount = 1;
99        ref_ = engine->CreateReference(func, initialRefcount);
100    }
101
102#ifdef ENABLE_CONTAINER_SCOPE
103    containerScopeId_ = ContainerScope::CurrentId();
104#endif
105
106#if defined(ENABLE_EVENT_HANDLER)
107    std::shared_ptr<EventRunner> runner = EventRunner::Current();
108    if (runner != nullptr) {
109        eventHandler_ = std::make_shared<EventHandler>(runner);
110    }
111#endif
112}
113
114NativeSafeAsyncWork::~NativeSafeAsyncWork()
115{
116    if (ref_ != nullptr) {
117        delete ref_;
118        ref_ = nullptr;
119    }
120}
121
122bool NativeSafeAsyncWork::Init()
123{
124    HILOG_DEBUG("NativeSafeAsyncWork::Init called");
125
126    uv_loop_t* loop = engine_->GetUVLoop();
127    if (loop == nullptr) {
128        HILOG_ERROR("Get loop failed");
129        return false;
130    }
131
132    int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
133    if (ret != 0) {
134        HILOG_ERROR("uv async init failed %d", ret);
135        return false;
136    }
137
138    status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
139    return true;
140}
141
142bool NativeSafeAsyncWork::IsMaxQueueSize()
143{
144    return (queue_.size() > maxQueueSize_ &&
145           maxQueueSize_ > 0 &&
146           status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
147           status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
148}
149
150SafeAsyncCode NativeSafeAsyncWork::ValidEngineCheck()
151{
152    if (!NativeEngine::IsAlive(engine_)) {
153        HILOG_ERROR("napi_env has been destoryed");
154        return SafeAsyncCode::SAFE_ASYNC_FAILED;
155    } else if (engineId_ != engine_->GetId()) {
156        LOG_IF_SPECIAL(UNLIKELY(engine_->IsCrossThreadCheckEnabled()),
157                       "current tsfn was created by dead env, "
158                       "owner id: %{public}" PRIu64 ", current id: %{public}" PRIu64,
159                       engineId_, engine_->GetId());
160        return SafeAsyncCode::SAFE_ASYNC_CLOSED;
161    }
162    return SafeAsyncCode::SAFE_ASYNC_OK;
163}
164
// Queues `data` for a later JS-thread callback and wakes the loop thread.
// Callable from any thread. With NATIVE_TSFUNC_BLOCKING the caller sleeps on
// condition_ until ProcessAsyncHandle() drains the queue below maxQueueSize_;
// otherwise a full queue returns SAFE_ASYNC_QUEUE_FULL immediately.
SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
{
    std::unique_lock<std::mutex> lock(mutex_);
    if (IsMaxQueueSize()) {
        HILOG_INFO("queue size bigger than max queue size");
        if (mode == NATIVE_TSFUNC_BLOCKING) {
            // Re-check after each wakeup; notify comes from the consumer side.
            while (IsMaxQueueSize()) {
                condition_.wait(lock);
            }
        } else {
            return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
        }
    }

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
        status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        // NOTE(review): sending on a closing/closed tsfn implicitly gives up the
        // caller's thread slot (threadCount_--) — presumably mirroring an
        // implicit Release(); confirm callers rely on this behavior.
        if (threadCount_ == 0) {
            return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
        } else {
            threadCount_--;
            return SafeAsyncCode::SAFE_ASYNC_CLOSED;
        }
    } else {
        SafeAsyncCode checkRet = ValidEngineCheck();
        if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
            return checkRet;
        }
        queue_.emplace(data);
        // Wake the loop thread to process the queue.
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed %d", ret);
            return SafeAsyncCode::SAFE_ASYNC_FAILED;
        }
    }

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
202
203SafeAsyncCode NativeSafeAsyncWork::Acquire()
204{
205    HILOG_DEBUG("NativeSafeAsyncWork::Acquire called");
206
207    std::unique_lock<std::mutex> lock(mutex_);
208
209    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
210        status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
211        HILOG_WARN("Do not acquire, thread is closed!");
212        return SafeAsyncCode::SAFE_ASYNC_CLOSED;
213    }
214
215    // increase thread count
216    threadCount_++;
217
218    return SafeAsyncCode::SAFE_ASYNC_OK;
219}
220
// Unregisters one thread from this thread-safe function. With
// NATIVE_TSFUNC_ABORT the work switches to CLOSING regardless of remaining
// threads; when the last thread leaves (or on abort) the loop thread is woken
// so ProcessAsyncHandle() can close the handle.
SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
{
    HILOG_DEBUG("NativeSafeAsyncWork::Release called");

    std::unique_lock<std::mutex> lock(mutex_);

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
        status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        HILOG_WARN("Do not release, thread is closed!");
        return SafeAsyncCode::SAFE_ASYNC_CLOSED;
    }

    if (threadCount_ == 0) {
        HILOG_ERROR("Do not release, thread count is zero.");
        return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
    }

    // decrease thread count
    threadCount_--;

    if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
        status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
        // Unblock a producer stuck in Send()'s bounded-queue wait so it can
        // observe the CLOSING state instead of sleeping forever.
        if (maxQueueSize_ > 0) {
            condition_.notify_one();
        }
    }

    if (threadCount_ == 0 ||
        mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
        SafeAsyncCode checkRet = ValidEngineCheck();
        if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
            return checkRet;
        }
        // trigger async handle
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed %d", ret);
            return SafeAsyncCode::SAFE_ASYNC_FAILED;
        }
    }

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
264
265bool NativeSafeAsyncWork::Ref()
266{
267    if (!IsSameTid()) {
268        HILOG_ERROR("tid not same");
269        return false;
270    }
271
272    uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
273
274    return true;
275}
276
277bool NativeSafeAsyncWork::Unref()
278{
279    if (!IsSameTid()) {
280        HILOG_ERROR("tid not same");
281        return false;
282    }
283
284    uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
285
286    return true;
287}
288
289void* NativeSafeAsyncWork::GetContext()
290{
291    return context_;
292}
293
// Loop-thread consumer: drains the queued data items, invoking the JS callback
// for each. mutex_ is dropped around the callback (which may re-enter Send),
// and the handle is closed once the queue is empty and no threads remain.
void NativeSafeAsyncWork::ProcessAsyncHandle()
{
    std::unique_lock<std::mutex> lock(mutex_);
    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
        HILOG_ERROR("Process failed, thread is closed!");
        return;
    }

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        HILOG_ERROR("thread is closing!");
        CloseHandles();
        return;
    }

    // Snapshot the size: items queued after this point wait for the next wakeup.
    size_t size = queue_.size();
    void* data = nullptr;

    auto vm = engine_->GetEcmaVm();
    panda::LocalScope scope(vm);
#ifdef ENABLE_CONTAINER_SCOPE
    // Run callbacks under the container scope captured at creation.
    ContainerScope containerScope(containerScopeId_);
#endif
    TryCatch tryCatch(reinterpret_cast<napi_env>(engine_));
    while (size > 0) {
        data = queue_.front();

        // when queue is full, notify send.
        if (size == maxQueueSize_ && maxQueueSize_ > 0) {
            condition_.notify_one();
        }

        napi_value func_ = (ref_ == nullptr) ? nullptr : ref_->Get(engine_);
        // Drop the lock while running JS so callbacks can call Send/Release.
        lock.unlock();
        if (callJsCallback_ != nullptr) {
            callJsCallback_(engine_, func_, context_, data);
        } else {
            CallJs(engine_, func_, context_, data);
        }
        lock.lock();

        if (tryCatch.HasCaught()) {
            engine_->HandleUncaughtException();
        }
        // Pop only after the callback ran; front() stays valid because only
        // this (loop) thread consumes the queue.
        queue_.pop();
        size--;
    }

    // Last thread released and nothing left to deliver: tear down the handle.
    if (size == 0 && threadCount_ == 0) {
        CloseHandles();
    }
}
345
// Transitions to CLOSED and asynchronously closes the uv handle. The uv_close
// callback runs CleanUp(), which finalizes and deletes this object — no member
// may be touched after uv_close has been scheduled. Caller must hold mutex_.
SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
{
    HILOG_DEBUG("NativeSafeAsyncWork::CloseHandles called");

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
        HILOG_INFO("Close failed, thread is closed!");
        return SafeAsyncCode::SAFE_ASYNC_CLOSED;
    }

    status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;

    // close async handler
    uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
        // Recover the owner from the embedded handle; CleanUp() deletes it.
        NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
            reinterpret_cast<uv_async_t*>(handle));
        that->CleanUp();
    });

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
366
// Final teardown, invoked from the uv_close callback: runs the user finalizer,
// hands every undelivered data item to the call-js callback with a null engine
// (so callers can free their payloads), then self-destructs.
// WARNING: ends with `delete this` — the object must not be used afterwards.
void NativeSafeAsyncWork::CleanUp()
{
    HILOG_DEBUG("NativeSafeAsyncWork::CleanUp called");

    if (finalizeCallback_ != nullptr) {
        finalizeCallback_(engine_, finalizeData_, context_);
    }

    // clean data
    while (!queue_.empty()) {
        // A null engine/func signals "dropped, not delivered" to the callback.
        if (callJsCallback_ != nullptr) {
            callJsCallback_(nullptr, nullptr, context_, queue_.front());
        } else {
            CallJs(nullptr, nullptr, context_, queue_.front());
        }
        queue_.pop();
    }
    delete this;
}
386
387bool NativeSafeAsyncWork::IsSameTid()
388{
389    auto tid = pthread_self();
390    return (tid == engine_->GetTid()) ? true : false;
391}
392
// Posts a single callback invocation onto the creating thread's EventHandler
// queue (bypassing the uv queue_). `priority` maps onto EventQueue::Priority;
// `isTail` selects tail vs. head insertion. Only available when the tsfn was
// created on a thread with an EventRunner; fails otherwise.
napi_status NativeSafeAsyncWork::PostTask(void *data, int32_t priority, bool isTail)
{
#if defined(ENABLE_EVENT_HANDLER)
    HILOG_DEBUG("NativeSafeAsyncWork::PostTask called");
    std::unique_lock<std::mutex> lock(eventHandlerMutex_);
    if (engine_ == nullptr || eventHandler_ == nullptr) {
        HILOG_ERROR("post task failed due to nullptr engine or eventHandler");
        return napi_status::napi_generic_failure;
    }
    // the task will be execute at main thread or worker thread
    // NOTE(review): the lambda captures `this` raw; presumably the handler's
    // tasks are flushed before this object is deleted — verify lifetime.
    auto task = [this, data]() {
        HILOG_DEBUG("The task is executing in main thread or worker thread");
        napi_value func_ = (this->ref_ == nullptr) ? nullptr : this->ref_->Get(engine_);
        if (this->callJsCallback_ != nullptr) {
            this->callJsCallback_(engine_, func_, context_, data);
        } else {
            CallJs(engine_, func_, context_, data);
        }
    };

    bool res = false;
    if (isTail) {
        HILOG_DEBUG("The task is posted from tail");
        res = eventHandler_->PostTask(task, static_cast<EventQueue::Priority>(priority));
    } else {
        HILOG_DEBUG("The task is posted from head");
        res = eventHandler_->PostTaskAtFront(task, std::string(), static_cast<EventQueue::Priority>(priority));
    }

    return res ? napi_status::napi_ok : napi_status::napi_generic_failure;
#else
    HILOG_WARN("EventHandler feature is not supported");
    return napi_status::napi_generic_failure;
#endif
}
428
// Schedules `cb` to run on the JS thread. Prefers the EventHandler path when
// available; otherwise wraps the callback in a CallbackWrapper and routes it
// through the uv-based Send() queue, translating SafeAsyncCode to napi_status.
// The wrapper is freed here on failure; on success the consumer owns it.
napi_status NativeSafeAsyncWork::SendEvent(const std::function<void()> &cb, napi_event_priority priority)
{
#ifdef ENABLE_EVENT_HANDLER
    if (eventHandler_) {
        // Open a LocalScope so JS values created inside cb are collected.
        auto task = [eng = engine_, cb]() {
            auto vm = eng->GetEcmaVm();
            panda::LocalScope scope(vm);
            cb();
        };
        if (eventHandler_->PostTask(task, static_cast<EventQueue::Priority>(priority)))
            return napi_status::napi_ok;
        else
            return napi_status::napi_generic_failure;
    }
#endif
    // Fallback path: ship the std::function through the uv queue.
    CallbackWrapper *cbw = new (std::nothrow) CallbackWrapper();
    if (!cbw) {
        HILOG_ERROR("malloc failed!");
        return napi_status::napi_generic_failure;
    }
    cbw->cb = cb;
    auto code = Send(reinterpret_cast<void *>(cbw), NATIVE_TSFUNC_NONBLOCKING);

    // Map internal SafeAsyncCode onto the public napi_status values.
    napi_status status = napi_status::napi_ok;
    switch (code) {
        case SafeAsyncCode::SAFE_ASYNC_OK:
            status = napi_status::napi_ok;
            break;
        case SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL:
            status = napi_status::napi_queue_full;
            break;
        case SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS:
            status = napi_status::napi_invalid_arg;
            break;
        case SafeAsyncCode::SAFE_ASYNC_CLOSED:
            status = napi_status::napi_closing;
            break;
        case SafeAsyncCode::SAFE_ASYNC_FAILED:
            status = napi_status::napi_generic_failure;
            break;
        default:
            HILOG_FATAL("this branch is unreachable, code is %{public}d", code);
            status = napi_status::napi_generic_failure;
            break;
    }
    if (status != napi_status::napi_ok) {
        // Not enqueued: reclaim the wrapper to avoid a leak.
        HILOG_ERROR("send event failed(%{public}d)", status);
        delete cbw;
        cbw = nullptr;
    }
    return status;
}