1// Flags: --no-warnings --expose-internals
2'use strict';
3
4const common = require('../common');
5
6const assert = require('assert');
7
8const {
9  WritableStream,
10} = require('stream/web');
11
12const {
13  newStreamWritableFromWritableStream,
14} = require('internal/webstreams/adapters');
15
16const {
17  finished,
18  pipeline,
19  Readable,
20} = require('stream');
21
22const {
23  kState,
24} = require('internal/webstreams/util');
25
26class TestSource {
27  constructor() {
28    this.chunks = [];
29  }
30
31  start(c) {
32    this.controller = c;
33    this.started = true;
34  }
35
36  write(chunk) {
37    this.chunks.push(chunk);
38  }
39
40  close() {
41    this.closed = true;
42  }
43
44  abort(reason) {
45    this.abortReason = reason;
46  }
47}
48
49[1, {}, false, []].forEach((arg) => {
50  assert.throws(() => newStreamWritableFromWritableStream(arg), {
51    code: 'ERR_INVALID_ARG_TYPE',
52  });
53});
54
55{
56  // Ending the stream.Writable should close the writableStream
57  const source = new TestSource();
58  const writableStream = new WritableStream(source);
59  const writable = newStreamWritableFromWritableStream(writableStream);
60
61  assert(writableStream.locked);
62
63  writable.end('chunk');
64
65  writable.on('close', common.mustCall(() => {
66    assert(writableStream.locked);
67    assert.strictEqual(writableStream[kState].state, 'closed');
68    assert.strictEqual(source.chunks.length, 1);
69    assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk'));
70  }));
71}
72
73{
74  // Destroying the stream.Writable without an error should close
75  // the writableStream with no error.
76  const source = new TestSource();
77  const writableStream = new WritableStream(source);
78  const writable = newStreamWritableFromWritableStream(writableStream);
79
80  assert(writableStream.locked);
81
82  writable.destroy();
83
84  writable.on('close', common.mustCall(() => {
85    assert(writableStream.locked);
86    assert.strictEqual(writableStream[kState].state, 'closed');
87    assert.strictEqual(source.chunks.length, 0);
88  }));
89}
90
91{
92  // Destroying the stream.Writable with an error should error
93  // the writableStream
94  const error = new Error('boom');
95  const source = new TestSource();
96  const writableStream = new WritableStream(source);
97  const writable = newStreamWritableFromWritableStream(writableStream);
98
99  assert(writableStream.locked);
100
101  writable.destroy(error);
102
103  writable.on('error', common.mustCall((reason) => {
104    assert.strictEqual(reason, error);
105  }));
106
107  writable.on('close', common.mustCall(() => {
108    assert(writableStream.locked);
109    assert.strictEqual(writableStream[kState].state, 'errored');
110    assert.strictEqual(writableStream[kState].storedError, error);
111    assert.strictEqual(source.chunks.length, 0);
112  }));
113}
114
115{
116  // Attempting to close, abort, or getWriter on writableStream
117  // should fail because it is locked. An internal error in
118  // writableStream should error the writable.
119  const error = new Error('boom');
120  const source = new TestSource();
121  const writableStream = new WritableStream(source);
122  const writable = newStreamWritableFromWritableStream(writableStream);
123
124  assert(writableStream.locked);
125
126  assert.rejects(writableStream.close(), {
127    code: 'ERR_INVALID_STATE',
128  });
129
130  assert.rejects(writableStream.abort(), {
131    code: 'ERR_INVALID_STATE',
132  });
133
134  assert.throws(() => writableStream.getWriter(), {
135    code: 'ERR_INVALID_STATE',
136  });
137
138  writable.on('error', common.mustCall((reason) => {
139    assert.strictEqual(error, reason);
140  }));
141
142  source.controller.error(error);
143}
144
145{
146  const source = new TestSource();
147  const writableStream = new WritableStream(source);
148  const writable = newStreamWritableFromWritableStream(writableStream);
149
150  writable.on('error', common.mustNotCall());
151  writable.on('finish', common.mustCall());
152  writable.on('close', common.mustCall(() => {
153    assert.strictEqual(source.chunks.length, 1);
154    assert.deepStrictEqual(source.chunks[0], Buffer.from('hello'));
155  }));
156
157  writable.write('hello', common.mustCall());
158  writable.end();
159}
160
161{
162  const source = new TestSource();
163  const writableStream = new WritableStream(source);
164  const writable =
165    newStreamWritableFromWritableStream(writableStream, {
166      decodeStrings: false,
167    });
168
169  writable.on('error', common.mustNotCall());
170  writable.on('finish', common.mustCall());
171  writable.on('close', common.mustCall(() => {
172    assert.strictEqual(source.chunks.length, 1);
173    assert.strictEqual(source.chunks[0], 'hello');
174  }));
175
176  writable.write('hello', common.mustCall());
177  writable.end();
178}
179
180{
181  const source = new TestSource();
182  const writableStream = new WritableStream(source);
183  const writable =
184    newStreamWritableFromWritableStream(
185      writableStream, {
186        objectMode: true
187      });
188  assert(writable.writableObjectMode);
189
190  writable.on('error', common.mustNotCall());
191  writable.on('finish', common.mustCall());
192  writable.on('close', common.mustCall(() => {
193    assert.strictEqual(source.chunks.length, 1);
194    assert.strictEqual(source.chunks[0], 'hello');
195  }));
196
197  writable.write('hello', common.mustCall());
198  writable.end();
199}
200
201{
202  const writableStream = new WritableStream({
203    write: common.mustCall(5),
204    close: common.mustCall(),
205  });
206  const writable = newStreamWritableFromWritableStream(writableStream);
207
208  finished(writable, common.mustCall());
209
210  writable.write('hello');
211  writable.write('hello');
212  writable.write('hello');
213  writable.write('world');
214  writable.write('world');
215  writable.end();
216}
217
218{
219  const writableStream = new WritableStream({
220    write: common.mustCall(2),
221    close: common.mustCall(),
222  });
223  const writable = newStreamWritableFromWritableStream(writableStream);
224
225  const readable = new Readable({
226    read() {
227      readable.push(Buffer.from('hello'));
228      readable.push(Buffer.from('world'));
229      readable.push(null);
230    }
231  });
232
233  pipeline(readable, writable, common.mustCall());
234}
235