xref: /third_party/node/src/node_worker.cc (revision 1cb0ef41)
1#include "node_worker.h"
2#include "debug_utils-inl.h"
3#include "histogram-inl.h"
4#include "memory_tracker-inl.h"
5#include "node_errors.h"
6#include "node_external_reference.h"
7#include "node_buffer.h"
8#include "node_options-inl.h"
9#include "node_perf.h"
10#include "node_snapshot_builder.h"
11#include "util-inl.h"
12#include "async_wrap-inl.h"
13
14#include <memory>
15#include <string>
16#include <vector>
17
18using node::kAllowedInEnvvar;
19using node::kDisallowedInEnvvar;
20using v8::Array;
21using v8::ArrayBuffer;
22using v8::Boolean;
23using v8::Context;
24using v8::Float64Array;
25using v8::FunctionCallbackInfo;
26using v8::FunctionTemplate;
27using v8::HandleScope;
28using v8::Integer;
29using v8::Isolate;
30using v8::Local;
31using v8::Locker;
32using v8::Maybe;
33using v8::MaybeLocal;
34using v8::Null;
35using v8::Number;
36using v8::Object;
37using v8::ResourceConstraints;
38using v8::SealHandleScope;
39using v8::String;
40using v8::TryCatch;
41using v8::Value;
42
43namespace node {
44namespace worker {
45
// Number of bytes in one mebibyte. The `*Mb` entries of the resource limit
// array are multiplied/divided by this to convert to and from byte counts.
constexpr double kMB = 1024 * 1024;
47
48Worker::Worker(Environment* env,
49               Local<Object> wrap,
50               const std::string& url,
51               const std::string& name,
52               std::shared_ptr<PerIsolateOptions> per_isolate_opts,
53               std::vector<std::string>&& exec_argv,
54               std::shared_ptr<KVStore> env_vars,
55               const SnapshotData* snapshot_data)
56    : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
57      per_isolate_opts_(per_isolate_opts),
58      exec_argv_(exec_argv),
59      platform_(env->isolate_data()->platform()),
60      thread_id_(AllocateEnvironmentThreadId()),
61      name_(name),
62      env_vars_(env_vars),
63      snapshot_data_(snapshot_data) {
64  Debug(this, "Creating new worker instance with thread id %llu",
65        thread_id_.id);
66
67  // Set up everything that needs to be set up in the parent environment.
68  MessagePort* parent_port = MessagePort::New(env, env->context());
69  if (parent_port == nullptr) {
70    // This can happen e.g. because execution is terminating.
71    return;
72  }
73
74  child_port_data_ = std::make_unique<MessagePortData>(nullptr);
75  MessagePort::Entangle(parent_port, child_port_data_.get());
76
77  object()
78      ->Set(env->context(), env->message_port_string(), parent_port->object())
79      .Check();
80
81  object()->Set(env->context(),
82                env->thread_id_string(),
83                Number::New(env->isolate(), static_cast<double>(thread_id_.id)))
84      .Check();
85
86  inspector_parent_handle_ =
87      GetInspectorParentHandle(env, thread_id_, url.c_str(), name.c_str());
88
89  argv_ = std::vector<std::string>{env->argv()[0]};
90  // Mark this Worker object as weak until we actually start the thread.
91  MakeWeak();
92
93  Debug(this, "Preparation for worker %llu finished", thread_id_.id);
94}
95
96bool Worker::is_stopped() const {
97  Mutex::ScopedLock lock(mutex_);
98  if (env_ != nullptr)
99    return env_->is_stopping();
100  return stopped_;
101}
102
103void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) {
104  constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_));
105
106  if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) {
107    constraints->set_max_young_generation_size_in_bytes(
108        static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB));
109  } else {
110    resource_limits_[kMaxYoungGenerationSizeMb] =
111        constraints->max_young_generation_size_in_bytes() / kMB;
112  }
113
114  if (resource_limits_[kMaxOldGenerationSizeMb] > 0) {
115    constraints->set_max_old_generation_size_in_bytes(
116        static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB));
117  } else {
118    resource_limits_[kMaxOldGenerationSizeMb] =
119        constraints->max_old_generation_size_in_bytes() / kMB;
120  }
121
122  if (resource_limits_[kCodeRangeSizeMb] > 0) {
123    constraints->set_code_range_size_in_bytes(
124        static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB));
125  } else {
126    resource_limits_[kCodeRangeSizeMb] =
127        constraints->code_range_size_in_bytes() / kMB;
128  }
129}
130
// This class contains data that is only relevant to the child thread itself,
// and only while it is running.
// (Eventually, the Environment instance should probably also be moved here.)
//
// It owns the worker's libuv loop and its IsolateData, and it creates and
// later disposes the worker's Isolate. The Isolate pointer itself is stored
// on the Worker (`w_->isolate_`) under the Worker's mutex.
class WorkerThreadData {
 public:
  // Initializes the event loop, creates the Isolate and its IsolateData, and
  // finally publishes the Isolate on the Worker. On loop or Isolate creation
  // failure, Worker::Exit() is called with ERR_WORKER_INIT_FAILED and the
  // constructor returns without assigning `w_->isolate_`.
  explicit WorkerThreadData(Worker* w)
    : w_(w) {
    int ret = uv_loop_init(&loop_);
    if (ret != 0) {
      char err_buf[128];
      uv_err_name_r(ret, err_buf, sizeof(err_buf));
      w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf);
      return;
    }
    // From here on the loop must be closed again in the destructor.
    loop_init_failed_ = false;
    uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME);

    std::shared_ptr<ArrayBufferAllocator> allocator =
        ArrayBufferAllocator::Create();
    Isolate::CreateParams params;
    SetIsolateCreateParamsForNode(&params);
    // Apply the worker's resource limits (and read back defaults for the
    // limits that were not set).
    w->UpdateResourceConstraints(&params.constraints);
    params.array_buffer_allocator_shared = allocator;
    Isolate* isolate =
        NewIsolate(&params, &loop_, w->platform_, w->snapshot_data());
    if (isolate == nullptr) {
      w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate");
      return;
    }

    SetIsolateUpForNode(isolate);

    // Be sure it's called before Environment::InitializeDiagnostics()
    // so that this callback stays when the callback of
    // --heapsnapshot-near-heap-limit gets popped.
    isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w);

    {
      Locker locker(isolate);
      Isolate::Scope isolate_scope(isolate);
      // V8 computes its stack limit the first time a `Locker` is used based on
      // --stack-size. Reset it to the correct value.
      isolate->SetStackLimit(w->stack_base_);

      HandleScope handle_scope(isolate);
      isolate_data_.reset(CreateIsolateData(isolate,
                                            &loop_,
                                            w_->platform_,
                                            allocator.get()));
      CHECK(isolate_data_);
      // The per-isolate options are handed over to the IsolateData; the
      // Worker's own pointer is moved from.
      if (w_->per_isolate_opts_)
        isolate_data_->set_options(std::move(w_->per_isolate_opts_));
      isolate_data_->set_worker_context(w_);
      isolate_data_->max_young_gen_size =
          params.constraints.max_young_generation_size_in_bytes();
    }

    // Publish the Isolate last, under the Worker's mutex.
    Mutex::ScopedLock lock(w_->mutex_);
    w_->isolate_ = isolate;
  }

  // Detaches the Isolate from the Worker, tears it down (if one was
  // created) and closes the event loop (if it was initialized).
  ~WorkerThreadData() {
    Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id);
    Isolate* isolate;
    {
      // Take the Isolate away from the Worker under its mutex.
      Mutex::ScopedLock lock(w_->mutex_);
      isolate = w_->isolate_;
      w_->isolate_ = nullptr;
    }

    if (isolate != nullptr) {
      CHECK(!loop_init_failed_);
      bool platform_finished = false;

      isolate_data_.reset();

      // The callback flips `platform_finished`, which ends the wait loop
      // below.
      w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) {
        *static_cast<bool*>(data) = true;
      }, &platform_finished);

      // The order of these calls is important; if the Isolate is first disposed
      // and then unregistered, there is a race condition window in which no
      // new Isolate at the same address can successfully be registered with
      // the platform.
      // (Refs: https://github.com/nodejs/node/issues/30846)
      w_->platform_->UnregisterIsolate(isolate);
      isolate->Dispose();

      // Wait until the platform has cleaned up all relevant resources.
      while (!platform_finished) {
        uv_run(&loop_, UV_RUN_ONCE);
      }
    }
    if (!loop_init_failed_) {
      CheckedUvLoopClose(&loop_);
    }
  }

  // True once uv_loop_init() has succeeded in the constructor.
  bool loop_is_usable() const { return !loop_init_failed_; }

 private:
  // The Worker this data belongs to; never reseated.
  Worker* const w_;
  // Event loop of the worker thread.
  uv_loop_t loop_;
  // Starts out true and is cleared after uv_loop_init() succeeds.
  bool loop_init_failed_ = true;
  // IsolateData created for the worker's Isolate; released in the destructor
  // before the Isolate is unregistered and disposed.
  DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
  // NOTE(review): not referenced anywhere in this class as shown — confirm
  // whether it is still needed.
  const SnapshotData* snapshot_data_ = nullptr;
  friend class Worker;
};
239
240size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit,
241                             size_t initial_heap_limit) {
242  Worker* worker = static_cast<Worker*>(data);
243  // Give the current GC some extra leeway to let it finish rather than
244  // crash hard. We are not going to perform further allocations anyway.
245  constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024;
246  size_t new_limit = current_heap_limit + kExtraHeapAllowance;
247  Environment* env = worker->env();
248  if (env != nullptr) {
249    DCHECK(!env->is_in_heapsnapshot_heap_limit_callback());
250    Debug(env,
251          DebugCategory::DIAGNOSTICS,
252          "Throwing ERR_WORKER_OUT_OF_MEMORY, "
253          "new_limit=%" PRIu64 "\n",
254          static_cast<uint64_t>(new_limit));
255  }
256  worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory");
257  return new_limit;
258}
259
260void Worker::Run() {
261  std::string trace_name = "[worker " + std::to_string(thread_id_.id) + "]" +
262                           (name_ == "" ? "" : " " + name_);
263  TRACE_EVENT_METADATA1(
264      "__metadata", "thread_name", "name", TRACE_STR_COPY(trace_name.c_str()));
265  CHECK_NOT_NULL(platform_);
266
267  Debug(this, "Creating isolate for worker with id %llu", thread_id_.id);
268
269  WorkerThreadData data(this);
270  if (isolate_ == nullptr) return;
271  CHECK(data.loop_is_usable());
272
273  Debug(this, "Starting worker with id %llu", thread_id_.id);
274  {
275    Locker locker(isolate_);
276    Isolate::Scope isolate_scope(isolate_);
277    SealHandleScope outer_seal(isolate_);
278
279    DeleteFnPtr<Environment, FreeEnvironment> env_;
280    auto cleanup_env = OnScopeLeave([&]() {
281      // TODO(addaleax): This call is harmless but should not be necessary.
282      // Figure out why V8 is raising a DCHECK() here without it
283      // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js).
284      isolate_->CancelTerminateExecution();
285
286      if (!env_) return;
287      env_->set_can_call_into_js(false);
288
289      {
290        Mutex::ScopedLock lock(mutex_);
291        stopped_ = true;
292        this->env_ = nullptr;
293      }
294
295      env_.reset();
296    });
297
298    if (is_stopped()) return;
299    {
300      HandleScope handle_scope(isolate_);
301      Local<Context> context;
302      {
303        // We create the Context object before we have an Environment* in place
304        // that we could use for error handling. If creation fails due to
305        // resource constraints, we need something in place to handle it,
306        // though.
307        TryCatch try_catch(isolate_);
308        if (snapshot_data_ != nullptr) {
309          context = Context::FromSnapshot(isolate_,
310                                          SnapshotData::kNodeBaseContextIndex)
311                        .ToLocalChecked();
312          if (!context.IsEmpty() &&
313              !InitializeContextRuntime(context).IsJust()) {
314            context = Local<Context>();
315          }
316        } else {
317          context = NewContext(isolate_);
318        }
319        if (context.IsEmpty()) {
320          Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context");
321          return;
322        }
323      }
324
325      if (is_stopped()) return;
326      CHECK(!context.IsEmpty());
327      Context::Scope context_scope(context);
328      {
329        env_.reset(CreateEnvironment(
330            data.isolate_data_.get(),
331            context,
332            std::move(argv_),
333            std::move(exec_argv_),
334            static_cast<EnvironmentFlags::Flags>(environment_flags_),
335            thread_id_,
336            std::move(inspector_parent_handle_)));
337        if (is_stopped()) return;
338        CHECK_NOT_NULL(env_);
339        env_->set_env_vars(std::move(env_vars_));
340        SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) {
341          Exit(exit_code);
342        });
343      }
344      {
345        Mutex::ScopedLock lock(mutex_);
346        if (stopped_) return;
347        this->env_ = env_.get();
348      }
349      Debug(this, "Created Environment for worker with id %llu", thread_id_.id);
350      if (is_stopped()) return;
351      {
352        if (!CreateEnvMessagePort(env_.get())) {
353          return;
354        }
355
356        Debug(this, "Created message port for worker %llu", thread_id_.id);
357        if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty())
358          return;
359
360        Debug(this, "Loaded environment for worker %llu", thread_id_.id);
361      }
362    }
363
364    {
365      Maybe<int> exit_code = SpinEventLoop(env_.get());
366      Mutex::ScopedLock lock(mutex_);
367      if (exit_code_ == 0 && exit_code.IsJust()) {
368        exit_code_ = exit_code.FromJust();
369      }
370
371      Debug(this, "Exiting thread for worker %llu with exit code %d",
372            thread_id_.id, exit_code_);
373    }
374  }
375
376  Debug(this, "Worker %llu thread stops", thread_id_.id);
377}
378
379bool Worker::CreateEnvMessagePort(Environment* env) {
380  HandleScope handle_scope(isolate_);
381  std::unique_ptr<MessagePortData> data;
382  {
383    Mutex::ScopedLock lock(mutex_);
384    data = std::move(child_port_data_);
385  }
386
387  // Set up the message channel for receiving messages in the child.
388  MessagePort* child_port = MessagePort::New(env,
389                                             env->context(),
390                                             std::move(data));
391  // MessagePort::New() may return nullptr if execution is terminated
392  // within it.
393  if (child_port != nullptr)
394    env->set_message_port(child_port->object(isolate_));
395
396  return child_port;
397}
398
399void Worker::JoinThread() {
400  if (!tid_.has_value())
401    return;
402  CHECK_EQ(uv_thread_join(&tid_.value()), 0);
403  tid_.reset();
404
405  env()->remove_sub_worker_context(this);
406
407  {
408    HandleScope handle_scope(env()->isolate());
409    Context::Scope context_scope(env()->context());
410
411    // Reset the parent port as we're closing it now anyway.
412    object()->Set(env()->context(),
413                  env()->message_port_string(),
414                  Undefined(env()->isolate())).Check();
415
416    Local<Value> args[] = {
417        Integer::New(env()->isolate(), exit_code_),
418        custom_error_ != nullptr
419            ? OneByteString(env()->isolate(), custom_error_).As<Value>()
420            : Null(env()->isolate()).As<Value>(),
421        !custom_error_str_.empty()
422            ? OneByteString(env()->isolate(), custom_error_str_.c_str())
423                  .As<Value>()
424            : Null(env()->isolate()).As<Value>(),
425    };
426
427    MakeCallback(env()->onexit_string(), arraysize(args), args);
428  }
429
430  // If we get here, the tid_.has_value() condition at the top of the function
431  // implies that the thread was running. In that case, its final action will
432  // be to schedule a callback on the parent thread which will delete this
433  // object, so there's nothing more to do here.
434}
435
// Destroys the Worker. Asserts (under the mutex) that the worker has been
// marked stopped, that no Environment is attached any more and that no
// thread handle is left.
Worker::~Worker() {
  Mutex::ScopedLock lock(mutex_);

  CHECK(stopped_);
  CHECK_NULL(env_);
  // The thread handle is reset by JoinThread() or by a failed StartThread().
  CHECK(!tid_.has_value());

  Debug(this, "Worker %llu destroyed", thread_id_.id);
}
445
446void Worker::New(const FunctionCallbackInfo<Value>& args) {
447  Environment* env = Environment::GetCurrent(args);
448  auto is_internal = args[5];
449  CHECK(is_internal->IsBoolean());
450  Isolate* isolate = args.GetIsolate();
451
452  CHECK(args.IsConstructCall());
453
454  if (env->isolate_data()->platform() == nullptr) {
455    THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
456    return;
457  }
458
459  std::string url;
460  std::string name;
461  std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr;
462  std::shared_ptr<KVStore> env_vars = nullptr;
463
464  std::vector<std::string> exec_argv_out;
465
466  // Argument might be a string or URL
467  if (!args[0]->IsNullOrUndefined()) {
468    Utf8Value value(
469        isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>()));
470    url.append(value.out(), value.length());
471  }
472
473  if (!args[6]->IsNullOrUndefined()) {
474    Utf8Value value(
475        isolate, args[6]->ToString(env->context()).FromMaybe(Local<String>()));
476    name.append(value.out(), value.length());
477  }
478
479  if (args[1]->IsNull()) {
480    // Means worker.env = { ...process.env }.
481    env_vars = env->env_vars()->Clone(isolate);
482  } else if (args[1]->IsObject()) {
483    // User provided env.
484    env_vars = KVStore::CreateMapKVStore();
485    env_vars->AssignFromObject(isolate->GetCurrentContext(),
486                               args[1].As<Object>());
487  } else {
488    // Env is shared.
489    env_vars = env->env_vars();
490  }
491
492  if (args[1]->IsObject() || args[2]->IsArray()) {
493    per_isolate_opts.reset(new PerIsolateOptions());
494
495    HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) {
496      return env_vars->Get(name).FromMaybe("");
497    });
498
499#ifndef NODE_WITHOUT_NODE_OPTIONS
500    MaybeLocal<String> maybe_node_opts =
501        env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS"));
502    Local<String> node_opts;
503    if (maybe_node_opts.ToLocal(&node_opts)) {
504      std::string node_options(*String::Utf8Value(isolate, node_opts));
505      std::vector<std::string> errors{};
506      std::vector<std::string> env_argv =
507          ParseNodeOptionsEnvVar(node_options, &errors);
508      // [0] is expected to be the program name, add dummy string.
509      env_argv.insert(env_argv.begin(), "");
510      std::vector<std::string> invalid_args{};
511      options_parser::Parse(&env_argv,
512                            nullptr,
513                            &invalid_args,
514                            per_isolate_opts.get(),
515                            kAllowedInEnvvar,
516                            &errors);
517      if (!errors.empty() && args[1]->IsObject()) {
518        // Only fail for explicitly provided env, this protects from failures
519        // when NODE_OPTIONS from parent's env is used (which is the default).
520        Local<Value> error;
521        if (!ToV8Value(env->context(), errors).ToLocal(&error)) return;
522        Local<String> key =
523            FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions");
524        // Ignore the return value of Set() because exceptions bubble up to JS
525        // when we return anyway.
526        USE(args.This()->Set(env->context(), key, error));
527        return;
528      }
529    }
530#endif  // NODE_WITHOUT_NODE_OPTIONS
531  }
532
533  if (args[2]->IsArray()) {
534    Local<Array> array = args[2].As<Array>();
535    // The first argument is reserved for program name, but we don't need it
536    // in workers.
537    std::vector<std::string> exec_argv = {""};
538    uint32_t length = array->Length();
539    for (uint32_t i = 0; i < length; i++) {
540      Local<Value> arg;
541      if (!array->Get(env->context(), i).ToLocal(&arg)) {
542        return;
543      }
544      Local<String> arg_v8;
545      if (!arg->ToString(env->context()).ToLocal(&arg_v8)) {
546        return;
547      }
548      Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8);
549      std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length());
550      exec_argv.push_back(arg_string);
551    }
552
553    std::vector<std::string> invalid_args{};
554    std::vector<std::string> errors{};
555    // Using invalid_args as the v8_args argument as it stores unknown
556    // options for the per isolate parser.
557    options_parser::Parse(&exec_argv,
558                          &exec_argv_out,
559                          &invalid_args,
560                          per_isolate_opts.get(),
561                          kDisallowedInEnvvar,
562                          &errors);
563
564    // The first argument is program name.
565    invalid_args.erase(invalid_args.begin());
566    if (errors.size() > 0 || invalid_args.size() > 0) {
567      Local<Value> error;
568      if (!ToV8Value(env->context(),
569                     errors.size() > 0 ? errors : invalid_args)
570                         .ToLocal(&error)) {
571        return;
572      }
573      Local<String> key =
574          FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv");
575      // Ignore the return value of Set() because exceptions bubble up to JS
576      // when we return anyway.
577      USE(args.This()->Set(env->context(), key, error));
578      return;
579    }
580  } else {
581    exec_argv_out = env->exec_argv();
582  }
583
584  bool use_node_snapshot = per_process::cli_options->node_snapshot;
585  const SnapshotData* snapshot_data =
586      use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr;
587
588  Worker* worker = new Worker(env,
589                              args.This(),
590                              url,
591                              name,
592                              per_isolate_opts,
593                              std::move(exec_argv_out),
594                              env_vars,
595                              snapshot_data);
596
597  CHECK(args[3]->IsFloat64Array());
598  Local<Float64Array> limit_info = args[3].As<Float64Array>();
599  CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount);
600  limit_info->CopyContents(worker->resource_limits_,
601                           sizeof(worker->resource_limits_));
602
603  CHECK(args[4]->IsBoolean());
604  if (args[4]->IsTrue() || env->tracks_unmanaged_fds())
605    worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds;
606  if (env->hide_console_windows())
607    worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows;
608  if (env->no_native_addons())
609    worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons;
610  if (env->no_global_search_paths())
611    worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths;
612  if (env->no_browser_globals())
613    worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals;
614}
615
616void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
617  Worker* w;
618  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
619  Mutex::ScopedLock lock(w->mutex_);
620
621  w->stopped_ = false;
622
623  if (w->resource_limits_[kStackSizeMb] > 0) {
624    if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) {
625      w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB;
626      w->stack_size_ = kStackBufferSize;
627    } else {
628      w->stack_size_ =
629          static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB);
630    }
631  } else {
632    w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB;
633  }
634
635  uv_thread_options_t thread_options;
636  thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
637  thread_options.stack_size = w->stack_size_;
638
639  uv_thread_t* tid = &w->tid_.emplace();  // Create uv_thread_t instance
640  int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) {
641    // XXX: This could become a std::unique_ptr, but that makes at least
642    // gcc 6.3 detect undefined behaviour when there shouldn't be any.
643    // gcc 7+ handles this well.
644    Worker* w = static_cast<Worker*>(arg);
645    const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
646
647    // Leave a few kilobytes just to make sure we're within limits and have
648    // some space to do work in C++ land.
649    w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize);
650
651    w->Run();
652
653    Mutex::ScopedLock lock(w->mutex_);
654    w->env()->SetImmediateThreadsafe(
655        [w = std::unique_ptr<Worker>(w)](Environment* env) {
656          if (w->has_ref_)
657            env->add_refs(-1);
658          w->JoinThread();
659          // implicitly delete w
660        });
661  }, static_cast<void*>(w));
662
663  if (ret == 0) {
664    // The object now owns the created thread and should not be garbage
665    // collected until that finishes.
666    w->ClearWeak();
667
668    if (w->has_ref_)
669      w->env()->add_refs(1);
670
671    w->env()->add_sub_worker_context(w);
672  } else {
673    w->stopped_ = true;
674    w->tid_.reset();
675
676    char err_buf[128];
677    uv_err_name_r(ret, err_buf, sizeof(err_buf));
678    {
679      Isolate* isolate = w->env()->isolate();
680      HandleScope handle_scope(isolate);
681      THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf);
682    }
683  }
684}
685
686void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
687  Worker* w;
688  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
689
690  Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id);
691  w->Exit(1);
692}
693
694void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
695  Worker* w;
696  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
697  if (!w->has_ref_ && w->tid_.has_value()) {
698    w->has_ref_ = true;
699    w->env()->add_refs(1);
700  }
701}
702
703void Worker::HasRef(const FunctionCallbackInfo<Value>& args) {
704  Worker* w;
705  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
706  args.GetReturnValue().Set(w->has_ref_);
707}
708
709void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
710  Worker* w;
711  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
712  if (w->has_ref_ && w->tid_.has_value()) {
713    w->has_ref_ = false;
714    w->env()->add_refs(-1);
715  }
716}
717
718void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
719  Worker* w;
720  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
721  args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate()));
722}
723
724Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const {
725  Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_));
726
727  memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_));
728  return Float64Array::New(ab, 0, kTotalResourceLimitCount);
729}
730
731void Worker::Exit(int code, const char* error_code, const char* error_message) {
732  Mutex::ScopedLock lock(mutex_);
733  Debug(this, "Worker %llu called Exit(%d, %s, %s)",
734        thread_id_.id, code, error_code, error_message);
735
736  if (error_code != nullptr) {
737    custom_error_ = error_code;
738    custom_error_str_ = error_message;
739  }
740
741  if (env_ != nullptr) {
742    exit_code_ = code;
743    Stop(env_);
744  } else {
745    stopped_ = true;
746  }
747}
748
// Always returns true for Worker objects.
bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const {
  // Worker objects always stay alive as long as the child thread, regardless
  // of whether they are being referenced in the parent thread.
  return true;
}
754
// AsyncWrap (provider PROVIDER_WORKERHEAPSNAPSHOT) that adds no state of its
// own. Worker::TakeHeapSnapshot() creates one per request and invokes its
// `ondone` callback with the resulting snapshot stream.
class WorkerHeapSnapshotTaker : public AsyncWrap {
 public:
  WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj)
    : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {}

  // Memory-tracking boilerplate; no additional fields are reported.
  SET_NO_MEMORY_INFO()
  SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker)
  SET_SELF_SIZE(WorkerHeapSnapshotTaker)
};
764
765void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
766  Worker* w;
767  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
768
769  Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id);
770
771  Environment* env = w->env();
772  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
773  Local<Object> wrap;
774  if (!env->worker_heap_snapshot_taker_template()
775      ->NewInstance(env->context()).ToLocal(&wrap)) {
776    return;
777  }
778
779  // The created WorkerHeapSnapshotTaker is an object owned by main
780  // thread's Isolate, it can not be accessed by worker thread
781  std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker =
782      std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>(
783          MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap));
784
785  // Interrupt the worker thread and take a snapshot, then schedule a call
786  // on the parent thread that turns that snapshot into a readable stream.
787  bool scheduled = w->RequestInterrupt([taker = std::move(taker),
788                                        env](Environment* worker_env) mutable {
789    heap::HeapSnapshotPointer snapshot{
790        worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot()};
791    CHECK(snapshot);
792
793    // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker
794    // object.
795
796    env->SetImmediateThreadsafe(
797        [taker = std::move(taker),
798         snapshot = std::move(snapshot)](Environment* env) mutable {
799          HandleScope handle_scope(env->isolate());
800          Context::Scope context_scope(env->context());
801
802          AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
803          BaseObjectPtr<AsyncWrap> stream =
804              heap::CreateHeapSnapshotStream(env, std::move(snapshot));
805          Local<Value> args[] = {stream->object()};
806          taker->get()->MakeCallback(
807              env->ondone_string(), arraysize(args), args);
808          // implicitly delete `taker`
809        },
810        CallbackFlags::kUnrefed);
811
812    // Now, the lambda is delivered to the main thread, as a result, the
813    // WorkerHeapSnapshotTaker object is delivered to the main thread, too.
814  });
815
816  if (scheduled) {
817    args.GetReturnValue().Set(wrap);
818  } else {
819    args.GetReturnValue().Set(Local<Object>());
820  }
821}
822
823void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
824  Worker* w;
825  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
826
827  Mutex::ScopedLock lock(w->mutex_);
828  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
829  // before locking the mutex is a race condition. So manually do the same
830  // check.
831  if (w->stopped_ || w->env_ == nullptr)
832    return args.GetReturnValue().Set(-1);
833
834  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
835  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
836}
837
838void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
839  Worker* w;
840  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
841
842  Mutex::ScopedLock lock(w->mutex_);
843  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
844  // before locking the mutex is a race condition. So manually do the same
845  // check.
846  if (w->stopped_ || w->env_ == nullptr)
847    return args.GetReturnValue().Set(-1);
848
849  double loop_start_time = w->env_->performance_state()->milestones[
850      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
851  CHECK_GE(loop_start_time, 0);
852  args.GetReturnValue().Set(
853      (loop_start_time - node::performance::timeOrigin) / 1e6);
854}
855
856namespace {
857
858// Return the MessagePort that is global for this Environment and communicates
859// with the internal [kPort] port of the JS Worker class in the parent thread.
860void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
861  Environment* env = Environment::GetCurrent(args);
862  Local<Object> port = env->message_port();
863  CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty());
864  if (!port.IsEmpty()) {
865    CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(),
866             args.GetIsolate());
867    args.GetReturnValue().Set(port);
868  }
869}
870
871void InitWorker(Local<Object> target,
872                Local<Value> unused,
873                Local<Context> context,
874                void* priv) {
875  Environment* env = Environment::GetCurrent(context);
876  Isolate* isolate = env->isolate();
877
878  {
879    Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New);
880
881    w->InstanceTemplate()->SetInternalFieldCount(
882        Worker::kInternalFieldCount);
883    w->Inherit(AsyncWrap::GetConstructorTemplate(env));
884
885    SetProtoMethod(isolate, w, "startThread", Worker::StartThread);
886    SetProtoMethod(isolate, w, "stopThread", Worker::StopThread);
887    SetProtoMethod(isolate, w, "hasRef", Worker::HasRef);
888    SetProtoMethod(isolate, w, "ref", Worker::Ref);
889    SetProtoMethod(isolate, w, "unref", Worker::Unref);
890    SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits);
891    SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
892    SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
893    SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
894
895    SetConstructorFunction(context, target, "Worker", w);
896  }
897
898  {
899    Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
900
901    wst->InstanceTemplate()->SetInternalFieldCount(
902        WorkerHeapSnapshotTaker::kInternalFieldCount);
903    wst->Inherit(AsyncWrap::GetConstructorTemplate(env));
904
905    Local<String> wst_string =
906        FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker");
907    wst->SetClassName(wst_string);
908    env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate());
909  }
910
911  SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort);
912
913  target
914      ->Set(env->context(),
915            env->thread_id_string(),
916            Number::New(isolate, static_cast<double>(env->thread_id())))
917      .Check();
918
919  target
920      ->Set(env->context(),
921            FIXED_ONE_BYTE_STRING(isolate, "isMainThread"),
922            Boolean::New(isolate, env->is_main_thread()))
923      .Check();
924
925  target
926      ->Set(env->context(),
927            FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"),
928            Boolean::New(isolate, env->owns_process_state()))
929      .Check();
930
931  if (!env->is_main_thread()) {
932    target
933        ->Set(env->context(),
934              FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"),
935              env->worker_context()->GetResourceLimits(isolate))
936        .Check();
937  }
938
939  NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
940  NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
941  NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
942  NODE_DEFINE_CONSTANT(target, kStackSizeMb);
943  NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
944}
945
946void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
947  registry->Register(GetEnvMessagePort);
948  registry->Register(Worker::New);
949  registry->Register(Worker::StartThread);
950  registry->Register(Worker::StopThread);
951  registry->Register(Worker::HasRef);
952  registry->Register(Worker::Ref);
953  registry->Register(Worker::Unref);
954  registry->Register(Worker::GetResourceLimits);
955  registry->Register(Worker::TakeHeapSnapshot);
956  registry->Register(Worker::LoopIdleTime);
957  registry->Register(Worker::LoopStartTime);
958}
959
960}  // anonymous namespace
961}  // namespace worker
962}  // namespace node
963
// Hook the `worker` binding up: InitWorker() is its initializer and
// RegisterExternalReferences() supplies its external references.
NODE_BINDING_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
NODE_BINDING_EXTERNAL_REFERENCE(worker,
                                node::worker::RegisterExternalReferences)
967