1// Flags: --expose-internals --no-warnings
2'use strict';
3
4const common = require('../common');
5
6const {
7  ReadableStream,
8  WritableStream,
9  TransformStream,
10} = require('stream/web');
11
12const {
13  Worker
14} = require('worker_threads');
15
16const {
17  isReadableStream,
18  isReadableByteStreamController,
19} = require('internal/webstreams/readablestream');
20
21const {
22  isWritableStream,
23} = require('internal/webstreams/writablestream');
24
25const {
26  isTransformStream,
27} = require('internal/webstreams/transformstream');
28
29const {
30  kState,
31} = require('internal/webstreams/util');
32
33const {
34  makeTransferable,
35  kClone,
36  kTransfer,
37  kDeserialize,
38} = require('internal/worker/js_transferable');
39
40const assert = require('assert');
41
// Shared string payload that the readable, writable and transform stream
// test cases below push through their transferred streams.
const theData = 'hello';
43
{
  // Send a ReadableStream across a MessageChannel twice and read the chunk
  // at the far end. `endpoint` posts the stream, `relay` receives it and
  // immediately posts it back, and `endpoint` finally reads from it.
  //
  // The internal plumbing matters when debugging a failure here:
  //
  // 1. The test begins with ReadableStream R1.
  // 2. endpoint.postMessage creates an internal WritableStream W1 plus a new
  //    ReadableStream R2, linked to each other through MessagePorts P1 and P2.
  // 3. R2 arrives in relay.onmessage as the MessageEvent's data property,
  //    while R1 is set up to pipeTo W1.
  // 4. relay.onmessage posts R2 onward, which creates an internal
  //    WritableStream W2 plus a new ReadableStream R3, linked through
  //    MessagePorts P3 and P4.
  // 5. R3 arrives in endpoint.onmessage as the MessageEvent's data property,
  //    while R2 is set up to pipeTo W2.
  // 6. Attaching a reader to R3 sends a pull request back along
  //    R3 -> P4 -> P3 -> R2 -> P2 -> P1 -> R1, after which the data is pushed
  //    forward along R1 -> W1 -> P1 -> P2 -> R2 -> W2 -> P3 -> P4 -> R3.
  // 7. P1 through P4 are the control channel that carries data and control
  //    instructions, potentially across realms, between the stream instances.
  //
  // A timeout (the test hangs) most likely means the control instructions
  // are broken and the MessagePorts are not being closed properly, or that
  // R1's controller was never closed to signal the end of the data flow.
  const channel = new MessageChannel();
  const relay = channel.port1;
  const endpoint = channel.port2;
  relay.onmessageerror = common.mustNotCall();
  endpoint.onmessageerror = common.mustNotCall();

  const original = new ReadableStream({
    start: common.mustCall((ctrl) => {
      ctrl.enqueue(theData);
      ctrl.close();
    }),
  });

  // Second hop: the stream has been transferred twice; read the one chunk.
  endpoint.onmessage = common.mustCall((event) => {
    const received = event.data;
    assert(isReadableStream(received));

    const pendingRead = received.getReader().read();
    pendingRead.then(common.mustCall((result) => {
      assert.deepStrictEqual(result, { done: false, value: theData });
    }));

    endpoint.close();
  });

  // First hop: forward the freshly received stream straight back.
  relay.onmessage = common.mustCall((event) => {
    const received = event.data;
    assert(isReadableStream(received));
    assert(!received.locked);
    relay.postMessage(received, [received]);
    // Transferring locks the stream that was handed over.
    assert(received.locked);
  });

  // A stream that is not in the transfer list must be rejected.
  const missingTransferable = {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  };
  assert.throws(() => endpoint.postMessage(original), missingTransferable);

  endpoint.postMessage(original, [original]);
  assert(original.locked);
}
114
{
  // Same double transfer as the previous test, but starting from a readable
  // byte stream. A transferred byte stream shows up as a regular
  // value-oriented stream on the receiving side:
  // https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable
  const channel = new MessageChannel();
  const relay = channel.port1;
  const endpoint = channel.port2;
  relay.onmessageerror = common.mustNotCall();
  endpoint.onmessageerror = common.mustNotCall();

  const theByteData = new Uint8Array([1, 2, 3]);

  const original = new ReadableStream({
    type: 'bytes',
    start: common.mustCall((ctrl) => {
      // `enqueue` detaches the buffer of its argument, so hand it a copy and
      // keep `theByteData` intact for the comparison below.
      const copy = theByteData.slice();
      ctrl.enqueue(copy);
      ctrl.close();
    }),
  });
  assert(isReadableByteStreamController(original[kState].controller));

  // Second hop: read the chunk out of the twice-transferred stream.
  endpoint.onmessage = common.mustCall((event) => {
    const received = event.data;
    assert(isReadableStream(received));
    assert(!isReadableByteStreamController(received[kState].controller));

    const pendingRead = received.getReader().read();
    pendingRead.then(common.mustCall((result) => {
      assert.deepStrictEqual(result, { done: false, value: theByteData });
    }));

    endpoint.close();
  });

  // First hop: forward the received stream straight back.
  relay.onmessage = common.mustCall((event) => {
    const received = event.data;
    assert(isReadableStream(received));
    assert(!isReadableByteStreamController(received[kState].controller));
    assert(!received.locked);
    relay.postMessage(received, [received]);
    assert(received.locked);
  });

  // A stream that is not in the transfer list must be rejected.
  const missingTransferable = {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  };
  assert.throws(() => endpoint.postMessage(original), missingTransferable);

  endpoint.postMessage(original, [original]);
  assert(original.locked);
}
164
{
  // Double transfer of a WritableStream. As with the ReadableStream test,
  // each transfer adds a stage to an internal pipeline:
  //
  // The test starts with WritableStream W1 and transfers it. That creates an
  // internal ReadableStream R1 and a WritableStream W2, linked by
  // MessagePorts P1 and P2. relay.onmessage receives W2 and transfers it
  // again, creating an internal ReadableStream R2 and a WritableStream W3,
  // linked by MessagePorts P3 and P4. W3 is what endpoint.onmessage gets.
  //
  // A chunk written in endpoint.onmessage then travels:
  // W3 -> P4 -> P3 -> R2 -> W2 -> P2 -> P1 -> R1 -> W1
  const channel = new MessageChannel();
  const relay = channel.port1;
  const endpoint = channel.port2;
  relay.onmessageerror = common.mustNotCall();
  endpoint.onmessageerror = common.mustNotCall();

  const original = new WritableStream({
    write: common.mustCall((received) => {
      assert.strictEqual(received, theData);
    }),
  });

  // Second hop: write one chunk into the twice-transferred stream.
  endpoint.onmessage = common.mustCall((event) => {
    const sink = event.data;
    assert(isWritableStream(sink));
    assert(!sink.locked);
    const writer = sink.getWriter();
    const pendingWrite = writer.write(theData);
    pendingWrite.then(common.mustCall());
    writer.close();
    endpoint.close();
  });

  // First hop: forward the received stream straight back.
  relay.onmessage = common.mustCall((event) => {
    const sink = event.data;
    assert(isWritableStream(sink));
    assert(!sink.locked);
    relay.postMessage(sink, [sink]);
    assert(sink.locked);
  });

  // A stream that is not in the transfer list must be rejected.
  const missingTransferable = {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  };
  assert.throws(() => endpoint.postMessage(original), missingTransferable);

  endpoint.postMessage(original, [original]);
  assert(original.locked);
}
215
{
  // Double transfer of a TransformStream. The data flow combines the
  // WritableStream and ReadableStream cases above.
  //
  // TransformStream T1 owns ReadableStream R1 and WritableStream W1.
  // Transferring T1 transfers R1 and W1 individually:
  //
  // - R1 creates internal WritableStream W2 and new ReadableStream R2,
  //   linked by MessagePorts P1 and P2.
  // - W1 creates internal ReadableStream R3 and new WritableStream W3,
  //   linked by MessagePorts P3 and P4.
  //
  // A new TransformStream T2 owning R2 and W3 is delivered to
  // relay.onmessage, which immediately transfers it again. That in turn
  // transfers R2 and W3 individually:
  //
  // - R2 creates internal WritableStream W4 and ReadableStream R4,
  //   linked by MessagePorts P5 and P6.
  // - W3 creates internal ReadableStream R5 and WritableStream W5,
  //   linked by MessagePorts P7 and P8.
  //
  // A new TransformStream T3 owning R4 and W5 is delivered to
  // endpoint.onmessage, which writes a chunk. The chunk travels to T1 via:
  //
  // W5 -> P8 -> P7 -> R5 -> W3 -> P4 -> P3 -> R3 -> W1 -> T1
  //
  // T1 transforms it and pushes the result back out via:
  //
  // T1 -> R1 -> W2 -> P1 -> P2 -> R2 -> W4 -> P5 -> P6 -> R4
  const channel = new MessageChannel();
  const relay = channel.port1;
  const endpoint = channel.port2;
  relay.onmessageerror = common.mustNotCall();
  endpoint.onmessageerror = common.mustNotCall();

  const original = new TransformStream({
    transform: (piece, ctrl) => {
      ctrl.enqueue(piece.toUpperCase());
    },
  });

  // Second hop: write through the writable side and expect the upper-cased
  // chunk followed by end-of-stream on the readable side.
  endpoint.onmessage = common.mustCall((event) => {
    const pair = event.data;
    assert(isTransformStream(pair));
    const writer = pair.writable.getWriter();
    const reader = pair.readable.getReader();

    const written = writer.write(theData);
    const closed = writer.close();
    const firstRead = reader.read().then(common.mustCall((outcome) => {
      assert(!outcome.done);
      assert.strictEqual(outcome.value, theData.toUpperCase());
    }));
    const secondRead = reader.read().then(common.mustCall((outcome) => {
      assert(outcome.done);
    }));

    Promise.all([written, closed, firstRead, secondRead])
      .then(common.mustCall());
    endpoint.close();
  });

  // First hop: forward the received transform stream straight back.
  relay.onmessage = common.mustCall((event) => {
    const pair = event.data;
    assert(isTransformStream(pair));
    assert(!pair.readable.locked);
    assert(!pair.writable.locked);
    relay.postMessage(pair, [pair]);
    // Both halves are locked once the transform stream has been transferred.
    assert(pair.readable.locked);
    assert(pair.writable.locked);
  });

  // A stream that is not in the transfer list must be rejected.
  const missingTransferable = {
    code: 'ERR_MISSING_TRANSFERABLE_IN_TRANSFER_LIST',
  };
  assert.throws(() => endpoint.postMessage(original), missingTransferable);

  endpoint.postMessage(original, [original]);
  assert(original.readable.locked);
  assert(original.writable.locked);
}
302
{
  // Enqueue a value that is marked transferable but whose [kClone] hook
  // returns a bogus deserializeInfo. The assertions below expect read() on
  // the transferred stream to reject with a DataCloneError and the original
  // source's cancel() to be called with a DataCloneError as well.
  const { port1, port2 } = new MessageChannel();
  let controller;

  const readable = new ReadableStream({
    // Keep the controller so the chunk can be enqueued after the transfer.
    start(c) { controller = c; },

    cancel: common.mustCall((error) => {
      assert.strictEqual(error.code, 25);
      assert.strictEqual(error.name, 'DataCloneError');
    }),
  });

  // Wrapped in mustCall, like the sibling tests in this file, so that a
  // message that is never delivered fails the test instead of silently
  // skipping the assertions inside the handler.
  port1.onmessage = common.mustCall(({ data }) => {
    const reader = data.getReader();
    // Chain mustCall so that a read() promise that never settles is
    // detected instead of passing unnoticed.
    assert.rejects(reader.read(), {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());
    port1.close();
  });

  port2.postMessage(readable, [readable]);

  const notActuallyTransferable = makeTransferable({
    [kClone]() {
      return {
        data: {},
        deserializeInfo: 'nothing that will work',
      };
    },
    // Deserialization is expected to fail before this hook could run.
    [kDeserialize]: common.mustNotCall(),
  });

  controller.enqueue(notActuallyTransferable);
}
339
{
  // Write a value that is marked transferable but whose [kClone] hook
  // returns a bogus deserializeInfo into a transferred WritableStream. The
  // assertions below expect the write itself to fulfill, the writer's
  // `closed` promise to reject with a DataCloneError, and the original
  // sink's abort() to be called with a DataCloneError.
  const { port1, port2 } = new MessageChannel();

  const source = {
    abort: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    })
  };

  const writable = new WritableStream(source);

  const notActuallyTransferable = makeTransferable({
    [kClone]() {
      return {
        data: {},
        deserializeInfo: 'nothing that will work',
      };
    },
    // Deserialization is expected to fail before this hook could run.
    [kDeserialize]: common.mustNotCall(),
  });

  port1.onmessage = common.mustCall(({ data }) => {
    const writer = data.getWriter();

    // Chain mustCall so that a `closed` promise that never settles is
    // detected instead of passing unnoticed.
    assert.rejects(writer.closed, {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());

    writer.write(notActuallyTransferable).then(common.mustCall());

    port1.close();
  });

  port2.postMessage(writable, [writable]);
}
379
{
  // Abort a transferred WritableStream with an ordinary Error. The original
  // sink's abort() must receive a reason that is deeply equal to, but not
  // the same object as, the error passed to writer.abort().
  const error = new Error('boom');
  const { port1, port2 } = new MessageChannel();

  const source = {
    abort: common.mustCall((reason) => {
      process.nextTick(() => {
        assert.deepStrictEqual(reason, error);

        // Reason is a clone of the original error.
        assert.notStrictEqual(reason, error);
      });
    }),
  };

  const writable = new WritableStream(source);

  port1.onmessage = common.mustCall(({ data }) => {
    const writer = data.getWriter();

    // Chain mustCall so that a `closed` promise that never settles is
    // detected instead of passing unnoticed.
    assert.rejects(writer.closed, error).then(common.mustCall());

    writer.abort(error).then(common.mustCall());
    port1.close();
  });

  port2.postMessage(writable, [writable]);
}
408
{
  // Abort a transferred WritableStream with a WebAssembly.Memory as the
  // reason. The assertions below expect writer.abort() to reject with a
  // DataCloneError and the original sink's abort() to be called with a
  // DataCloneError.
  const { port1, port2 } = new MessageChannel();

  const source = {
    abort: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    })
  };

  const writable = new WritableStream(source);

  port1.onmessage = common.mustCall(({ data }) => {
    const writer = data.getWriter();

    const m = new WebAssembly.Memory({ initial: 1 });

    // Chain mustCall so that an abort() promise that never settles is
    // detected instead of passing unnoticed.
    assert.rejects(writer.abort(m), {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());
    port1.close();
  });

  port2.postMessage(writable, [writable]);
}
437
{
  // Transfer a ReadableStream to a worker thread and have the worker read
  // one Uint8Array chunk from it.

  // Source evaluated inside the worker. It tracks its own expected calls
  // with assert.CallTracker and verifies them when the worker exits.
  const workerSource = `
    const {
      isReadableStream,
    } = require('internal/webstreams/readablestream');

    const {
      parentPort,
    } = require('worker_threads');

    const assert = require('assert');

    const tracker = new assert.CallTracker();
    process.on('exit', () => {
      tracker.verify();
    });

    parentPort.onmessage = tracker.calls(({ data }) => {
      assert(isReadableStream(data));
      const reader = data.getReader();
      reader.read().then(tracker.calls((result) => {
        assert(!result.done);
        assert(result.value instanceof Uint8Array);
      }));
      parentPort.close();
    });
    parentPort.onmessageerror = () => assert.fail('should not be called');
  `;

  const worker = new Worker(workerSource, { eval: true });

  // Any error raised inside the worker fails the test.
  worker.on('error', common.mustNotCall());

  const outbound = new ReadableStream({
    start: (ctrl) => {
      ctrl.enqueue(new Uint8Array(10));
      ctrl.close();
    },
  });

  worker.postMessage(outbound, [outbound]);
}
480
{
  // Cancelling a transferred ReadableStream must invoke cancel() on the
  // original underlying source, and the cancel() promise must fulfill.
  const original = new ReadableStream({
    cancel: common.mustCall(),
  });

  const channel = new MessageChannel();
  const receiver = channel.port1;
  const sender = channel.port2;

  receiver.onmessage = common.mustCall((event) => {
    const received = event.data;
    const cancelled = received.cancel();
    cancelled.then(common.mustCall());
    receiver.close();
  });

  sender.postMessage(original, [original]);
}
497
{
  // Cancel a transferred ReadableStream with a WebAssembly.Memory as the
  // reason. The assertions below expect reader.cancel() to reject with a
  // DataCloneError, reader.closed to fulfill, and the original source's
  // cancel() to be called with a DataCloneError.
  const source = {
    cancel: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    }),
  };

  const readable = new ReadableStream(source);

  const { port1, port2 } = new MessageChannel();

  port1.onmessage = common.mustCall(({ data }) => {
    const m = new WebAssembly.Memory({ initial: 1 });

    const reader = data.getReader();

    const cancel = reader.cancel(m);

    reader.closed.then(common.mustCall());

    // Chain mustCall so that a cancel() promise that never settles is
    // detected instead of passing unnoticed.
    assert.rejects(cancel, {
      code: 25,
      name: 'DataCloneError',
    }).then(common.mustCall());

    port1.close();
  });

  port2.postMessage(readable, [readable]);
}
531
{
  // Write a WebAssembly.Memory into a transferred WritableStream. The
  // assertions below expect writer.write() to reject with a DataCloneError
  // and the original sink's abort() to be called with a DataCloneError.
  const source = {
    abort: common.mustCall((error) => {
      process.nextTick(() => {
        assert.strictEqual(error.code, 25);
        assert.strictEqual(error.name, 'DataCloneError');
      });
    }),
  };

  const writable = new WritableStream(source);

  const { port1, port2 } = new MessageChannel();

  port1.onmessage = common.mustCall(({ data }) => {
    const m = new WebAssembly.Memory({ initial: 1 });
    const writer = data.getWriter();
    const write = writer.write(m);
    // Chain mustCall so that a write() promise that never settles is
    // detected instead of passing unnoticed.
    assert.rejects(write, { code: 25, name: 'DataCloneError' })
      .then(common.mustCall());
    port1.close();
  });

  port2.postMessage(writable, [writable]);
}
556
{
  // Calling [kTransfer]() on a stream that already has a reader or writer
  // attached must throw a DataCloneError.
  const dataCloneError = {
    code: 25,
    name: 'DataCloneError',
  };

  const lockedReadable = new ReadableStream();
  lockedReadable.getReader();
  assert.throws(() => lockedReadable[kTransfer](), dataCloneError);

  const lockedWritable = new WritableStream();
  lockedWritable.getWriter();
  assert.throws(() => lockedWritable[kTransfer](), dataCloneError);
}
572