// Flags: --expose-internals --no-warnings
'use strict';

// Exercises transferring WHATWG ReadableStream, WritableStream and
// TransformStream instances through MessagePorts (and to a Worker),
// including the error paths where a chunk or reason cannot be cloned.

const common = require('../common');

const {
  ReadableStream,
  WritableStream,
  TransformStream,
} = require('stream/web');

const {
  Worker
} = require('worker_threads');

const {
  isReadableStream,
  isReadableByteStreamController,
} = require('internal/webstreams/readablestream');

const {
  isWritableStream,
} = require('internal/webstreams/writablestream');

const {
  isTransformStream,
} = require('internal/webstreams/transformstream');

const {
  kState,
} = require('internal/webstreams/util');

const {
  makeTransferable,
  kClone,
  kTransfer,
  kDeserialize,
} = require('internal/worker/js_transferable');

const assert = require('assert');

const theData = 'hello';

{
  const { port1, port2 } = new MessageChannel();
  port1.onmessageerror = common.mustNotCall();
  port2.onmessageerror = common.mustNotCall();

  // This test takes the ReadableStream and transfers it to the
  // port1 first, then again to port2, which reads the data.
  // Internally, this sets up a pipelined data flow that is
  // important to understand in case this test fails.
  //
  // Specifically:
  //
  // 1. We start with ReadableStream R1,
  // 2. Calling port2.postMessage causes a new internal WritableStream W1
  //    and a new ReadableStream R2 to be created, both of which are coupled
  //    to each other via a pair of MessagePorts P1 and P2.
  // 3. ReadableStream R2 is passed to the port1.onmessage callback as the
  //    data property of the MessageEvent, and R1 is configured to pipeTo W1.
  // 4. Within port1.onmessage, we transfer ReadableStream R2 on to port2
  //    (via port1.postMessage), which creates a new internal WritableStream
  //    W2 and a new ReadableStream R3, both of which are coupled to each
  //    other via a pair of MessagePorts P3 and P4.
  // 5. ReadableStream R3 is passed to the port2.onmessage callback as the
  //    data property of the MessageEvent, and R2 is configured to pipeTo W2.
  // 6. Once the reader is attached to R3 in the port2.onmessage callback,
  //    a message is sent along the path: R3 -> P4 -> P3 -> R2 -> P2 -> P1 -> R1
  //    to begin pulling the data. The data is then pushed along the pipeline
  //    R1 -> W1 -> P1 -> P2 -> R2 -> W2 -> P3 -> P4 -> R3
  // 7. The MessagePorts P1, P2, P3, and P4 serve as a control channel for
  //    passing data and control instructions, potentially across realms,
  //    to the other ReadableStream and WritableStream instances.
  //
  // If this test experiences timeouts (hangs without finishing), it's most
  // likely because the control instructions are somehow broken and the
  // MessagePorts are not being closed properly or it could be caused by
  // failing to close R1's controller which signals the end of the data
  // flow.

  const readable = new ReadableStream({
    start: common.mustCall((controller) => {
      controller.enqueue(theData);
      controller.close();
    }),
  });

  port2.onmessage = common.mustCall(({ data }) => {
    assert(isReadableStream(data));

    const reader = data.getReader();
    reader.read().then(common.mustCall((chunk) => {
      assert.deepStrictEqual(chunk, { done: false, value: theData });
    }));

    port2.close();
  });

  port1.onmessage = common.mustCall(({ data }) => {
    assert(isReadableStream(data));
    assert(!data.locked);
    port1.postMessage(data, [data]);
    assert(data.locked);
  });

  // Posting the stream without listing it in the transfer list must throw.
  assert.throws(() => port2.postMessage(readable), {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  });

  port2.postMessage(readable, [readable]);
  assert(readable.locked);
}

