1'use strict'; 2 3const { 4 ArrayIsArray, 5 ArrayPrototypePush, 6 ArrayPrototypeReduce, 7 ArrayPrototypeSlice, 8 FunctionPrototype, 9 FunctionPrototypeCall, 10 ObjectDefineProperty, 11 ObjectSetPrototypeOf, 12 ReflectApply, 13 StringPrototypeSlice, 14 Symbol, 15 SymbolDispose, 16 Uint8Array, 17} = primordials; 18 19const { 20 errnoException, 21 codes: { 22 ERR_INVALID_ARG_TYPE, 23 ERR_INVALID_ARG_VALUE, 24 ERR_INVALID_HANDLE_TYPE, 25 ERR_INVALID_SYNC_FORK_INPUT, 26 ERR_IPC_CHANNEL_CLOSED, 27 ERR_IPC_DISCONNECTED, 28 ERR_IPC_ONE_PIPE, 29 ERR_IPC_SYNC_FORK, 30 ERR_MISSING_ARGS, 31 }, 32} = require('internal/errors'); 33const { 34 validateArray, 35 validateObject, 36 validateOneOf, 37 validateString, 38} = require('internal/validators'); 39const EventEmitter = require('events'); 40const net = require('net'); 41const dgram = require('dgram'); 42const inspect = require('internal/util/inspect').inspect; 43const assert = require('internal/assert'); 44 45const { Process } = internalBinding('process_wrap'); 46const { 47 WriteWrap, 48 kReadBytesOrError, 49 kArrayBufferOffset, 50 kLastWriteWasAsync, 51 streamBaseState, 52} = internalBinding('stream_wrap'); 53const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); 54const { TCP } = internalBinding('tcp_wrap'); 55const { TTY } = internalBinding('tty_wrap'); 56const { UDP } = internalBinding('udp_wrap'); 57const SocketList = require('internal/socket_list'); 58const { owner_symbol } = require('internal/async_hooks').symbols; 59const { convertToValidSignal, deprecate } = require('internal/util'); 60const { isArrayBufferView } = require('internal/util/types'); 61const spawn_sync = internalBinding('spawn_sync'); 62const { kStateSymbol } = require('internal/dgram'); 63 64const { 65 UV_EACCES, 66 UV_EAGAIN, 67 UV_EINVAL, 68 UV_EMFILE, 69 UV_ENFILE, 70 UV_ENOENT, 71 UV_ENOSYS, 72 UV_ESRCH, 73} = internalBinding('uv'); 74 75const { SocketListSend, SocketListReceive } = SocketList; 76 77// Lazy loaded for startup performance and to allow monkey patching of 78// internalBinding('http_parser').HTTPParser. 79let freeParser; 80let HTTPParser; 81 82const MAX_HANDLE_RETRANSMISSIONS = 3; 83const kChannelHandle = Symbol('kChannelHandle'); 84const kIsUsedAsStdio = Symbol('kIsUsedAsStdio'); 85const kPendingMessages = Symbol('kPendingMessages'); 86 87// This object contain function to convert TCP objects to native handle objects 88// and back again. 89const handleConversion = { 90 'net.Native': { 91 simultaneousAccepts: true, 92 93 send(message, handle, options) { 94 return handle; 95 }, 96 97 got(message, handle, emit) { 98 emit(handle); 99 }, 100 }, 101 102 'net.Server': { 103 simultaneousAccepts: true, 104 105 send(message, server, options) { 106 return server._handle; 107 }, 108 109 got(message, handle, emit) { 110 const server = new net.Server(); 111 server.listen(handle, () => { 112 emit(server); 113 }); 114 }, 115 }, 116 117 'net.Socket': { 118 send(message, socket, options) { 119 if (!socket._handle) 120 return; 121 122 // If the socket was created by net.Server 123 if (socket.server) { 124 // The worker should keep track of the socket 125 message.key = socket.server._connectionKey; 126 127 const firstTime = !this[kChannelHandle].sockets.send[message.key]; 128 const socketList = getSocketList('send', this, message.key); 129 130 // The server should no longer expose a .connection property 131 // and when asked to close it should query the socket status from 132 // the workers 133 if (firstTime) socket.server._setupWorker(socketList); 134 135 // Act like socket is detached 136 if (!options.keepOpen) 137 socket.server._connections--; 138 } 139 140 const handle = socket._handle; 141 142 // Remove handle from socket object, it will be closed when the socket 143 // will be sent 144 if (!options.keepOpen) { 145 handle.onread = nop; 146 socket._handle = null; 147 socket.setTimeout(0); 148 149 if (freeParser === undefined) 150 freeParser = require('_http_common').freeParser; 151 if (HTTPParser === undefined) 152 HTTPParser = require('_http_common').HTTPParser; 153 154 // In case of an HTTP connection socket, release the associated 155 // resources 156 if (socket.parser && socket.parser instanceof HTTPParser) { 157 freeParser(socket.parser, null, socket); 158 if (socket._httpMessage) 159 socket._httpMessage.detachSocket(socket); 160 } 161 } 162 163 return handle; 164 }, 165 166 postSend(message, handle, options, callback, target) { 167 // Store the handle after successfully sending it, so it can be closed 168 // when the NODE_HANDLE_ACK is received. If the handle could not be sent, 169 // just close it. 170 if (handle && !options.keepOpen) { 171 if (target) { 172 // There can only be one _pendingMessage as passing handles are 173 // processed one at a time: handles are stored in _handleQueue while 174 // waiting for the NODE_HANDLE_ACK of the current passing handle. 175 assert(!target._pendingMessage); 176 target._pendingMessage = 177 { callback, message, handle, options, retransmissions: 0 }; 178 } else { 179 handle.close(); 180 } 181 } 182 }, 183 184 got(message, handle, emit) { 185 const socket = new net.Socket({ 186 handle: handle, 187 readable: true, 188 writable: true, 189 }); 190 191 // If the socket was created by net.Server we will track the socket 192 if (message.key) { 193 194 // Add socket to connections list 195 const socketList = getSocketList('got', this, message.key); 196 socketList.add({ 197 socket: socket, 198 }); 199 } 200 201 emit(socket); 202 }, 203 }, 204 205 'dgram.Native': { 206 simultaneousAccepts: false, 207 208 send(message, handle, options) { 209 return handle; 210 }, 211 212 got(message, handle, emit) { 213 emit(handle); 214 }, 215 }, 216 217 'dgram.Socket': { 218 simultaneousAccepts: false, 219 220 send(message, socket, options) { 221 message.dgramType = socket.type; 222 223 return socket[kStateSymbol].handle; 224 }, 225 226 got(message, handle, emit) { 227 const socket = new dgram.Socket(message.dgramType); 228 229 socket.bind(handle, () => { 230 emit(socket); 231 }); 232 }, 233 }, 234}; 235 236function stdioStringToArray(stdio, channel) { 237 const options = []; 238 239 switch (stdio) { 240 case 'ignore': 241 case 'overlapped': 242 case 'pipe': ArrayPrototypePush(options, stdio, stdio, stdio); break; 243 case 'inherit': ArrayPrototypePush(options, 0, 1, 2); break; 244 default: 245 throw new ERR_INVALID_ARG_VALUE('stdio', stdio); 246 } 247 248 if (channel) ArrayPrototypePush(options, channel); 249 250 return options; 251} 252 253function ChildProcess() { 254 FunctionPrototypeCall(EventEmitter, this); 255 256 this._closesNeeded = 1; 257 this._closesGot = 0; 258 this.connected = false; 259 260 this.signalCode = null; 261 this.exitCode = null; 262 this.killed = false; 263 this.spawnfile = null; 264 265 this._handle = new Process(); 266 this._handle[owner_symbol] = this; 267 268 this._handle.onexit = (exitCode, signalCode) => { 269 if (signalCode) { 270 this.signalCode = signalCode; 271 } else { 272 this.exitCode = exitCode; 273 } 274 275 if (this.stdin) { 276 this.stdin.destroy(); 277 } 278 279 this._handle.close(); 280 this._handle = null; 281 282 if (exitCode < 0) { 283 const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn'; 284 const err = errnoException(exitCode, syscall); 285 286 if (this.spawnfile) 287 err.path = this.spawnfile; 288 289 err.spawnargs = ArrayPrototypeSlice(this.spawnargs, 1); 290 this.emit('error', err); 291 } else { 292 this.emit('exit', this.exitCode, this.signalCode); 293 } 294 295 // If any of the stdio streams have not been touched, 296 // then pull all the data through so that it can get the 297 // eof and emit a 'close' event. 298 // Do it on nextTick so that the user has one last chance 299 // to consume the output, if for example they only want to 300 // start reading the data once the process exits. 301 process.nextTick(flushStdio, this); 302 303 maybeClose(this); 304 }; 305} 306ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype); 307ObjectSetPrototypeOf(ChildProcess, EventEmitter); 308 309 310function flushStdio(subprocess) { 311 const stdio = subprocess.stdio; 312 313 if (stdio == null) return; 314 315 for (let i = 0; i < stdio.length; i++) { 316 const stream = stdio[i]; 317 // TODO(addaleax): This doesn't necessarily account for all the ways in 318 // which data can be read from a stream, e.g. being consumed on the 319 // native layer directly as a StreamBase. 320 if (!stream || !stream.readable || stream[kIsUsedAsStdio]) { 321 continue; 322 } 323 stream.resume(); 324 } 325} 326 327 328function createSocket(pipe, readable) { 329 return net.Socket({ handle: pipe, readable }); 330} 331 332 333function getHandleWrapType(stream) { 334 if (stream instanceof Pipe) return 'pipe'; 335 if (stream instanceof TTY) return 'tty'; 336 if (stream instanceof TCP) return 'tcp'; 337 if (stream instanceof UDP) return 'udp'; 338 339 return false; 340} 341 342function closePendingHandle(target) { 343 target._pendingMessage.handle.close(); 344 target._pendingMessage = null; 345} 346 347 348ChildProcess.prototype.spawn = function(options) { 349 let i = 0; 350 351 validateObject(options, 'options'); 352 353 // If no `stdio` option was given - use default 354 let stdio = options.stdio || 'pipe'; 355 356 stdio = getValidStdio(stdio, false); 357 358 const ipc = stdio.ipc; 359 const ipcFd = stdio.ipcFd; 360 stdio = options.stdio = stdio.stdio; 361 362 363 validateOneOf(options.serialization, 'options.serialization', 364 [undefined, 'json', 'advanced']); 365 const serialization = options.serialization || 'json'; 366 367 if (ipc !== undefined) { 368 // Let child process know about opened IPC channel 369 if (options.envPairs === undefined) 370 options.envPairs = []; 371 else 372 validateArray(options.envPairs, 'options.envPairs'); 373 374 ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`); 375 ArrayPrototypePush(options.envPairs, 376 `NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`); 377 } 378 379 validateString(options.file, 'options.file'); 380 this.spawnfile = options.file; 381 382 if (options.args === undefined) { 383 this.spawnargs = []; 384 } else { 385 validateArray(options.args, 'options.args'); 386 this.spawnargs = options.args; 387 } 388 389 const err = this._handle.spawn(options); 390 391 // Run-time errors should emit an error, not throw an exception. 392 if (err === UV_EACCES || 393 err === UV_EAGAIN || 394 err === UV_EMFILE || 395 err === UV_ENFILE || 396 err === UV_ENOENT) { 397 process.nextTick(onErrorNT, this, err); 398 399 // There is no point in continuing when we've hit EMFILE or ENFILE 400 // because we won't be able to set up the stdio file descriptors. 401 if (err === UV_EMFILE || err === UV_ENFILE) 402 return err; 403 } else if (err) { 404 // Close all opened fds on error 405 for (i = 0; i < stdio.length; i++) { 406 const stream = stdio[i]; 407 if (stream.type === 'pipe') { 408 stream.handle.close(); 409 } 410 } 411 412 this._handle.close(); 413 this._handle = null; 414 throw errnoException(err, 'spawn'); 415 } else { 416 process.nextTick(onSpawnNT, this); 417 } 418 419 this.pid = this._handle.pid; 420 421 for (i = 0; i < stdio.length; i++) { 422 const stream = stdio[i]; 423 if (stream.type === 'ignore') continue; 424 425 if (stream.ipc) { 426 this._closesNeeded++; 427 continue; 428 } 429 430 // The stream is already cloned and piped, thus stop its readable side, 431 // otherwise we might attempt to read from the stream when at the same time 432 // the child process does. 433 if (stream.type === 'wrap') { 434 stream.handle.reading = false; 435 stream.handle.readStop(); 436 stream._stdio.pause(); 437 stream._stdio.readableFlowing = false; 438 stream._stdio._readableState.reading = false; 439 stream._stdio[kIsUsedAsStdio] = true; 440 continue; 441 } 442 443 if (stream.handle) { 444 stream.socket = createSocket(this.pid !== 0 ? 445 stream.handle : null, i > 0); 446 447 if (i > 0 && this.pid !== 0) { 448 this._closesNeeded++; 449 stream.socket.on('close', () => { 450 maybeClose(this); 451 }); 452 } 453 } 454 } 455 456 this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ? 457 stdio[0].socket : null; 458 this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ? 459 stdio[1].socket : null; 460 this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ? 461 stdio[2].socket : null; 462 463 this.stdio = []; 464 465 for (i = 0; i < stdio.length; i++) 466 ArrayPrototypePush(this.stdio, 467 stdio[i].socket === undefined ? null : stdio[i].socket); 468 469 // Add .send() method and start listening for IPC data 470 if (ipc !== undefined) setupChannel(this, ipc, serialization); 471 472 return err; 473}; 474 475 476function onErrorNT(self, err) { 477 self._handle.onexit(err); 478} 479 480 481function onSpawnNT(self) { 482 self.emit('spawn'); 483} 484 485 486ChildProcess.prototype.kill = function(sig) { 487 488 const signal = sig === 0 ? sig : 489 convertToValidSignal(sig === undefined ? 'SIGTERM' : sig); 490 491 if (this._handle) { 492 const err = this._handle.kill(signal); 493 if (err === 0) { 494 /* Success. */ 495 this.killed = true; 496 return true; 497 } 498 if (err === UV_ESRCH) { 499 /* Already dead. */ 500 } else if (err === UV_EINVAL || err === UV_ENOSYS) { 501 /* The underlying platform doesn't support this signal. */ 502 throw errnoException(err, 'kill'); 503 } else { 504 /* Other error, almost certainly EPERM. */ 505 this.emit('error', errnoException(err, 'kill')); 506 } 507 } 508 509 /* Kill didn't succeed. */ 510 return false; 511}; 512 513ChildProcess.prototype[SymbolDispose] = function() { 514 if (!this.killed) { 515 this.kill(); 516 } 517}; 518 519 520ChildProcess.prototype.ref = function() { 521 if (this._handle) this._handle.ref(); 522}; 523 524 525ChildProcess.prototype.unref = function() { 526 if (this._handle) this._handle.unref(); 527}; 528 529class Control extends EventEmitter { 530 #channel = null; 531 #refs = 0; 532 #refExplicitlySet = false; 533 534 constructor(channel) { 535 super(); 536 this.#channel = channel; 537 this[kPendingMessages] = []; 538 } 539 540 // The methods keeping track of the counter are being used to track the 541 // listener count on the child process object as well as when writes are 542 // in progress. Once the user has explicitly requested a certain state, these 543 // methods become no-ops in order to not interfere with the user's intentions. 544 refCounted() { 545 if (++this.#refs === 1 && !this.#refExplicitlySet) { 546 this.#channel.ref(); 547 } 548 } 549 550 unrefCounted() { 551 if (--this.#refs === 0 && !this.#refExplicitlySet) { 552 this.#channel.unref(); 553 this.emit('unref'); 554 } 555 } 556 557 ref() { 558 this.#refExplicitlySet = true; 559 this.#channel.ref(); 560 } 561 562 unref() { 563 this.#refExplicitlySet = true; 564 this.#channel.unref(); 565 } 566 567 get fd() { 568 return this.#channel ? this.#channel.fd : undefined; 569 } 570} 571 572const channelDeprecationMsg = '_channel is deprecated. ' + 573 'Use ChildProcess.channel instead.'; 574 575let serialization; 576function setupChannel(target, channel, serializationMode) { 577 const control = new Control(channel); 578 target.channel = control; 579 target[kChannelHandle] = channel; 580 581 ObjectDefineProperty(target, '_channel', { 582 __proto__: null, 583 get: deprecate(() => { 584 return target.channel; 585 }, channelDeprecationMsg, 'DEP0129'), 586 set: deprecate((val) => { 587 target.channel = val; 588 }, channelDeprecationMsg, 'DEP0129'), 589 configurable: true, 590 enumerable: false, 591 }); 592 593 target._handleQueue = null; 594 target._pendingMessage = null; 595 596 if (serialization === undefined) 597 serialization = require('internal/child_process/serialization'); 598 const { 599 initMessageChannel, 600 parseChannelMessages, 601 writeChannelMessage, 602 } = serialization[serializationMode]; 603 604 let pendingHandle = null; 605 initMessageChannel(channel); 606 channel.pendingHandle = null; 607 channel.onread = function(arrayBuffer) { 608 const recvHandle = channel.pendingHandle; 609 channel.pendingHandle = null; 610 if (arrayBuffer) { 611 const nread = streamBaseState[kReadBytesOrError]; 612 const offset = streamBaseState[kArrayBufferOffset]; 613 const pool = new Uint8Array(arrayBuffer, offset, nread); 614 if (recvHandle) 615 pendingHandle = recvHandle; 616 617 for (const message of parseChannelMessages(channel, pool)) { 618 // There will be at most one NODE_HANDLE message in every chunk we 619 // read because SCM_RIGHTS messages don't get coalesced. Make sure 620 // that we deliver the handle with the right message however. 621 if (isInternal(message)) { 622 if (message.cmd === 'NODE_HANDLE') { 623 handleMessage(message, pendingHandle, true); 624 pendingHandle = null; 625 } else { 626 handleMessage(message, undefined, true); 627 } 628 } else { 629 handleMessage(message, undefined, false); 630 } 631 } 632 } else { 633 this.buffering = false; 634 target.disconnect(); 635 channel.onread = nop; 636 channel.close(); 637 target.channel = null; 638 maybeClose(target); 639 } 640 }; 641 642 // Object where socket lists will live 643 channel.sockets = { got: {}, send: {} }; 644 645 // Handlers will go through this 646 target.on('internalMessage', function(message, handle) { 647 // Once acknowledged - continue sending handles. 648 if (message.cmd === 'NODE_HANDLE_ACK' || 649 message.cmd === 'NODE_HANDLE_NACK') { 650 651 if (target._pendingMessage) { 652 if (message.cmd === 'NODE_HANDLE_ACK') { 653 closePendingHandle(target); 654 } else if (target._pendingMessage.retransmissions++ === 655 MAX_HANDLE_RETRANSMISSIONS) { 656 closePendingHandle(target); 657 process.emitWarning('Handle did not reach the receiving process ' + 658 'correctly', 'SentHandleNotReceivedWarning'); 659 } 660 } 661 662 assert(ArrayIsArray(target._handleQueue)); 663 const queue = target._handleQueue; 664 target._handleQueue = null; 665 666 if (target._pendingMessage) { 667 target._send(target._pendingMessage.message, 668 target._pendingMessage.handle, 669 target._pendingMessage.options, 670 target._pendingMessage.callback); 671 } 672 673 for (let i = 0; i < queue.length; i++) { 674 const args = queue[i]; 675 target._send(args.message, args.handle, args.options, args.callback); 676 } 677 678 // Process a pending disconnect (if any). 679 if (!target.connected && target.channel && !target._handleQueue) 680 target._disconnect(); 681 682 return; 683 } 684 685 if (message.cmd !== 'NODE_HANDLE') return; 686 687 // It is possible that the handle is not received because of some error on 688 // ancillary data reception such as MSG_CTRUNC. In this case, report the 689 // sender about it by sending a NODE_HANDLE_NACK message. 690 if (!handle) 691 return target._send({ cmd: 'NODE_HANDLE_NACK' }, null, true); 692 693 // Acknowledge handle receival. Don't emit error events (for example if 694 // the other side has disconnected) because this call to send() is not 695 // initiated by the user and it shouldn't be fatal to be unable to ACK 696 // a message. 697 target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true); 698 699 const obj = handleConversion[message.type]; 700 701 // Update simultaneous accepts on Windows 702 if (process.platform === 'win32') { 703 handle.setSimultaneousAccepts(false); 704 } 705 706 // Convert handle object 707 obj.got.call(this, message, handle, (handle) => { 708 handleMessage(message.msg, handle, isInternal(message.msg)); 709 }); 710 }); 711 712 target.on('newListener', function() { 713 714 process.nextTick(() => { 715 if (!target.channel || !target.listenerCount('message')) 716 return; 717 718 const messages = target.channel[kPendingMessages]; 719 const { length } = messages; 720 if (!length) return; 721 722 for (let i = 0; i < length; i++) { 723 ReflectApply(target.emit, target, messages[i]); 724 } 725 726 target.channel[kPendingMessages] = []; 727 }); 728 }); 729 730 target.send = function(message, handle, options, callback) { 731 if (typeof handle === 'function') { 732 callback = handle; 733 handle = undefined; 734 options = undefined; 735 } else if (typeof options === 'function') { 736 callback = options; 737 options = undefined; 738 } else if (options !== undefined) { 739 validateObject(options, 'options'); 740 } 741 742 options = { swallowErrors: false, ...options }; 743 744 if (this.connected) { 745 return this._send(message, handle, options, callback); 746 } 747 const ex = new ERR_IPC_CHANNEL_CLOSED(); 748 if (typeof callback === 'function') { 749 process.nextTick(callback, ex); 750 } else { 751 process.nextTick(() => this.emit('error', ex)); 752 } 753 return false; 754 }; 755 756 target._send = function(message, handle, options, callback) { 757 assert(this.connected || this.channel); 758 759 if (message === undefined) 760 throw new ERR_MISSING_ARGS('message'); 761 762 // Non-serializable messages should not reach the remote 763 // end point; as any failure in the stringification there 764 // will result in error message that is weakly consumable. 765 // So perform a final check on message prior to sending. 766 if (typeof message !== 'string' && 767 typeof message !== 'object' && 768 typeof message !== 'number' && 769 typeof message !== 'boolean') { 770 throw new ERR_INVALID_ARG_TYPE( 771 'message', ['string', 'object', 'number', 'boolean'], message); 772 } 773 774 // Support legacy function signature 775 if (typeof options === 'boolean') { 776 options = { swallowErrors: options }; 777 } 778 779 let obj; 780 781 // Package messages with a handle object 782 if (handle) { 783 // This message will be handled by an internalMessage event handler 784 message = { 785 cmd: 'NODE_HANDLE', 786 type: null, 787 msg: message, 788 }; 789 790 if (handle instanceof net.Socket) { 791 message.type = 'net.Socket'; 792 } else if (handle instanceof net.Server) { 793 message.type = 'net.Server'; 794 } else if (handle instanceof TCP || handle instanceof Pipe) { 795 message.type = 'net.Native'; 796 } else if (handle instanceof dgram.Socket) { 797 message.type = 'dgram.Socket'; 798 } else if (handle instanceof UDP) { 799 message.type = 'dgram.Native'; 800 } else { 801 throw new ERR_INVALID_HANDLE_TYPE(); 802 } 803 804 // Queue-up message and handle if we haven't received ACK yet. 805 if (this._handleQueue) { 806 ArrayPrototypePush(this._handleQueue, { 807 callback: callback, 808 handle: handle, 809 options: options, 810 message: message.msg, 811 }); 812 return this._handleQueue.length === 1; 813 } 814 815 obj = handleConversion[message.type]; 816 817 // convert TCP object to native handle object 818 handle = ReflectApply(handleConversion[message.type].send, 819 target, [message, handle, options]); 820 821 // If handle was sent twice, or it is impossible to get native handle 822 // out of it - just send a text without the handle. 823 if (!handle) 824 message = message.msg; 825 826 // Update simultaneous accepts on Windows 827 if (obj.simultaneousAccepts && process.platform === 'win32') { 828 handle.setSimultaneousAccepts(true); 829 } 830 } else if (this._handleQueue && 831 !(message && (message.cmd === 'NODE_HANDLE_ACK' || 832 message.cmd === 'NODE_HANDLE_NACK'))) { 833 // Queue request anyway to avoid out-of-order messages. 834 ArrayPrototypePush(this._handleQueue, { 835 callback: callback, 836 handle: null, 837 options: options, 838 message: message, 839 }); 840 return this._handleQueue.length === 1; 841 } 842 843 const req = new WriteWrap(); 844 845 const err = writeChannelMessage(channel, req, message, handle); 846 const wasAsyncWrite = streamBaseState[kLastWriteWasAsync]; 847 848 if (err === 0) { 849 if (handle) { 850 if (!this._handleQueue) 851 this._handleQueue = []; 852 if (obj && obj.postSend) 853 obj.postSend(message, handle, options, callback, target); 854 } 855 856 if (wasAsyncWrite) { 857 req.oncomplete = () => { 858 control.unrefCounted(); 859 if (typeof callback === 'function') 860 callback(null); 861 }; 862 control.refCounted(); 863 } else if (typeof callback === 'function') { 864 process.nextTick(callback, null); 865 } 866 } else { 867 // Cleanup handle on error 868 if (obj && obj.postSend) 869 obj.postSend(message, handle, options, callback); 870 871 if (!options.swallowErrors) { 872 const ex = errnoException(err, 'write'); 873 if (typeof callback === 'function') { 874 process.nextTick(callback, ex); 875 } else { 876 process.nextTick(() => this.emit('error', ex)); 877 } 878 } 879 } 880 881 /* If the primary is > 2 read() calls behind, please stop sending. */ 882 return channel.writeQueueSize < (65536 * 2); 883 }; 884 885 // Connected will be set to false immediately when a disconnect() is 886 // requested, even though the channel might still be alive internally to 887 // process queued messages. The three states are distinguished as follows: 888 // - disconnect() never requested: channel is not null and connected 889 // is true 890 // - disconnect() requested, messages in the queue: channel is not null 891 // and connected is false 892 // - disconnect() requested, channel actually disconnected: channel is 893 // null and connected is false 894 target.connected = true; 895 896 target.disconnect = function() { 897 if (!this.connected) { 898 this.emit('error', new ERR_IPC_DISCONNECTED()); 899 return; 900 } 901 902 // Do not allow any new messages to be written. 903 this.connected = false; 904 905 // If there are no queued messages, disconnect immediately. Otherwise, 906 // postpone the disconnect so that it happens internally after the 907 // queue is flushed. 908 if (!this._handleQueue) 909 this._disconnect(); 910 }; 911 912 target._disconnect = function() { 913 assert(this.channel); 914 915 // This marks the fact that the channel is actually disconnected. 916 this.channel = null; 917 this[kChannelHandle] = null; 918 919 if (this._pendingMessage) 920 closePendingHandle(this); 921 922 let fired = false; 923 function finish() { 924 if (fired) return; 925 fired = true; 926 927 channel.close(); 928 target.emit('disconnect'); 929 } 930 931 // If a message is being read, then wait for it to complete. 932 if (channel.buffering) { 933 this.once('message', finish); 934 this.once('internalMessage', finish); 935 936 return; 937 } 938 939 process.nextTick(finish); 940 }; 941 942 function emit(event, message, handle) { 943 if ('internalMessage' === event || target.listenerCount('message')) { 944 target.emit(event, message, handle); 945 return; 946 } 947 948 ArrayPrototypePush( 949 target.channel[kPendingMessages], 950 [event, message, handle], 951 ); 952 } 953 954 function handleMessage(message, handle, internal) { 955 if (!target.channel) 956 return; 957 958 const eventName = (internal ? 'internalMessage' : 'message'); 959 960 process.nextTick(emit, eventName, message, handle); 961 } 962 963 channel.readStart(); 964 return control; 965} 966 967const INTERNAL_PREFIX = 'NODE_'; 968function isInternal(message) { 969 return (message !== null && 970 typeof message === 'object' && 971 typeof message.cmd === 'string' && 972 message.cmd.length > INTERNAL_PREFIX.length && 973 StringPrototypeSlice(message.cmd, 0, INTERNAL_PREFIX.length) === 974 INTERNAL_PREFIX); 975} 976 977const nop = FunctionPrototype; 978 979function getValidStdio(stdio, sync) { 980 let ipc; 981 let ipcFd; 982 983 // Replace shortcut with an array 984 if (typeof stdio === 'string') { 985 stdio = stdioStringToArray(stdio); 986 } else if (!ArrayIsArray(stdio)) { 987 throw new ERR_INVALID_ARG_VALUE('stdio', stdio); 988 } 989 990 // At least 3 stdio will be created 991 // Don't concat() a new Array() because it would be sparse, and 992 // stdio.reduce() would skip the sparse elements of stdio. 993 // See https://stackoverflow.com/a/5501711/3561 994 while (stdio.length < 3) ArrayPrototypePush(stdio, undefined); 995 996 // Translate stdio into C++-readable form 997 // (i.e. PipeWraps or fds) 998 stdio = ArrayPrototypeReduce(stdio, (acc, stdio, i) => { 999 function cleanup() { 1000 for (let i = 0; i < acc.length; i++) { 1001 if ((acc[i].type === 'pipe' || acc[i].type === 'ipc') && acc[i].handle) 1002 acc[i].handle.close(); 1003 } 1004 } 1005 1006 // Defaults 1007 if (stdio == null) { 1008 stdio = i < 3 ? 'pipe' : 'ignore'; 1009 } 1010 1011 if (stdio === 'ignore') { 1012 ArrayPrototypePush(acc, { type: 'ignore' }); 1013 } else if (stdio === 'pipe' || stdio === 'overlapped' || 1014 (typeof stdio === 'number' && stdio < 0)) { 1015 const a = { 1016 type: stdio === 'overlapped' ? 'overlapped' : 'pipe', 1017 readable: i === 0, 1018 writable: i !== 0, 1019 }; 1020 1021 if (!sync) 1022 a.handle = new Pipe(PipeConstants.SOCKET); 1023 1024 ArrayPrototypePush(acc, a); 1025 } else if (stdio === 'ipc') { 1026 if (sync || ipc !== undefined) { 1027 // Cleanup previously created pipes 1028 cleanup(); 1029 if (!sync) 1030 throw new ERR_IPC_ONE_PIPE(); 1031 else 1032 throw new ERR_IPC_SYNC_FORK(); 1033 } 1034 1035 ipc = new Pipe(PipeConstants.IPC); 1036 ipcFd = i; 1037 1038 ArrayPrototypePush(acc, { 1039 type: 'pipe', 1040 handle: ipc, 1041 ipc: true, 1042 }); 1043 } else if (stdio === 'inherit') { 1044 ArrayPrototypePush(acc, { 1045 type: 'inherit', 1046 fd: i, 1047 }); 1048 } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') { 1049 ArrayPrototypePush(acc, { 1050 type: 'fd', 1051 fd: typeof stdio === 'number' ? stdio : stdio.fd, 1052 }); 1053 } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) || 1054 getHandleWrapType(stdio._handle)) { 1055 const handle = getHandleWrapType(stdio) ? 1056 stdio : 1057 getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle; 1058 1059 ArrayPrototypePush(acc, { 1060 type: 'wrap', 1061 wrapType: getHandleWrapType(handle), 1062 handle: handle, 1063 _stdio: stdio, 1064 }); 1065 } else if (isArrayBufferView(stdio) || typeof stdio === 'string') { 1066 if (!sync) { 1067 cleanup(); 1068 throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(stdio)); 1069 } 1070 } else { 1071 // Cleanup 1072 cleanup(); 1073 throw new ERR_INVALID_ARG_VALUE('stdio', stdio); 1074 } 1075 1076 return acc; 1077 }, []); 1078 1079 return { stdio, ipc, ipcFd }; 1080} 1081 1082 1083function getSocketList(type, worker, key) { 1084 const sockets = worker[kChannelHandle].sockets[type]; 1085 let socketList = sockets[key]; 1086 if (!socketList) { 1087 const Construct = type === 'send' ? SocketListSend : SocketListReceive; 1088 socketList = sockets[key] = new Construct(worker, key); 1089 } 1090 return socketList; 1091} 1092 1093 1094function maybeClose(subprocess) { 1095 subprocess._closesGot++; 1096 1097 if (subprocess._closesGot === subprocess._closesNeeded) { 1098 subprocess.emit('close', subprocess.exitCode, subprocess.signalCode); 1099 } 1100} 1101 1102function spawnSync(options) { 1103 const result = spawn_sync.spawn(options); 1104 1105 if (result.output && options.encoding && options.encoding !== 'buffer') { 1106 for (let i = 0; i < result.output.length; i++) { 1107 if (!result.output[i]) 1108 continue; 1109 result.output[i] = result.output[i].toString(options.encoding); 1110 } 1111 } 1112 1113 result.stdout = result.output && result.output[1]; 1114 result.stderr = result.output && result.output[2]; 1115 1116 if (result.error) { 1117 result.error = errnoException(result.error, 'spawnSync ' + options.file); 1118 result.error.path = options.file; 1119 result.error.spawnargs = ArrayPrototypeSlice(options.args, 1); 1120 } 1121 1122 return result; 1123} 1124 1125module.exports = { 1126 ChildProcess, 1127 kChannelHandle, 1128 setupChannel, 1129 getValidStdio, 1130 stdioStringToArray, 1131 spawnSync, 1132}; 1133