1#include "node_worker.h" 2#include "debug_utils-inl.h" 3#include "histogram-inl.h" 4#include "memory_tracker-inl.h" 5#include "node_errors.h" 6#include "node_external_reference.h" 7#include "node_buffer.h" 8#include "node_options-inl.h" 9#include "node_perf.h" 10#include "node_snapshot_builder.h" 11#include "util-inl.h" 12#include "async_wrap-inl.h" 13 14#include <memory> 15#include <string> 16#include <vector> 17 18using node::kAllowedInEnvvar; 19using node::kDisallowedInEnvvar; 20using v8::Array; 21using v8::ArrayBuffer; 22using v8::Boolean; 23using v8::Context; 24using v8::Float64Array; 25using v8::FunctionCallbackInfo; 26using v8::FunctionTemplate; 27using v8::HandleScope; 28using v8::Integer; 29using v8::Isolate; 30using v8::Local; 31using v8::Locker; 32using v8::Maybe; 33using v8::MaybeLocal; 34using v8::Null; 35using v8::Number; 36using v8::Object; 37using v8::ResourceConstraints; 38using v8::SealHandleScope; 39using v8::String; 40using v8::TryCatch; 41using v8::Value; 42 43namespace node { 44namespace worker { 45 46constexpr double kMB = 1024 * 1024; 47 48Worker::Worker(Environment* env, 49 Local<Object> wrap, 50 const std::string& url, 51 const std::string& name, 52 std::shared_ptr<PerIsolateOptions> per_isolate_opts, 53 std::vector<std::string>&& exec_argv, 54 std::shared_ptr<KVStore> env_vars, 55 const SnapshotData* snapshot_data) 56 : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), 57 per_isolate_opts_(per_isolate_opts), 58 exec_argv_(exec_argv), 59 platform_(env->isolate_data()->platform()), 60 thread_id_(AllocateEnvironmentThreadId()), 61 name_(name), 62 env_vars_(env_vars), 63 snapshot_data_(snapshot_data) { 64 Debug(this, "Creating new worker instance with thread id %llu", 65 thread_id_.id); 66 67 // Set up everything that needs to be set up in the parent environment. 68 MessagePort* parent_port = MessagePort::New(env, env->context()); 69 if (parent_port == nullptr) { 70 // This can happen e.g. because execution is terminating. 71 return; 72 } 73 74 child_port_data_ = std::make_unique<MessagePortData>(nullptr); 75 MessagePort::Entangle(parent_port, child_port_data_.get()); 76 77 object() 78 ->Set(env->context(), env->message_port_string(), parent_port->object()) 79 .Check(); 80 81 object()->Set(env->context(), 82 env->thread_id_string(), 83 Number::New(env->isolate(), static_cast<double>(thread_id_.id))) 84 .Check(); 85 86 inspector_parent_handle_ = 87 GetInspectorParentHandle(env, thread_id_, url.c_str(), name.c_str()); 88 89 argv_ = std::vector<std::string>{env->argv()[0]}; 90 // Mark this Worker object as weak until we actually start the thread. 91 MakeWeak(); 92 93 Debug(this, "Preparation for worker %llu finished", thread_id_.id); 94} 95 96bool Worker::is_stopped() const { 97 Mutex::ScopedLock lock(mutex_); 98 if (env_ != nullptr) 99 return env_->is_stopping(); 100 return stopped_; 101} 102 103void Worker::UpdateResourceConstraints(ResourceConstraints* constraints) { 104 constraints->set_stack_limit(reinterpret_cast<uint32_t*>(stack_base_)); 105 106 if (resource_limits_[kMaxYoungGenerationSizeMb] > 0) { 107 constraints->set_max_young_generation_size_in_bytes( 108 static_cast<size_t>(resource_limits_[kMaxYoungGenerationSizeMb] * kMB)); 109 } else { 110 resource_limits_[kMaxYoungGenerationSizeMb] = 111 constraints->max_young_generation_size_in_bytes() / kMB; 112 } 113 114 if (resource_limits_[kMaxOldGenerationSizeMb] > 0) { 115 constraints->set_max_old_generation_size_in_bytes( 116 static_cast<size_t>(resource_limits_[kMaxOldGenerationSizeMb] * kMB)); 117 } else { 118 resource_limits_[kMaxOldGenerationSizeMb] = 119 constraints->max_old_generation_size_in_bytes() / kMB; 120 } 121 122 if (resource_limits_[kCodeRangeSizeMb] > 0) { 123 constraints->set_code_range_size_in_bytes( 124 static_cast<size_t>(resource_limits_[kCodeRangeSizeMb] * kMB)); 125 } else { 126 resource_limits_[kCodeRangeSizeMb] = 127 constraints->code_range_size_in_bytes() / kMB; 128 } 129} 130 131// This class contains data that is only relevant to the child thread itself, 132// and only while it is running. 133// (Eventually, the Environment instance should probably also be moved here.) 134class WorkerThreadData { 135 public: 136 explicit WorkerThreadData(Worker* w) 137 : w_(w) { 138 int ret = uv_loop_init(&loop_); 139 if (ret != 0) { 140 char err_buf[128]; 141 uv_err_name_r(ret, err_buf, sizeof(err_buf)); 142 w->Exit(1, "ERR_WORKER_INIT_FAILED", err_buf); 143 return; 144 } 145 loop_init_failed_ = false; 146 uv_loop_configure(&loop_, UV_METRICS_IDLE_TIME); 147 148 std::shared_ptr<ArrayBufferAllocator> allocator = 149 ArrayBufferAllocator::Create(); 150 Isolate::CreateParams params; 151 SetIsolateCreateParamsForNode(¶ms); 152 w->UpdateResourceConstraints(¶ms.constraints); 153 params.array_buffer_allocator_shared = allocator; 154 Isolate* isolate = 155 NewIsolate(¶ms, &loop_, w->platform_, w->snapshot_data()); 156 if (isolate == nullptr) { 157 w->Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Isolate"); 158 return; 159 } 160 161 SetIsolateUpForNode(isolate); 162 163 // Be sure it's called before Environment::InitializeDiagnostics() 164 // so that this callback stays when the callback of 165 // --heapsnapshot-near-heap-limit gets is popped. 166 isolate->AddNearHeapLimitCallback(Worker::NearHeapLimit, w); 167 168 { 169 Locker locker(isolate); 170 Isolate::Scope isolate_scope(isolate); 171 // V8 computes its stack limit the first time a `Locker` is used based on 172 // --stack-size. Reset it to the correct value. 173 isolate->SetStackLimit(w->stack_base_); 174 175 HandleScope handle_scope(isolate); 176 isolate_data_.reset(CreateIsolateData(isolate, 177 &loop_, 178 w_->platform_, 179 allocator.get())); 180 CHECK(isolate_data_); 181 if (w_->per_isolate_opts_) 182 isolate_data_->set_options(std::move(w_->per_isolate_opts_)); 183 isolate_data_->set_worker_context(w_); 184 isolate_data_->max_young_gen_size = 185 params.constraints.max_young_generation_size_in_bytes(); 186 } 187 188 Mutex::ScopedLock lock(w_->mutex_); 189 w_->isolate_ = isolate; 190 } 191 192 ~WorkerThreadData() { 193 Debug(w_, "Worker %llu dispose isolate", w_->thread_id_.id); 194 Isolate* isolate; 195 { 196 Mutex::ScopedLock lock(w_->mutex_); 197 isolate = w_->isolate_; 198 w_->isolate_ = nullptr; 199 } 200 201 if (isolate != nullptr) { 202 CHECK(!loop_init_failed_); 203 bool platform_finished = false; 204 205 isolate_data_.reset(); 206 207 w_->platform_->AddIsolateFinishedCallback(isolate, [](void* data) { 208 *static_cast<bool*>(data) = true; 209 }, &platform_finished); 210 211 // The order of these calls is important; if the Isolate is first disposed 212 // and then unregistered, there is a race condition window in which no 213 // new Isolate at the same address can successfully be registered with 214 // the platform. 215 // (Refs: https://github.com/nodejs/node/issues/30846) 216 w_->platform_->UnregisterIsolate(isolate); 217 isolate->Dispose(); 218 219 // Wait until the platform has cleaned up all relevant resources. 220 while (!platform_finished) { 221 uv_run(&loop_, UV_RUN_ONCE); 222 } 223 } 224 if (!loop_init_failed_) { 225 CheckedUvLoopClose(&loop_); 226 } 227 } 228 229 bool loop_is_usable() const { return !loop_init_failed_; } 230 231 private: 232 Worker* const w_; 233 uv_loop_t loop_; 234 bool loop_init_failed_ = true; 235 DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_; 236 const SnapshotData* snapshot_data_ = nullptr; 237 friend class Worker; 238}; 239 240size_t Worker::NearHeapLimit(void* data, size_t current_heap_limit, 241 size_t initial_heap_limit) { 242 Worker* worker = static_cast<Worker*>(data); 243 // Give the current GC some extra leeway to let it finish rather than 244 // crash hard. We are not going to perform further allocations anyway. 245 constexpr size_t kExtraHeapAllowance = 16 * 1024 * 1024; 246 size_t new_limit = current_heap_limit + kExtraHeapAllowance; 247 Environment* env = worker->env(); 248 if (env != nullptr) { 249 DCHECK(!env->is_in_heapsnapshot_heap_limit_callback()); 250 Debug(env, 251 DebugCategory::DIAGNOSTICS, 252 "Throwing ERR_WORKER_OUT_OF_MEMORY, " 253 "new_limit=%" PRIu64 "\n", 254 static_cast<uint64_t>(new_limit)); 255 } 256 worker->Exit(1, "ERR_WORKER_OUT_OF_MEMORY", "JS heap out of memory"); 257 return new_limit; 258} 259 260void Worker::Run() { 261 std::string trace_name = "[worker " + std::to_string(thread_id_.id) + "]" + 262 (name_ == "" ? "" : " " + name_); 263 TRACE_EVENT_METADATA1( 264 "__metadata", "thread_name", "name", TRACE_STR_COPY(trace_name.c_str())); 265 CHECK_NOT_NULL(platform_); 266 267 Debug(this, "Creating isolate for worker with id %llu", thread_id_.id); 268 269 WorkerThreadData data(this); 270 if (isolate_ == nullptr) return; 271 CHECK(data.loop_is_usable()); 272 273 Debug(this, "Starting worker with id %llu", thread_id_.id); 274 { 275 Locker locker(isolate_); 276 Isolate::Scope isolate_scope(isolate_); 277 SealHandleScope outer_seal(isolate_); 278 279 DeleteFnPtr<Environment, FreeEnvironment> env_; 280 auto cleanup_env = OnScopeLeave([&]() { 281 // TODO(addaleax): This call is harmless but should not be necessary. 282 // Figure out why V8 is raising a DCHECK() here without it 283 // (in test/parallel/test-async-hooks-worker-asyncfn-terminate-4.js). 284 isolate_->CancelTerminateExecution(); 285 286 if (!env_) return; 287 env_->set_can_call_into_js(false); 288 289 { 290 Mutex::ScopedLock lock(mutex_); 291 stopped_ = true; 292 this->env_ = nullptr; 293 } 294 295 env_.reset(); 296 }); 297 298 if (is_stopped()) return; 299 { 300 HandleScope handle_scope(isolate_); 301 Local<Context> context; 302 { 303 // We create the Context object before we have an Environment* in place 304 // that we could use for error handling. If creation fails due to 305 // resource constraints, we need something in place to handle it, 306 // though. 307 TryCatch try_catch(isolate_); 308 if (snapshot_data_ != nullptr) { 309 context = Context::FromSnapshot(isolate_, 310 SnapshotData::kNodeBaseContextIndex) 311 .ToLocalChecked(); 312 if (!context.IsEmpty() && 313 !InitializeContextRuntime(context).IsJust()) { 314 context = Local<Context>(); 315 } 316 } else { 317 context = NewContext(isolate_); 318 } 319 if (context.IsEmpty()) { 320 Exit(1, "ERR_WORKER_INIT_FAILED", "Failed to create new Context"); 321 return; 322 } 323 } 324 325 if (is_stopped()) return; 326 CHECK(!context.IsEmpty()); 327 Context::Scope context_scope(context); 328 { 329 env_.reset(CreateEnvironment( 330 data.isolate_data_.get(), 331 context, 332 std::move(argv_), 333 std::move(exec_argv_), 334 static_cast<EnvironmentFlags::Flags>(environment_flags_), 335 thread_id_, 336 std::move(inspector_parent_handle_))); 337 if (is_stopped()) return; 338 CHECK_NOT_NULL(env_); 339 env_->set_env_vars(std::move(env_vars_)); 340 SetProcessExitHandler(env_.get(), [this](Environment*, int exit_code) { 341 Exit(exit_code); 342 }); 343 } 344 { 345 Mutex::ScopedLock lock(mutex_); 346 if (stopped_) return; 347 this->env_ = env_.get(); 348 } 349 Debug(this, "Created Environment for worker with id %llu", thread_id_.id); 350 if (is_stopped()) return; 351 { 352 if (!CreateEnvMessagePort(env_.get())) { 353 return; 354 } 355 356 Debug(this, "Created message port for worker %llu", thread_id_.id); 357 if (LoadEnvironment(env_.get(), StartExecutionCallback{}).IsEmpty()) 358 return; 359 360 Debug(this, "Loaded environment for worker %llu", thread_id_.id); 361 } 362 } 363 364 { 365 Maybe<int> exit_code = SpinEventLoop(env_.get()); 366 Mutex::ScopedLock lock(mutex_); 367 if (exit_code_ == 0 && exit_code.IsJust()) { 368 exit_code_ = exit_code.FromJust(); 369 } 370 371 Debug(this, "Exiting thread for worker %llu with exit code %d", 372 thread_id_.id, exit_code_); 373 } 374 } 375 376 Debug(this, "Worker %llu thread stops", thread_id_.id); 377} 378 379bool Worker::CreateEnvMessagePort(Environment* env) { 380 HandleScope handle_scope(isolate_); 381 std::unique_ptr<MessagePortData> data; 382 { 383 Mutex::ScopedLock lock(mutex_); 384 data = std::move(child_port_data_); 385 } 386 387 // Set up the message channel for receiving messages in the child. 388 MessagePort* child_port = MessagePort::New(env, 389 env->context(), 390 std::move(data)); 391 // MessagePort::New() may return nullptr if execution is terminated 392 // within it. 393 if (child_port != nullptr) 394 env->set_message_port(child_port->object(isolate_)); 395 396 return child_port; 397} 398 399void Worker::JoinThread() { 400 if (!tid_.has_value()) 401 return; 402 CHECK_EQ(uv_thread_join(&tid_.value()), 0); 403 tid_.reset(); 404 405 env()->remove_sub_worker_context(this); 406 407 { 408 HandleScope handle_scope(env()->isolate()); 409 Context::Scope context_scope(env()->context()); 410 411 // Reset the parent port as we're closing it now anyway. 412 object()->Set(env()->context(), 413 env()->message_port_string(), 414 Undefined(env()->isolate())).Check(); 415 416 Local<Value> args[] = { 417 Integer::New(env()->isolate(), exit_code_), 418 custom_error_ != nullptr 419 ? OneByteString(env()->isolate(), custom_error_).As<Value>() 420 : Null(env()->isolate()).As<Value>(), 421 !custom_error_str_.empty() 422 ? OneByteString(env()->isolate(), custom_error_str_.c_str()) 423 .As<Value>() 424 : Null(env()->isolate()).As<Value>(), 425 }; 426 427 MakeCallback(env()->onexit_string(), arraysize(args), args); 428 } 429 430 // If we get here, the tid_.has_value() condition at the top of the function 431 // implies that the thread was running. In that case, its final action will 432 // be to schedule a callback on the parent thread which will delete this 433 // object, so there's nothing more to do here. 434} 435 436Worker::~Worker() { 437 Mutex::ScopedLock lock(mutex_); 438 439 CHECK(stopped_); 440 CHECK_NULL(env_); 441 CHECK(!tid_.has_value()); 442 443 Debug(this, "Worker %llu destroyed", thread_id_.id); 444} 445 446void Worker::New(const FunctionCallbackInfo<Value>& args) { 447 Environment* env = Environment::GetCurrent(args); 448 auto is_internal = args[5]; 449 CHECK(is_internal->IsBoolean()); 450 Isolate* isolate = args.GetIsolate(); 451 452 CHECK(args.IsConstructCall()); 453 454 if (env->isolate_data()->platform() == nullptr) { 455 THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env); 456 return; 457 } 458 459 std::string url; 460 std::string name; 461 std::shared_ptr<PerIsolateOptions> per_isolate_opts = nullptr; 462 std::shared_ptr<KVStore> env_vars = nullptr; 463 464 std::vector<std::string> exec_argv_out; 465 466 // Argument might be a string or URL 467 if (!args[0]->IsNullOrUndefined()) { 468 Utf8Value value( 469 isolate, args[0]->ToString(env->context()).FromMaybe(Local<String>())); 470 url.append(value.out(), value.length()); 471 } 472 473 if (!args[6]->IsNullOrUndefined()) { 474 Utf8Value value( 475 isolate, args[6]->ToString(env->context()).FromMaybe(Local<String>())); 476 name.append(value.out(), value.length()); 477 } 478 479 if (args[1]->IsNull()) { 480 // Means worker.env = { ...process.env }. 481 env_vars = env->env_vars()->Clone(isolate); 482 } else if (args[1]->IsObject()) { 483 // User provided env. 484 env_vars = KVStore::CreateMapKVStore(); 485 env_vars->AssignFromObject(isolate->GetCurrentContext(), 486 args[1].As<Object>()); 487 } else { 488 // Env is shared. 489 env_vars = env->env_vars(); 490 } 491 492 if (args[1]->IsObject() || args[2]->IsArray()) { 493 per_isolate_opts.reset(new PerIsolateOptions()); 494 495 HandleEnvOptions(per_isolate_opts->per_env, [&env_vars](const char* name) { 496 return env_vars->Get(name).FromMaybe(""); 497 }); 498 499#ifndef NODE_WITHOUT_NODE_OPTIONS 500 MaybeLocal<String> maybe_node_opts = 501 env_vars->Get(isolate, OneByteString(isolate, "NODE_OPTIONS")); 502 Local<String> node_opts; 503 if (maybe_node_opts.ToLocal(&node_opts)) { 504 std::string node_options(*String::Utf8Value(isolate, node_opts)); 505 std::vector<std::string> errors{}; 506 std::vector<std::string> env_argv = 507 ParseNodeOptionsEnvVar(node_options, &errors); 508 // [0] is expected to be the program name, add dummy string. 509 env_argv.insert(env_argv.begin(), ""); 510 std::vector<std::string> invalid_args{}; 511 options_parser::Parse(&env_argv, 512 nullptr, 513 &invalid_args, 514 per_isolate_opts.get(), 515 kAllowedInEnvvar, 516 &errors); 517 if (!errors.empty() && args[1]->IsObject()) { 518 // Only fail for explicitly provided env, this protects from failures 519 // when NODE_OPTIONS from parent's env is used (which is the default). 520 Local<Value> error; 521 if (!ToV8Value(env->context(), errors).ToLocal(&error)) return; 522 Local<String> key = 523 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidNodeOptions"); 524 // Ignore the return value of Set() because exceptions bubble up to JS 525 // when we return anyway. 526 USE(args.This()->Set(env->context(), key, error)); 527 return; 528 } 529 } 530#endif // NODE_WITHOUT_NODE_OPTIONS 531 } 532 533 if (args[2]->IsArray()) { 534 Local<Array> array = args[2].As<Array>(); 535 // The first argument is reserved for program name, but we don't need it 536 // in workers. 537 std::vector<std::string> exec_argv = {""}; 538 uint32_t length = array->Length(); 539 for (uint32_t i = 0; i < length; i++) { 540 Local<Value> arg; 541 if (!array->Get(env->context(), i).ToLocal(&arg)) { 542 return; 543 } 544 Local<String> arg_v8; 545 if (!arg->ToString(env->context()).ToLocal(&arg_v8)) { 546 return; 547 } 548 Utf8Value arg_utf8_value(args.GetIsolate(), arg_v8); 549 std::string arg_string(arg_utf8_value.out(), arg_utf8_value.length()); 550 exec_argv.push_back(arg_string); 551 } 552 553 std::vector<std::string> invalid_args{}; 554 std::vector<std::string> errors{}; 555 // Using invalid_args as the v8_args argument as it stores unknown 556 // options for the per isolate parser. 557 options_parser::Parse(&exec_argv, 558 &exec_argv_out, 559 &invalid_args, 560 per_isolate_opts.get(), 561 kDisallowedInEnvvar, 562 &errors); 563 564 // The first argument is program name. 565 invalid_args.erase(invalid_args.begin()); 566 if (errors.size() > 0 || invalid_args.size() > 0) { 567 Local<Value> error; 568 if (!ToV8Value(env->context(), 569 errors.size() > 0 ? errors : invalid_args) 570 .ToLocal(&error)) { 571 return; 572 } 573 Local<String> key = 574 FIXED_ONE_BYTE_STRING(env->isolate(), "invalidExecArgv"); 575 // Ignore the return value of Set() because exceptions bubble up to JS 576 // when we return anyway. 577 USE(args.This()->Set(env->context(), key, error)); 578 return; 579 } 580 } else { 581 exec_argv_out = env->exec_argv(); 582 } 583 584 bool use_node_snapshot = per_process::cli_options->node_snapshot; 585 const SnapshotData* snapshot_data = 586 use_node_snapshot ? SnapshotBuilder::GetEmbeddedSnapshotData() : nullptr; 587 588 Worker* worker = new Worker(env, 589 args.This(), 590 url, 591 name, 592 per_isolate_opts, 593 std::move(exec_argv_out), 594 env_vars, 595 snapshot_data); 596 597 CHECK(args[3]->IsFloat64Array()); 598 Local<Float64Array> limit_info = args[3].As<Float64Array>(); 599 CHECK_EQ(limit_info->Length(), kTotalResourceLimitCount); 600 limit_info->CopyContents(worker->resource_limits_, 601 sizeof(worker->resource_limits_)); 602 603 CHECK(args[4]->IsBoolean()); 604 if (args[4]->IsTrue() || env->tracks_unmanaged_fds()) 605 worker->environment_flags_ |= EnvironmentFlags::kTrackUnmanagedFds; 606 if (env->hide_console_windows()) 607 worker->environment_flags_ |= EnvironmentFlags::kHideConsoleWindows; 608 if (env->no_native_addons()) 609 worker->environment_flags_ |= EnvironmentFlags::kNoNativeAddons; 610 if (env->no_global_search_paths()) 611 worker->environment_flags_ |= EnvironmentFlags::kNoGlobalSearchPaths; 612 if (env->no_browser_globals()) 613 worker->environment_flags_ |= EnvironmentFlags::kNoBrowserGlobals; 614} 615 616void Worker::StartThread(const FunctionCallbackInfo<Value>& args) { 617 Worker* w; 618 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 619 Mutex::ScopedLock lock(w->mutex_); 620 621 w->stopped_ = false; 622 623 if (w->resource_limits_[kStackSizeMb] > 0) { 624 if (w->resource_limits_[kStackSizeMb] * kMB < kStackBufferSize) { 625 w->resource_limits_[kStackSizeMb] = kStackBufferSize / kMB; 626 w->stack_size_ = kStackBufferSize; 627 } else { 628 w->stack_size_ = 629 static_cast<size_t>(w->resource_limits_[kStackSizeMb] * kMB); 630 } 631 } else { 632 w->resource_limits_[kStackSizeMb] = w->stack_size_ / kMB; 633 } 634 635 uv_thread_options_t thread_options; 636 thread_options.flags = UV_THREAD_HAS_STACK_SIZE; 637 thread_options.stack_size = w->stack_size_; 638 639 uv_thread_t* tid = &w->tid_.emplace(); // Create uv_thread_t instance 640 int ret = uv_thread_create_ex(tid, &thread_options, [](void* arg) { 641 // XXX: This could become a std::unique_ptr, but that makes at least 642 // gcc 6.3 detect undefined behaviour when there shouldn't be any. 643 // gcc 7+ handles this well. 644 Worker* w = static_cast<Worker*>(arg); 645 const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg); 646 647 // Leave a few kilobytes just to make sure we're within limits and have 648 // some space to do work in C++ land. 649 w->stack_base_ = stack_top - (w->stack_size_ - kStackBufferSize); 650 651 w->Run(); 652 653 Mutex::ScopedLock lock(w->mutex_); 654 w->env()->SetImmediateThreadsafe( 655 [w = std::unique_ptr<Worker>(w)](Environment* env) { 656 if (w->has_ref_) 657 env->add_refs(-1); 658 w->JoinThread(); 659 // implicitly delete w 660 }); 661 }, static_cast<void*>(w)); 662 663 if (ret == 0) { 664 // The object now owns the created thread and should not be garbage 665 // collected until that finishes. 666 w->ClearWeak(); 667 668 if (w->has_ref_) 669 w->env()->add_refs(1); 670 671 w->env()->add_sub_worker_context(w); 672 } else { 673 w->stopped_ = true; 674 w->tid_.reset(); 675 676 char err_buf[128]; 677 uv_err_name_r(ret, err_buf, sizeof(err_buf)); 678 { 679 Isolate* isolate = w->env()->isolate(); 680 HandleScope handle_scope(isolate); 681 THROW_ERR_WORKER_INIT_FAILED(isolate, err_buf); 682 } 683 } 684} 685 686void Worker::StopThread(const FunctionCallbackInfo<Value>& args) { 687 Worker* w; 688 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 689 690 Debug(w, "Worker %llu is getting stopped by parent", w->thread_id_.id); 691 w->Exit(1); 692} 693 694void Worker::Ref(const FunctionCallbackInfo<Value>& args) { 695 Worker* w; 696 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 697 if (!w->has_ref_ && w->tid_.has_value()) { 698 w->has_ref_ = true; 699 w->env()->add_refs(1); 700 } 701} 702 703void Worker::HasRef(const FunctionCallbackInfo<Value>& args) { 704 Worker* w; 705 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 706 args.GetReturnValue().Set(w->has_ref_); 707} 708 709void Worker::Unref(const FunctionCallbackInfo<Value>& args) { 710 Worker* w; 711 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 712 if (w->has_ref_ && w->tid_.has_value()) { 713 w->has_ref_ = false; 714 w->env()->add_refs(-1); 715 } 716} 717 718void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) { 719 Worker* w; 720 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 721 args.GetReturnValue().Set(w->GetResourceLimits(args.GetIsolate())); 722} 723 724Local<Float64Array> Worker::GetResourceLimits(Isolate* isolate) const { 725 Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, sizeof(resource_limits_)); 726 727 memcpy(ab->Data(), resource_limits_, sizeof(resource_limits_)); 728 return Float64Array::New(ab, 0, kTotalResourceLimitCount); 729} 730 731void Worker::Exit(int code, const char* error_code, const char* error_message) { 732 Mutex::ScopedLock lock(mutex_); 733 Debug(this, "Worker %llu called Exit(%d, %s, %s)", 734 thread_id_.id, code, error_code, error_message); 735 736 if (error_code != nullptr) { 737 custom_error_ = error_code; 738 custom_error_str_ = error_message; 739 } 740 741 if (env_ != nullptr) { 742 exit_code_ = code; 743 Stop(env_); 744 } else { 745 stopped_ = true; 746 } 747} 748 749bool Worker::IsNotIndicativeOfMemoryLeakAtExit() const { 750 // Worker objects always stay alive as long as the child thread, regardless 751 // of whether they are being referenced in the parent thread. 752 return true; 753} 754 755class WorkerHeapSnapshotTaker : public AsyncWrap { 756 public: 757 WorkerHeapSnapshotTaker(Environment* env, Local<Object> obj) 758 : AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSNAPSHOT) {} 759 760 SET_NO_MEMORY_INFO() 761 SET_MEMORY_INFO_NAME(WorkerHeapSnapshotTaker) 762 SET_SELF_SIZE(WorkerHeapSnapshotTaker) 763}; 764 765void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) { 766 Worker* w; 767 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 768 769 Debug(w, "Worker %llu taking heap snapshot", w->thread_id_.id); 770 771 Environment* env = w->env(); 772 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w); 773 Local<Object> wrap; 774 if (!env->worker_heap_snapshot_taker_template() 775 ->NewInstance(env->context()).ToLocal(&wrap)) { 776 return; 777 } 778 779 // The created WorkerHeapSnapshotTaker is an object owned by main 780 // thread's Isolate, it can not be accessed by worker thread 781 std::unique_ptr<BaseObjectPtr<WorkerHeapSnapshotTaker>> taker = 782 std::make_unique<BaseObjectPtr<WorkerHeapSnapshotTaker>>( 783 MakeDetachedBaseObject<WorkerHeapSnapshotTaker>(env, wrap)); 784 785 // Interrupt the worker thread and take a snapshot, then schedule a call 786 // on the parent thread that turns that snapshot into a readable stream. 787 bool scheduled = w->RequestInterrupt([taker = std::move(taker), 788 env](Environment* worker_env) mutable { 789 heap::HeapSnapshotPointer snapshot{ 790 worker_env->isolate()->GetHeapProfiler()->TakeHeapSnapshot()}; 791 CHECK(snapshot); 792 793 // Here, the worker thread temporarily owns the WorkerHeapSnapshotTaker 794 // object. 795 796 env->SetImmediateThreadsafe( 797 [taker = std::move(taker), 798 snapshot = std::move(snapshot)](Environment* env) mutable { 799 HandleScope handle_scope(env->isolate()); 800 Context::Scope context_scope(env->context()); 801 802 AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get()); 803 BaseObjectPtr<AsyncWrap> stream = 804 heap::CreateHeapSnapshotStream(env, std::move(snapshot)); 805 Local<Value> args[] = {stream->object()}; 806 taker->get()->MakeCallback( 807 env->ondone_string(), arraysize(args), args); 808 // implicitly delete `taker` 809 }, 810 CallbackFlags::kUnrefed); 811 812 // Now, the lambda is delivered to the main thread, as a result, the 813 // WorkerHeapSnapshotTaker object is delivered to the main thread, too. 814 }); 815 816 if (scheduled) { 817 args.GetReturnValue().Set(wrap); 818 } else { 819 args.GetReturnValue().Set(Local<Object>()); 820 } 821} 822 823void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) { 824 Worker* w; 825 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 826 827 Mutex::ScopedLock lock(w->mutex_); 828 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped() 829 // before locking the mutex is a race condition. So manually do the same 830 // check. 831 if (w->stopped_ || w->env_ == nullptr) 832 return args.GetReturnValue().Set(-1); 833 834 uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop()); 835 args.GetReturnValue().Set(1.0 * idle_time / 1e6); 836} 837 838void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) { 839 Worker* w; 840 ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); 841 842 Mutex::ScopedLock lock(w->mutex_); 843 // Using w->is_stopped() here leads to a deadlock, and checking is_stopped() 844 // before locking the mutex is a race condition. So manually do the same 845 // check. 846 if (w->stopped_ || w->env_ == nullptr) 847 return args.GetReturnValue().Set(-1); 848 849 double loop_start_time = w->env_->performance_state()->milestones[ 850 node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START]; 851 CHECK_GE(loop_start_time, 0); 852 args.GetReturnValue().Set( 853 (loop_start_time - node::performance::timeOrigin) / 1e6); 854} 855 856namespace { 857 858// Return the MessagePort that is global for this Environment and communicates 859// with the internal [kPort] port of the JS Worker class in the parent thread. 860void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) { 861 Environment* env = Environment::GetCurrent(args); 862 Local<Object> port = env->message_port(); 863 CHECK_IMPLIES(!env->is_main_thread(), !port.IsEmpty()); 864 if (!port.IsEmpty()) { 865 CHECK_EQ(port->GetCreationContext().ToLocalChecked()->GetIsolate(), 866 args.GetIsolate()); 867 args.GetReturnValue().Set(port); 868 } 869} 870 871void InitWorker(Local<Object> target, 872 Local<Value> unused, 873 Local<Context> context, 874 void* priv) { 875 Environment* env = Environment::GetCurrent(context); 876 Isolate* isolate = env->isolate(); 877 878 { 879 Local<FunctionTemplate> w = NewFunctionTemplate(isolate, Worker::New); 880 881 w->InstanceTemplate()->SetInternalFieldCount( 882 Worker::kInternalFieldCount); 883 w->Inherit(AsyncWrap::GetConstructorTemplate(env)); 884 885 SetProtoMethod(isolate, w, "startThread", Worker::StartThread); 886 SetProtoMethod(isolate, w, "stopThread", Worker::StopThread); 887 SetProtoMethod(isolate, w, "hasRef", Worker::HasRef); 888 SetProtoMethod(isolate, w, "ref", Worker::Ref); 889 SetProtoMethod(isolate, w, "unref", Worker::Unref); 890 SetProtoMethod(isolate, w, "getResourceLimits", Worker::GetResourceLimits); 891 SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot); 892 SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime); 893 SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime); 894 895 SetConstructorFunction(context, target, "Worker", w); 896 } 897 898 { 899 Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr); 900 901 wst->InstanceTemplate()->SetInternalFieldCount( 902 WorkerHeapSnapshotTaker::kInternalFieldCount); 903 wst->Inherit(AsyncWrap::GetConstructorTemplate(env)); 904 905 Local<String> wst_string = 906 FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapSnapshotTaker"); 907 wst->SetClassName(wst_string); 908 env->set_worker_heap_snapshot_taker_template(wst->InstanceTemplate()); 909 } 910 911 SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort); 912 913 target 914 ->Set(env->context(), 915 env->thread_id_string(), 916 Number::New(isolate, static_cast<double>(env->thread_id()))) 917 .Check(); 918 919 target 920 ->Set(env->context(), 921 FIXED_ONE_BYTE_STRING(isolate, "isMainThread"), 922 Boolean::New(isolate, env->is_main_thread())) 923 .Check(); 924 925 target 926 ->Set(env->context(), 927 FIXED_ONE_BYTE_STRING(isolate, "ownsProcessState"), 928 Boolean::New(isolate, env->owns_process_state())) 929 .Check(); 930 931 if (!env->is_main_thread()) { 932 target 933 ->Set(env->context(), 934 FIXED_ONE_BYTE_STRING(isolate, "resourceLimits"), 935 env->worker_context()->GetResourceLimits(isolate)) 936 .Check(); 937 } 938 939 NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb); 940 NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb); 941 NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb); 942 NODE_DEFINE_CONSTANT(target, kStackSizeMb); 943 NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount); 944} 945 946void RegisterExternalReferences(ExternalReferenceRegistry* registry) { 947 registry->Register(GetEnvMessagePort); 948 registry->Register(Worker::New); 949 registry->Register(Worker::StartThread); 950 registry->Register(Worker::StopThread); 951 registry->Register(Worker::HasRef); 952 registry->Register(Worker::Ref); 953 registry->Register(Worker::Unref); 954 registry->Register(Worker::GetResourceLimits); 955 registry->Register(Worker::TakeHeapSnapshot); 956 registry->Register(Worker::LoopIdleTime); 957 registry->Register(Worker::LoopStartTime); 958} 959 960} // anonymous namespace 961} // namespace worker 962} // namespace node 963 964NODE_BINDING_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker) 965NODE_BINDING_EXTERNAL_REFERENCE(worker, 966 node::worker::RegisterExternalReferences) 967