{
  const { port1, port2 } = new MessageChannel();
  port1.onmessageerror = common.mustNotCall();
  port2.onmessageerror = common.mustNotCall();

  // This test repeats the test above, but with a readable byte stream.
  // Note transferring a readable byte stream results in a regular
  // value-oriented stream on the other side:
  // https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable

  const theByteData = new Uint8Array([1, 2, 3]);

  const readable = new ReadableStream({
    type: 'bytes',
    start: common.mustCall((controller) => {
      // `enqueue` will detach its argument's buffer, so clone first
      controller.enqueue(theByteData.slice());
      controller.close();
    }),
  });
  assert(isReadableByteStreamController(readable[kState].controller));

  port2.onmessage = common.mustCall(({ data }) => {
    assert(isReadableStream(data));
    assert(!isReadableByteStreamController(data[kState].controller));

    const reader = data.getReader();
    reader.read().then(common.mustCall((chunk) => {
      assert.deepStrictEqual(chunk, { done: false, value: theByteData });
    }));

    port2.close();
  });

  port1.onmessage = common.mustCall(({ data }) => {
    assert(isReadableStream(data));
    assert(!isReadableByteStreamController(data[kState].controller));
    assert(!data.locked);
    port1.postMessage(data, [data]);
    assert(data.locked);
  });

  assert.throws(() => port2.postMessage(readable), {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  });

  port2.postMessage(readable, [readable]);
  assert(readable.locked);
}

{
  const { port1, port2 } = new MessageChannel();
  port1.onmessageerror = common.mustNotCall();
  port2.onmessageerror = common.mustNotCall();

  // Like the ReadableStream test above, this sets up a pipeline
  // through which the data flows...
  //
  // We start with WritableStream W1, which is transferred to port1.
  // Doing so creates an internal ReadableStream R1 and WritableStream W2,
  // which are coupled together with MessagePorts P1 and P2.
  // The port1.onmessage callback receives WritableStream W2 and
  // immediately transfers that to port2. Doing so creates an internal
  // ReadableStream R2 and WritableStream W3, which are coupled together
  // with MessagePorts P3 and P4. WritableStream W3 is handed off to
  // port2.onmessage.
  //
  // When the writer on port2.onmessage writes the chunk of data, it
  // gets passed along the pipeline:
  // W3 -> P4 -> P3 -> R2 -> W2 -> P2 -> P1 -> R1 -> W1

  const writable = new WritableStream({
    write: common.mustCall((chunk) => {
      assert.strictEqual(chunk, theData);
    }),
  });

  port2.onmessage = common.mustCall(({ data }) => {
    assert(isWritableStream(data));
    assert(!data.locked);
    const writer = data.getWriter();
    writer.write(theData).then(common.mustCall());
    writer.close();
    port2.close();
  });

  port1.onmessage = common.mustCall(({ data }) => {
    assert(isWritableStream(data));
    assert(!data.locked);
    port1.postMessage(data, [data]);
    assert(data.locked);
  });

  assert.throws(() => port2.postMessage(writable), {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  });

  port2.postMessage(writable, [writable]);
  assert(writable.locked);
}

