1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef JS_CONCURRENT_MODULE_WORKER_WORKER_H
17 #define JS_CONCURRENT_MODULE_WORKER_WORKER_H
18 
19 #include <condition_variable>
20 #include <list>
21 #include <map>
22 #include <mutex>
23 
24 #include "helper/napi_helper.h"
25 #include "helper/object_helper.h"
26 #include "message_queue.h"
27 #include "napi/native_api.h"
28 #include "napi/native_node_api.h"
29 #include "native_engine/native_engine.h"
30 #include "worker_runner.h"
31 #if defined(ENABLE_WORKER_EVENTHANDLER)
32 #include "event_handler.h"
33 #endif
34 
35 namespace Commonlibrary::Concurrent::WorkerModule {
36 using namespace Commonlibrary::Concurrent::Common::Helper;
37 
38 class Worker {
39 public:
40     enum RunnerState { STARTING, RUNNING, TERMINATEING, TERMINATED };
41     enum HostState { ACTIVE, INACTIVE };
42     enum ListenerMode { ONCE, PERMANENT };
43     enum ScriptMode { CLASSIC, MODULE };
44 
45     using DebuggerPostTask = std::function<void()>;
46 
47     struct WorkerListener {
WorkerListenerCommonlibrary::Concurrent::WorkerModule::Worker::WorkerListener48         WorkerListener(napi_env env, napi_ref callback, ListenerMode mode)
49             : env_(env), callback_(callback), mode_(mode)
50         {}
51 
~WorkerListenerCommonlibrary::Concurrent::WorkerModule::Worker::WorkerListener52         ~WorkerListener()
53         {
54             NapiHelper::DeleteReference(env_, callback_);
55             callback_ = nullptr;
56         }
57 
NextIsAvailableCommonlibrary::Concurrent::WorkerModule::Worker::WorkerListener58         bool NextIsAvailable() const
59         {
60             return mode_ != ONCE;
61         }
62 
SetModeCommonlibrary::Concurrent::WorkerModule::Worker::WorkerListener63         void SetMode(ListenerMode mode)
64         {
65             mode_ = mode;
66         }
67 
68         bool operator==(const WorkerListener& listener) const;
69 
70         napi_env env_ {NULL};
71         napi_ref callback_ {NULL};
72         ListenerMode mode_ {PERMANENT};
73     };
74 
75     struct FindWorkerListener {
FindWorkerListenerCommonlibrary::Concurrent::WorkerModule::Worker::FindWorkerListener76         FindWorkerListener(napi_env env, napi_ref ref) : env_(env), ref_(ref) {}
77 
operator ()Commonlibrary::Concurrent::WorkerModule::Worker::FindWorkerListener78         bool operator()(const WorkerListener* listener) const
79         {
80             napi_value compareObj = NapiHelper::GetReferenceValue(env_, listener->callback_);
81             napi_value obj = NapiHelper::GetReferenceValue(env_, ref_);
82             // the env of listener and cmp listener must be same env because of Synchronization method
83             return NapiHelper::StrictEqual(env_, compareObj, obj);
84         }
85 
86         napi_env env_ {nullptr};
87         napi_ref ref_ {nullptr};
88     };
89 
90     struct WorkerParams {
91         std::string name_ {};
92         ScriptMode type_ {CLASSIC};
93     };
94 
95     /**
96     * Creates a worker instance.
97     *
98     * @param env NAPI environment parameters.
99     * @param thisVar URL of the script to be executed by the worker.
100     */
101     Worker(napi_env env, napi_ref thisVar);
102 
103     /**
104         * The destructor of the Worker.
105         */
106     ~Worker();
107 
108     /**
109      * The host thread receives the information.
110      *
111      * @param req The value of the object passed in by the js layer.
112      */
113     static void HostOnMessage(const uv_async_t* req);
114 
115     /**
116      * The host thread receives the information.
117      *
118      * @param req The value of the object passed in by the js layer.
119      */
120     static void HostOnError(const uv_async_t* req);
121 
122     /**
123      * The worker thread receives the information.
124      *
125      * @param req The value of the object passed in by the js layer.
126      */
127     static void WorkerOnMessage(const uv_async_t* req);
128 
129     /**
130      * ExecuteIn in thread.
131      *
132      * @param data The worker pointer.
133      */
134     static void ExecuteInThread(const void* data);
135 
136     /**
137     * Post a message.
138     *
139     * @param env NAPI environment parameters.
140     * @param thisVar The callback information of the js layer.
141     */
142     static napi_value PostMessage(napi_env env, napi_callback_info cbinfo);
143 
144     /**
145     * Post a message, if has sendable objects in it pass sendable objects' reference.
146     *
147     * @param env NAPI environment parameters.
148     * @param thisVar The callback information of the js layer.
149     */
150     static napi_value PostMessageWithSharedSendable(napi_env env, napi_callback_info cbinfo);
151 
152     /**
153     * postMessage implementation
154     *
155     * @param env NAPI environment parameters.
156     * @param thisVar The callback information of the js layer.
157     */
158     static napi_value CommonPostMessage(napi_env env, napi_callback_info cbinfo, bool cloneSendable);
159 
160     /**
161      * Add event listeners to host.
162      *
163      * @param env NAPI environment parameters.
164      * @param cbinfo The callback information of the js layer.
165      */
166     static napi_value PostMessageToHost(napi_env env, napi_callback_info cbinfo);
167 
168     /**
169     * Post a message, if has sendable objects in it pass sendable objects' reference.
170     *
171     * @param env NAPI environment parameters.
172     * @param thisVar The callback information of the js layer.
173     */
174     static napi_value PostMessageWithSharedSendableToHost(napi_env env, napi_callback_info cbinfo);
175 
176     /**
177     * postMessage implementation
178     *
179     * @param env NAPI environment parameters.
180     * @param thisVar The callback information of the js layer.
181     */
182     static napi_value CommonPostMessageToHost(napi_env env, napi_callback_info cbinfo, bool cloneSendable);
183 
184     /**
185      * Terminates the worker thread to stop the worker from receiving messages.
186      *
187      * @param env NAPI environment parameters.
188      * @param cbinfo The callback information of the js layer.
189      */
190     static napi_value Terminate(napi_env env, napi_callback_info cbinfo);
191 
192     /**
193      * Close the worker.
194      *
195      * @param env NAPI environment parameters.
196      * @param cbinfo The callback information of the js layer.
197      */
198     static napi_value CloseWorker(napi_env env, napi_callback_info cbinfo);
199 
200     /**
201      * Adds an event listener to the worker.
202      *
203      * @param env NAPI environment parameters.
204      * @param cbinfo The callback information of the js layer.
205      */
206     static napi_value On(napi_env env, napi_callback_info cbinfo);
207 
208     /**
209      * Adds an event listener to the worker and removes the event listener automatically after it is invoked once.
210      *
211      * @param env NAPI environment parameters.
212      * @param cbinfo The callback information of the js layer.
213      */
214     static napi_value Once(napi_env env, napi_callback_info cbinfo);
215 
216     /**
217      * Removes an event listener to the worker.
218      *
219      * @param env NAPI environment parameters.
220      * @param cbinfo The callback information of the js layer.
221      */
222     static napi_value Off(napi_env env, napi_callback_info cbinfo);
223 
224     /**
225      * Add event listeners.
226      *
227      * @param env NAPI environment parameters.
228      * @param cbinfo The callback information of the js layer.
229      */
230     static napi_value AddEventListener(napi_env env, napi_callback_info cbinfo);
231 
232     /**
233      * Dispatch the event.
234      *
235      * @param env NAPI environment parameters.
236      * @param cbinfo The callback information of the js layer.
237      */
238     static napi_value DispatchEvent(napi_env env, napi_callback_info cbinfo);
239 
240     /**
241      * Remove the event listener.
242      *
243      * @param env NAPI environment parameters.
244      * @param cbinfo The callback information of the js layer.
245      */
246     static napi_value RemoveEventListener(napi_env env, napi_callback_info cbinfo);
247 
248     /**
249      * Remove all listener.
250      *
251      * @param env NAPI environment parameters.
252      * @param cbinfo The callback information of the js layer.
253      */
254     static napi_value RemoveAllListener(napi_env env, napi_callback_info cbinfo);
255 
256     /**
257      * Add the listener.
258      *
259      * @param env NAPI environment parameters.
260      * @param cbinfo The callback information of the js layer.
261      */
262     static napi_value AddListener(napi_env env, napi_callback_info cbinfo, ListenerMode mode);
263 
264     /**
265      * Remove the listener.
266      *
267      * @param env NAPI environment parameters.
268      * @param cbinfo The callback information of the js layer.
269      */
270     static napi_value RemoveListener(napi_env env, napi_callback_info cbinfo);
271 
272     /**
273      * The constructor of worker.
274      *
275      * @param env NAPI environment parameters.
276      * @param cbinfo The callback information of the js layer.
277      */
278     static napi_value LimitedWorkerConstructor(napi_env env, napi_callback_info cbinfo);
279     static napi_value ThreadWorkerConstructor(napi_env env, napi_callback_info cbinfo);
280     static napi_value WorkerConstructor(napi_env env, napi_callback_info cbinfo);
281     static napi_value Constructor(napi_env env, napi_callback_info cbinfo, bool limitSign = false,
282                                   WorkerVersion version = WorkerVersion::NONE);
283 
284     /**
285      * Initialize the worker and port.
286      *
287      * @param env NAPI environment parameters.
288      * @param cbinfo The callback information of the js layer.
289      */
290     static napi_value InitWorker(napi_env env, napi_value exports);
291     static napi_value InitPort(napi_env env, napi_value exports);
292 
293     /**
294      * Cancel the task.
295      *
296      * @param env NAPI environment parameters.
297      * @param cbinfo The callback information of the js layer.
298      */
299     static napi_value CancelTask(napi_env env, napi_callback_info cbinfo);
300 
301     /**
302      * The parent port cancels the task.
303      *
304      * @param env NAPI environment parameters.
305      * @param cbinfo The callback information of the js layer.
306      */
307     static napi_value ParentPortCancelTask(napi_env env, napi_callback_info cbinfo);
308 
309     /**
310      * The parent port adds an event listener.
311      *
312      * @param env NAPI environment parameters.
313      * @param cbinfo The callback information of the js layer.
314      */
315     static napi_value ParentPortAddEventListener(napi_env env, napi_callback_info cbinfo);
316 
317     /**
318      * The parent port removes all event listener.
319      *
320      * @param env NAPI environment parameters.
321      * @param cbinfo The callback information of the js layer.
322      */
323     static napi_value ParentPortRemoveAllListener(napi_env env, napi_callback_info cbinfo);
324 
325     /**
326      * The parent port dispatch the event listener.
327      *
328      * @param env NAPI environment parameters.
329      * @param cbinfo The callback information of the js layer.
330      */
331     static napi_value ParentPortDispatchEvent(napi_env env, napi_callback_info cbinfo);
332 
333     /**
334      * The parent port removes the event listener.
335      *
336      * @param env NAPI environment parameters.
337      * @param cbinfo The callback information of the js layer.
338      */
339     static napi_value ParentPortRemoveEventListener(napi_env env, napi_callback_info cbinfo);
340 
341     /**
342      * Register a globalCallObject on host side.
343      *
344      * @param env NAPI environment parameters.
345      * @param cbinfo The callback information of the js layer.
346      */
347     static napi_value RegisterGlobalCallObject(napi_env env, napi_callback_info cbinfo);
348 
349     /**
350      * Unregister the specific globalCallObject on host side.
351      *
352      * @param env NAPI environment parameters.
353      * @param cbinfo The callback information of the js layer.
354      */
355     static napi_value UnregisterGlobalCallObject(napi_env env, napi_callback_info cbinfo);
356 
357     /**
358      * Post a global synchronized call request to an object registered on host side.
359      *
360      * @param env NAPI environment parameters.
361      * @param cbinfo The callback information of the js layer.
362      */
363     static napi_value GlobalCall(napi_env env, napi_callback_info cbinfo);
364 
365     static void HostOnGlobalCall(const uv_async_t* req);
366 
367     static bool CanCreateWorker(napi_env env, WorkerVersion target);
368 
369     static WorkerParams* CheckWorkerArgs(napi_env env, napi_value argsValue);
370 
371     static void WorkerThrowError(napi_env env, int32_t errCode, const char* errMessage = nullptr);
372 
373     static void WorkerDestructor(napi_env env, void *data, void *hint);
374     static void HostEnvCleanCallback(void *data);
375 
376 #if defined(ENABLE_WORKER_EVENTHANDLER)
377     static std::shared_ptr<OHOS::AppExecFwk::EventHandler> GetMainThreadHandler();
378 #endif
379 
380     void StartExecuteInThread(napi_env env, const char* script);
381 
382     bool UpdateWorkerState(RunnerState state);
383     bool UpdateHostState(HostState state);
384 
IsNotTerminate() const385     bool IsNotTerminate() const
386     {
387         return runnerState_.load(std::memory_order_acquire) <= RUNNING;
388     }
389 
IsRunning() const390     bool IsRunning() const
391     {
392         return runnerState_.load(std::memory_order_acquire) == RUNNING;
393     }
394 
IsTerminated() const395     bool IsTerminated() const
396     {
397         return runnerState_.load(std::memory_order_acquire) >= TERMINATED;
398     }
399 
IsTerminating() const400     bool IsTerminating() const
401     {
402         return runnerState_.load(std::memory_order_acquire) == TERMINATEING;
403     }
404 
SetScriptMode(ScriptMode mode)405     void SetScriptMode(ScriptMode mode)
406     {
407         scriptMode_ = mode;
408     }
409 
410     void AddListenerInner(napi_env env, const char* type, const WorkerListener* listener);
411     void RemoveListenerInner(napi_env env, const char* type, napi_ref callback);
412     void RemoveAllListenerInner();
413     void EraseWorker();
GetWorkerLoop() const414     uv_loop_t* GetWorkerLoop() const
415     {
416         if (workerEnv_ != nullptr) {
417             return NapiHelper::GetLibUV(workerEnv_);
418         }
419         return nullptr;
420     }
421 
SetWorkerEnv(napi_env workerEnv)422     void SetWorkerEnv(napi_env workerEnv)
423     {
424         workerEnv_ = workerEnv;
425         if (workerEnvCallback_) {
426             workerEnvCallback_(workerEnv_);
427         }
428     }
429 
GetScript() const430     std::string GetScript() const
431     {
432         return script_;
433     }
434 
GetName() const435     std::string GetName() const
436     {
437         return name_;
438     }
439 
ClearWorkerTasks()440     bool ClearWorkerTasks()
441     {
442         if (hostEnv_ != nullptr) {
443             workerMessageQueue_.Clear(hostEnv_);
444             return true;
445         }
446         return false;
447     }
448 
HostIsStop() const449     bool HostIsStop() const
450     {
451         return hostState_.load(std::memory_order_acquire) == INACTIVE;
452     }
453 
IsSameWorkerEnv(napi_env env) const454     bool IsSameWorkerEnv(napi_env env) const
455     {
456         return workerEnv_ == env;
457     }
458 
Loop() const459     void Loop() const
460     {
461         uv_loop_t* loop = GetWorkerLoop();
462         if (loop != nullptr) {
463             uv_run(loop, UV_RUN_DEFAULT);
464         } else {
465             return;
466         }
467     }
468 
RegisterCallbackForWorkerEnv(std::function<void (napi_env)> callback)469     void RegisterCallbackForWorkerEnv(std::function<void (napi_env)> callback)
470     {
471         workerEnvCallback_ = callback;
472         if (workerEnv_ != nullptr) {
473             workerEnvCallback_(workerEnv_);
474         }
475     }
476 
GetWorkerEnv() const477     napi_env GetWorkerEnv() const
478     {
479         return workerEnv_;
480     }
481 
GetHostEnv() const482     napi_env GetHostEnv() const
483     {
484         return hostEnv_;
485     }
486 
487 private:
488     void WorkerOnMessageInner();
489     void HostOnMessageInner();
490     void HostOnErrorInner();
491     void HostOnMessageErrorInner();
492     void HostOnGlobalCallInner();
493     void WorkerOnMessageErrorInner();
494     void WorkerOnErrorInner(napi_value error);
495 
496     void HandleHostException() const;
497     void HandleException();
498     void HandleUncaughtException(napi_value exception);
499     bool CallWorkerFunction(size_t argc, const napi_value* argv, const char* methodName, bool tryCatch);
500     void CallHostFunction(size_t argc, const napi_value* argv, const char* methodName) const;
501 
502     bool HandleEventListeners(napi_env env, napi_value recv, size_t argc, const napi_value* argv, const char* type);
503     void ParentPortHandleEventListeners(napi_env env, napi_value recv, size_t argc,
504                                         const napi_value* argv, const char* type, bool tryCatch);
505     void TerminateInner();
506 
507     void PostMessageInner(MessageDataType data);
508     void PostMessageToHostInner(MessageDataType data);
509 
510     void TerminateWorker();
511 
512     void CloseInner();
513 
514     void PublishWorkerOverSignal();
515     void CloseWorkerCallback();
516     void CloseHostCallback();
517 
518     void PostWorkerOverTask();
519     void PostWorkerErrorTask();
520     void PostWorkerMessageTask();
521     void PostWorkerGlobalCallTask();
522     static bool IsValidWorker(Worker* worker);
523 
524     void InitHostHandle(uv_loop_t* loop);
525     void CloseHostHandle();
526 
527     void ReleaseWorkerThreadContent();
528     void ReleaseHostThreadContent();
529     bool PrepareForWorkerInstance();
530     void ParentPortAddListenerInner(napi_env env, const char* type, const WorkerListener* listener);
531     void ParentPortRemoveAllListenerInner();
532     void ParentPortRemoveListenerInner(napi_env env, const char* type, napi_ref callback);
533     void GetContainerScopeId(napi_env env);
534 
535     void AddGlobalCallObject(const std::string &instanceName, napi_ref obj);
536     bool RemoveGlobalCallObject(const std::string &instanceName);
537     void ClearGlobalCallObject();
538     void AddGlobalCallError(int32_t errCode, napi_value errData = nullptr);
539     void HandleGlobalCallError(napi_env env);
540     void ClearGlobalCallError(napi_env env);
541     void InitGlobalCallStatus(napi_env env);
542     void IncreaseGlobalCallId();
543 
544     void ClearHostMessage(napi_env env);
545 
546 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
547     static void HandleDebuggerTask(const uv_async_t* req);
548     void DebuggerOnPostTask(std::function<void()>&& task);
549 #endif
550 
551     std::string script_ {};
552     std::string fileName_ {};
553     std::string name_ {};
554     ScriptMode scriptMode_ {CLASSIC};
555     bool isLimitedWorker_ {false};
556     bool isRelativePath_ {false};
557     int32_t scopeId_ {-1};
558 
559     MessageQueue workerMessageQueue_ {};
560     MessageQueue hostMessageQueue_ {};
561     std::mutex globalCallMutex_;
562     MarkedMessageQueue hostGlobalCallQueue_ {};
563     MessageQueue workerGlobalCallQueue_ {};
564     MessageQueue errorQueue_ {};
565 
566     uv_async_t* workerOnMessageSignal_ = nullptr;
567     uv_async_t* hostOnMessageSignal_ = nullptr;
568     uv_async_t* hostOnErrorSignal_ = nullptr;
569     uv_async_t* hostOnGlobalCallSignal_ = nullptr;
570 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
571     uv_async_t debuggerOnPostTaskSignal_ {};
572     std::mutex debuggerMutex_;
573     std::queue<DebuggerPostTask> debuggerQueue_ {};
574 #endif
575 
576     std::atomic<RunnerState> runnerState_ {STARTING};
577     std::atomic<HostState> hostState_ {ACTIVE};
578     std::unique_ptr<WorkerRunner> runner_ {};
579 
580     std::atomic<bool> isErrorExit_ = false;
581 
582     napi_env hostEnv_ {nullptr};
583     napi_env workerEnv_ {nullptr};
584 
585     napi_ref workerRef_ {nullptr};
586     napi_ref workerPort_ {nullptr};
587 
588     std::map<std::string, std::list<WorkerListener*>> eventListeners_ {};
589     std::map<std::string, std::list<WorkerListener*>> parentPortEventListeners_ {};
590     std::unordered_map<std::string, napi_ref> globalCallObjects_ {};
591     std::queue<std::pair<int32_t, napi_value>> globalCallErrors_ {};
592     std::atomic<uint32_t> globalCallId_ = 1; // 0: reserved for error check
593 
594     std::recursive_mutex liveStatusLock_ {};
595     std::mutex workerOnmessageMutex_ {};
596 
597     std::condition_variable cv_;
598     std::atomic<bool> globalCallSuccess_ = true;
599     std::function<void(napi_env)> workerEnvCallback_;
600 
601     bool isMainThreadWorker_ = true;
602     bool isNewVersion_ = true;
603     std::atomic<bool> isTerminated_ = false;
604     std::atomic<bool> isHostEnvExited_ = false;
605 
606     friend class WorkersTest;
607 };
608 } // namespace Commonlibrary::Concurrent::WorkerModule
609 #endif // JS_CONCURRENT_MODULE_WORKER_WORKER_H
610