11cb0ef41Sopenharmony_ci// Flags: --expose-internals --no-warnings
21cb0ef41Sopenharmony_ci'use strict';
31cb0ef41Sopenharmony_ci
41cb0ef41Sopenharmony_ciconst common = require('../common');
51cb0ef41Sopenharmony_ci
61cb0ef41Sopenharmony_ciconst assert = require('assert');
71cb0ef41Sopenharmony_ci
81cb0ef41Sopenharmony_ciconst {
91cb0ef41Sopenharmony_ci  pipeline,
101cb0ef41Sopenharmony_ci  finished,
111cb0ef41Sopenharmony_ci  Writable,
121cb0ef41Sopenharmony_ci} = require('stream');
131cb0ef41Sopenharmony_ci
141cb0ef41Sopenharmony_ciconst {
151cb0ef41Sopenharmony_ci  ReadableStream,
161cb0ef41Sopenharmony_ci  WritableStream,
171cb0ef41Sopenharmony_ci} = require('stream/web');
181cb0ef41Sopenharmony_ci
191cb0ef41Sopenharmony_ciconst {
201cb0ef41Sopenharmony_ci  newStreamReadableFromReadableStream,
211cb0ef41Sopenharmony_ci} = require('internal/webstreams/adapters');
221cb0ef41Sopenharmony_ci
231cb0ef41Sopenharmony_ciconst {
241cb0ef41Sopenharmony_ci  kState,
251cb0ef41Sopenharmony_ci} = require('internal/webstreams/util');
261cb0ef41Sopenharmony_ci
271cb0ef41Sopenharmony_ciclass MySource {
281cb0ef41Sopenharmony_ci  constructor(value = new Uint8Array(10)) {
291cb0ef41Sopenharmony_ci    this.value = value;
301cb0ef41Sopenharmony_ci  }
311cb0ef41Sopenharmony_ci
321cb0ef41Sopenharmony_ci  start(c) {
331cb0ef41Sopenharmony_ci    this.started = true;
341cb0ef41Sopenharmony_ci    this.controller = c;
351cb0ef41Sopenharmony_ci  }
361cb0ef41Sopenharmony_ci
371cb0ef41Sopenharmony_ci  pull(controller) {
381cb0ef41Sopenharmony_ci    controller.enqueue(this.value);
391cb0ef41Sopenharmony_ci    controller.close();
401cb0ef41Sopenharmony_ci  }
411cb0ef41Sopenharmony_ci
421cb0ef41Sopenharmony_ci  cancel(reason) {
431cb0ef41Sopenharmony_ci    this.canceled = true;
441cb0ef41Sopenharmony_ci    this.cancelReason = reason;
451cb0ef41Sopenharmony_ci  }
461cb0ef41Sopenharmony_ci}
471cb0ef41Sopenharmony_ci
481cb0ef41Sopenharmony_ci{
491cb0ef41Sopenharmony_ci  // Destroying the readable without an error closes
501cb0ef41Sopenharmony_ci  // the readableStream.
511cb0ef41Sopenharmony_ci
521cb0ef41Sopenharmony_ci  const readableStream = new ReadableStream();
531cb0ef41Sopenharmony_ci  const readable = newStreamReadableFromReadableStream(readableStream);
541cb0ef41Sopenharmony_ci
551cb0ef41Sopenharmony_ci  assert(readableStream.locked);
561cb0ef41Sopenharmony_ci
571cb0ef41Sopenharmony_ci  assert.rejects(readableStream.cancel(), {
581cb0ef41Sopenharmony_ci    code: 'ERR_INVALID_STATE',
591cb0ef41Sopenharmony_ci  });
601cb0ef41Sopenharmony_ci  assert.rejects(readableStream.pipeTo(new WritableStream()), {
611cb0ef41Sopenharmony_ci    code: 'ERR_INVALID_STATE',
621cb0ef41Sopenharmony_ci  });
631cb0ef41Sopenharmony_ci  assert.throws(() => readableStream.tee(), {
641cb0ef41Sopenharmony_ci    code: 'ERR_INVALID_STATE',
651cb0ef41Sopenharmony_ci  });
661cb0ef41Sopenharmony_ci  assert.throws(() => readableStream.getReader(), {
671cb0ef41Sopenharmony_ci    code: 'ERR_INVALID_STATE',
681cb0ef41Sopenharmony_ci  });
691cb0ef41Sopenharmony_ci  assert.throws(() => {
701cb0ef41Sopenharmony_ci    readableStream.pipeThrough({
711cb0ef41Sopenharmony_ci      readable: new ReadableStream(),
721cb0ef41Sopenharmony_ci      writable: new WritableStream(),
731cb0ef41Sopenharmony_ci    });
741cb0ef41Sopenharmony_ci  }, {
751cb0ef41Sopenharmony_ci    code: 'ERR_INVALID_STATE',
761cb0ef41Sopenharmony_ci  });
771cb0ef41Sopenharmony_ci
781cb0ef41Sopenharmony_ci  readable.destroy();
791cb0ef41Sopenharmony_ci
801cb0ef41Sopenharmony_ci  readable.on('close', common.mustCall(() => {
811cb0ef41Sopenharmony_ci    assert.strictEqual(readableStream[kState].state, 'closed');
821cb0ef41Sopenharmony_ci  }));
831cb0ef41Sopenharmony_ci}
841cb0ef41Sopenharmony_ci
851cb0ef41Sopenharmony_ci{
861cb0ef41Sopenharmony_ci  // Destroying the readable with an error closes the readableStream
871cb0ef41Sopenharmony_ci  // without error but records the cancel reason in the source.
881cb0ef41Sopenharmony_ci  const error = new Error('boom');
891cb0ef41Sopenharmony_ci  const source = new MySource();
901cb0ef41Sopenharmony_ci  const readableStream = new ReadableStream(source);
911cb0ef41Sopenharmony_ci  const readable = newStreamReadableFromReadableStream(readableStream);
921cb0ef41Sopenharmony_ci
931cb0ef41Sopenharmony_ci  assert(readableStream.locked);
941cb0ef41Sopenharmony_ci
951cb0ef41Sopenharmony_ci  readable.destroy(error);
961cb0ef41Sopenharmony_ci
971cb0ef41Sopenharmony_ci  readable.on('error', common.mustCall((reason) => {
981cb0ef41Sopenharmony_ci    assert.strictEqual(reason, error);
991cb0ef41Sopenharmony_ci  }));
1001cb0ef41Sopenharmony_ci
1011cb0ef41Sopenharmony_ci  readable.on('close', common.mustCall(() => {
1021cb0ef41Sopenharmony_ci    assert.strictEqual(readableStream[kState].state, 'closed');
1031cb0ef41Sopenharmony_ci    assert.strictEqual(source.cancelReason, error);
1041cb0ef41Sopenharmony_ci  }));
1051cb0ef41Sopenharmony_ci}
1061cb0ef41Sopenharmony_ci
1071cb0ef41Sopenharmony_ci{
1081cb0ef41Sopenharmony_ci  // An error in the source causes the readable to error.
1091cb0ef41Sopenharmony_ci  const error = new Error('boom');
1101cb0ef41Sopenharmony_ci  const source = new MySource();
1111cb0ef41Sopenharmony_ci  const readableStream = new ReadableStream(source);
1121cb0ef41Sopenharmony_ci  const readable = newStreamReadableFromReadableStream(readableStream);
1131cb0ef41Sopenharmony_ci
1141cb0ef41Sopenharmony_ci  assert(readableStream.locked);
1151cb0ef41Sopenharmony_ci
1161cb0ef41Sopenharmony_ci  source.controller.error(error);
1171cb0ef41Sopenharmony_ci
1181cb0ef41Sopenharmony_ci  readable.on('error', common.mustCall((reason) => {
1191cb0ef41Sopenharmony_ci    assert.strictEqual(reason, error);
1201cb0ef41Sopenharmony_ci  }));
1211cb0ef41Sopenharmony_ci
1221cb0ef41Sopenharmony_ci  readable.on('close', common.mustCall(() => {
1231cb0ef41Sopenharmony_ci    assert.strictEqual(readableStream[kState].state, 'errored');
1241cb0ef41Sopenharmony_ci  }));
1251cb0ef41Sopenharmony_ci}
1261cb0ef41Sopenharmony_ci
1271cb0ef41Sopenharmony_ci{
1281cb0ef41Sopenharmony_ci  const readableStream = new ReadableStream(new MySource());
1291cb0ef41Sopenharmony_ci  const readable = newStreamReadableFromReadableStream(readableStream);
1301cb0ef41Sopenharmony_ci
1311cb0ef41Sopenharmony_ci  readable.on('data', common.mustCall((chunk) => {
1321cb0ef41Sopenharmony_ci    assert.deepStrictEqual(chunk, Buffer.alloc(10));
1331cb0ef41Sopenharmony_ci  }));
1341cb0ef41Sopenharmony_ci  readable.on('end', common.mustCall());
1351cb0ef41Sopenharmony_ci  readable.on('close', common.mustCall());
1361cb0ef41Sopenharmony_ci  readable.on('error', common.mustNotCall());
1371cb0ef41Sopenharmony_ci}
1381cb0ef41Sopenharmony_ci
1391cb0ef41Sopenharmony_ci{
1401cb0ef41Sopenharmony_ci  const readableStream = new ReadableStream(new MySource('hello'));
1411cb0ef41Sopenharmony_ci  const readable = newStreamReadableFromReadableStream(readableStream, {
1421cb0ef41Sopenharmony_ci    encoding: 'utf8',
1431cb0ef41Sopenharmony_ci  });
1441cb0ef41Sopenharmony_ci
1451cb0ef41Sopenharmony_ci  readable.on('data', common.mustCall((chunk) => {
1461cb0ef41Sopenharmony_ci    assert.strictEqual(chunk, 'hello');
1471cb0ef41Sopenharmony_ci  }));
1481cb0ef41Sopenharmony_ci  readable.on('end', common.mustCall());
1491cb0ef41Sopenharmony_ci  readable.on('close', common.mustCall());
1501cb0ef41Sopenharmony_ci  readable.on('error', common.mustNotCall());
1511cb0ef41Sopenharmony_ci}
1521cb0ef41Sopenharmony_ci
1531cb0ef41Sopenharmony_ci{
1541cb0ef41Sopenharmony_ci  const readableStream = new ReadableStream(new MySource());
1551cb0ef41Sopenharmony_ci  const readable = newStreamReadableFromReadableStream(readableStream, {
1561cb0ef41Sopenharmony_ci    objectMode: true
1571cb0ef41Sopenharmony_ci  });
1581cb0ef41Sopenharmony_ci
1591cb0ef41Sopenharmony_ci  readable.on('data', common.mustCall((chunk) => {
1601cb0ef41Sopenharmony_ci    assert.deepStrictEqual(chunk, new Uint8Array(10));
1611cb0ef41Sopenharmony_ci  }));
1621cb0ef41Sopenharmony_ci  readable.on('end', common.mustCall());
1631cb0ef41Sopenharmony_ci  readable.on('close', common.mustCall());
1641cb0ef41Sopenharmony_ci  readable.on('error', common.mustNotCall());
1651cb0ef41Sopenharmony_ci}
1661cb0ef41Sopenharmony_ci
1671cb0ef41Sopenharmony_ci{
1681cb0ef41Sopenharmony_ci  const ec = new TextEncoder();
1691cb0ef41Sopenharmony_ci  const readable = new ReadableStream({
1701cb0ef41Sopenharmony_ci    start(controller) {
1711cb0ef41Sopenharmony_ci      controller.enqueue(ec.encode('hello'));
1721cb0ef41Sopenharmony_ci      setImmediate(() => {
1731cb0ef41Sopenharmony_ci        controller.enqueue(ec.encode('there'));
1741cb0ef41Sopenharmony_ci        controller.close();
1751cb0ef41Sopenharmony_ci      });
1761cb0ef41Sopenharmony_ci    }
1771cb0ef41Sopenharmony_ci  });
1781cb0ef41Sopenharmony_ci  const streamReadable = newStreamReadableFromReadableStream(readable);
1791cb0ef41Sopenharmony_ci
1801cb0ef41Sopenharmony_ci  finished(streamReadable, common.mustCall());
1811cb0ef41Sopenharmony_ci
1821cb0ef41Sopenharmony_ci  streamReadable.resume();
1831cb0ef41Sopenharmony_ci}
1841cb0ef41Sopenharmony_ci
1851cb0ef41Sopenharmony_ci{
1861cb0ef41Sopenharmony_ci  const ec = new TextEncoder();
1871cb0ef41Sopenharmony_ci  const readable = new ReadableStream({
1881cb0ef41Sopenharmony_ci    start(controller) {
1891cb0ef41Sopenharmony_ci      controller.enqueue(ec.encode('hello'));
1901cb0ef41Sopenharmony_ci      setImmediate(() => {
1911cb0ef41Sopenharmony_ci        controller.enqueue(ec.encode('there'));
1921cb0ef41Sopenharmony_ci        controller.close();
1931cb0ef41Sopenharmony_ci      });
1941cb0ef41Sopenharmony_ci    }
1951cb0ef41Sopenharmony_ci  });
1961cb0ef41Sopenharmony_ci  const streamReadable = newStreamReadableFromReadableStream(readable);
1971cb0ef41Sopenharmony_ci
1981cb0ef41Sopenharmony_ci  finished(streamReadable, common.mustCall());
1991cb0ef41Sopenharmony_ci
2001cb0ef41Sopenharmony_ci  streamReadable.resume();
2011cb0ef41Sopenharmony_ci}
2021cb0ef41Sopenharmony_ci
2031cb0ef41Sopenharmony_ci{
2041cb0ef41Sopenharmony_ci  const ec = new TextEncoder();
2051cb0ef41Sopenharmony_ci  const dc = new TextDecoder();
2061cb0ef41Sopenharmony_ci  const check = ['hello', 'there'];
2071cb0ef41Sopenharmony_ci  const readable = new ReadableStream({
2081cb0ef41Sopenharmony_ci    start(controller) {
2091cb0ef41Sopenharmony_ci      controller.enqueue(ec.encode('hello'));
2101cb0ef41Sopenharmony_ci      setImmediate(() => {
2111cb0ef41Sopenharmony_ci        controller.enqueue(ec.encode('there'));
2121cb0ef41Sopenharmony_ci        controller.close();
2131cb0ef41Sopenharmony_ci      });
2141cb0ef41Sopenharmony_ci    }
2151cb0ef41Sopenharmony_ci  });
2161cb0ef41Sopenharmony_ci  const writable = new Writable({
2171cb0ef41Sopenharmony_ci    write: common.mustCall((chunk, encoding, callback) => {
2181cb0ef41Sopenharmony_ci      assert.strictEqual(dc.decode(chunk), check.shift());
2191cb0ef41Sopenharmony_ci      assert.strictEqual(encoding, 'buffer');
2201cb0ef41Sopenharmony_ci      callback();
2211cb0ef41Sopenharmony_ci    }, 2),
2221cb0ef41Sopenharmony_ci  });
2231cb0ef41Sopenharmony_ci
2241cb0ef41Sopenharmony_ci  const streamReadable = newStreamReadableFromReadableStream(readable);
2251cb0ef41Sopenharmony_ci
2261cb0ef41Sopenharmony_ci  pipeline(streamReadable, writable, common.mustCall());
2271cb0ef41Sopenharmony_ci
2281cb0ef41Sopenharmony_ci  streamReadable.resume();
2291cb0ef41Sopenharmony_ci}
230