{
  const { port1, port2 } = new MessageChannel();
  port1.onmessageerror = common.mustNotCall();
  port2.onmessageerror = common.mustNotCall();

  // The data flow here is actually quite complicated, and is a combination
  // of the WritableStream and ReadableStream examples above.
  //
  // We start with TransformStream T1, which creates ReadableStream R1,
  // and WritableStream W1.
  //
  // When T1 is transferred to port1.onmessage, R1 and W1 are individually
  // transferred.
  //
  // When R1 is transferred, it creates internal WritableStream W2, and
  // new ReadableStream R2, coupled together via MessagePorts P1 and P2.
  //
  // When W1 is transferred, it creates internal ReadableStream R3 and
  // new WritableStream W3, coupled together via MessagePorts P3 and P4.
  //
  // A new TransformStream T2 is created that owns ReadableStream R2 and
  // WritableStream W3. The port1.onmessage callback immediately transfers
  // that to port2.onmessage.
  //
  // When T2 is transferred, R2 and W3 are individually transferred.
  //
  // When R2 is transferred, it creates internal WritableStream W4, and
  // ReadableStream R4, coupled together via MessagePorts P5 and P6.
  //
  // When W3 is transferred, it creates internal ReadableStream R5, and
  // WritableStream W5, coupled together via MessagePorts P7 and P8.
  //
  // A new TransformStream T3 is created that owns ReadableStream R4 and
  // WritableStream W5.
  //
  // port2.onmessage then writes a chunk of data. That chunk of data
  // flows through the pipeline to T1:
  //
  // W5 -> P8 -> P7 -> R5 -> W3 -> P4 -> P3 -> R3 -> W1 -> T1
  //
  // T1 performs the transformation, then pushes the chunk back out
  // along the pipeline:
  //
  // T1 -> R1 -> W2 -> P1 -> P2 -> R2 -> W4 -> P5 -> P6 -> R4

  const transform = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    }
  });

  port2.onmessage = common.mustCall(({ data }) => {
    assert(isTransformStream(data));
    const writer = data.writable.getWriter();
    const reader = data.readable.getReader();
    Promise.all([
      writer.write(theData),
      writer.close(),
      reader.read().then(common.mustCall((result) => {
        assert(!result.done);
        assert.strictEqual(result.value, theData.toUpperCase());
      })),
      reader.read().then(common.mustCall((result) => {
        assert(result.done);
      })),
    ]).then(common.mustCall());
    port2.close();
  });

  port1.onmessage = common.mustCall(({ data }) => {
    assert(isTransformStream(data));
    assert(!data.readable.locked);
    assert(!data.writable.locked);
    port1.postMessage(data, [data]);
    assert(data.readable.locked);
    assert(data.writable.locked);
  });

  assert.throws(() => port2.postMessage(transform), {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  });

  port2.postMessage(transform, [transform]);
  assert(transform.readable.locked);
  assert(transform.writable.locked);
}

{
  // Enqueue a chunk whose clone data carries an unusable deserializeInfo
  // into a transferred ReadableStream: the receiving reader must see a
  // DataCloneError and the original source must be cancelled with one.
  const { port1, port2 } = new MessageChannel();
  let controller;

  const readable = new ReadableStream({
    start(c) { controller = c; },

    cancel: common.mustCall((error) => {
      assert.strictEqual(error.code, 25);
      assert.strictEqual(error.name, 'DataCloneError');
    }),
  });

  port1.onmessage = common.mustCall(({ data }) => {
    const reader = data.getReader();
    // Chain mustCall so the test fails if the read never settles.
    assert.rejects(reader.read(), {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());
    port1.close();
  });

  port2.postMessage(readable, [readable]);

  const notActuallyTransferable = makeTransferable({
    [kClone]() {
      return {
        data: {},
        deserializeInfo: 'nothing that will work',
      };
    },
    [kDeserialize]: common.mustNotCall(),
  });

  controller.enqueue(notActuallyTransferable);
}

{
  // Same undeserializable chunk, but written into a transferred
  // WritableStream: the write itself resolves, the writer's closed promise
  // rejects with a DataCloneError, and the original sink is aborted.
  const { port1, port2 } = new MessageChannel();

  const source = {
    abort: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    })
  };

  const writable = new WritableStream(source);

  const notActuallyTransferable = makeTransferable({
    [kClone]() {
      return {
        data: {},
        deserializeInfo: 'nothing that will work',
      };
    },
    [kDeserialize]: common.mustNotCall(),
  });

  port1.onmessage = common.mustCall(({ data }) => {
    const writer = data.getWriter();

    assert.rejects(writer.closed, {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());

    writer.write(notActuallyTransferable).then(common.mustCall());

    port1.close();
  });

  port2.postMessage(writable, [writable]);
}

{
  // Aborting a transferred WritableStream with a cloneable reason forwards
  // a clone of that reason to the original sink's abort().
  const error = new Error('boom');
  const { port1, port2 } = new MessageChannel();

  const source = {
    abort: common.mustCall((reason) => {
      process.nextTick(() => {
        assert.deepStrictEqual(reason, error);

        // Reason is a clone of the original error.
        assert.notStrictEqual(reason, error);
      });
    }),
  };

  const writable = new WritableStream(source);

  port1.onmessage = common.mustCall(({ data }) => {
    const writer = data.getWriter();

    assert.rejects(writer.closed, error).then(common.mustCall());

    writer.abort(error).then(common.mustCall());
    port1.close();
  });

  port2.postMessage(writable, [writable]);
}

{
  // Aborting a transferred WritableStream with a WebAssembly.Memory reason
  // must reject with a DataCloneError and abort the original sink with one.
  const { port1, port2 } = new MessageChannel();

  const source = {
    abort: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    })
  };

  const writable = new WritableStream(source);

  port1.onmessage = common.mustCall(({ data }) => {
    const writer = data.getWriter();

    const m = new WebAssembly.Memory({ initial: 1 });

    assert.rejects(writer.abort(m), {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());
    port1.close();
  });

  port2.postMessage(writable, [writable]);
}

{
  // Verify that the communication works across worker threads...

  const worker = new Worker(`
    const {
      isReadableStream,
    } = require('internal/webstreams/readablestream');

    const {
      parentPort,
    } = require('worker_threads');

    const assert = require('assert');

    const tracker = new assert.CallTracker();
    process.on('exit', () => {
      tracker.verify();
    });

    parentPort.onmessage = tracker.calls(({ data }) => {
      assert(isReadableStream(data));
      const reader = data.getReader();
      reader.read().then(tracker.calls((result) => {
        assert(!result.done);
        assert(result.value instanceof Uint8Array);
      }));
      parentPort.close();
    });
    parentPort.onmessageerror = () => assert.fail('should not be called');
  `, { eval: true });

  worker.on('error', common.mustNotCall());

  const readable = new ReadableStream({
    start(controller) {
      controller.enqueue(new Uint8Array(10));
      controller.close();
    }
  });

  worker.postMessage(readable, [readable]);
}

{
  // Cancelling a transferred ReadableStream must reach the original
  // source's cancel().
  const source = {
    cancel: common.mustCall(),
  };

  const readable = new ReadableStream(source);

  const { port1, port2 } = new MessageChannel();

  port1.onmessage = common.mustCall(({ data }) => {
    data.cancel().then(common.mustCall());
    port1.close();
  });

  port2.postMessage(readable, [readable]);
}

{
  // Cancelling a transferred ReadableStream with a WebAssembly.Memory reason
  // must reject the cancel promise with a DataCloneError while still closing
  // the reader and cancelling the original source with a DataCloneError.
  const source = {
    cancel: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    }),
  };

  const readable = new ReadableStream(source);

  const { port1, port2 } = new MessageChannel();

  port1.onmessage = common.mustCall(({ data }) => {
    const m = new WebAssembly.Memory({ initial: 1 });

    const reader = data.getReader();

    const cancel = reader.cancel(m);

    reader.closed.then(common.mustCall());

    assert.rejects(cancel, {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());

    port1.close();
  });

  port2.postMessage(readable, [readable]);
}

{
  // Writing a WebAssembly.Memory chunk to a transferred WritableStream must
  // reject with a DataCloneError and abort the original sink with one.
  const source = {
    abort: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    }),
  };

  const writable = new WritableStream(source);

  const { port1, port2 } = new MessageChannel();

  port1.onmessage = common.mustCall(({ data }) => {
    const m = new WebAssembly.Memory({ initial: 1 });
    const writer = data.getWriter();
    const write = writer.write(m);
    assert.rejects(write, { code: 25, name: 'DataCloneError' })
      .then(common.mustCall());
    port1.close();
  });

  port2.postMessage(writable, [writable]);
}

{
  // A stream that already has a reader/writer attached must refuse to be
  // transferred.
  const readable = new ReadableStream();
  readable.getReader();
  assert.throws(() => readable[kTransfer](), {
    code: 25,
    name: 'DataCloneError',
  });

  const writable = new WritableStream();
  writable.getWriter();
  assert.throws(() => writable[kTransfer](), {
    code: 25,
    name: 'DataCloneError',
  });
}