1'use strict';
2
3const common = require('../common');
4const assert = require('assert');
5const { ReadableStream, WritableStream } = require('stream/web');
6const { finished } = require('stream');
7const { finished: finishedPromise } = require('stream/promises');
8
{
  // Callback form: a ReadableStream that enqueues one chunk and then closes
  // is expected to invoke the finished() callback without an error.
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    },
  });
  finished(rs, common.mustSucceed());

  // Drains the stream through async iteration and checks the single chunk.
  async function test() {
    const values = [];
    for await (const chunk of rs) {
      values.push(chunk);
    }
    assert.deepStrictEqual(values, ['asd']);
  }

  // Require the consumer to settle successfully so that a stalled iteration
  // is reported as a failure instead of being silently ignored.
  test().then(common.mustCall());
}
26
{
  // Callback form: an error signalled through the controller inside start()
  // is expected to be handed to the finished() callback.
  const erroredStream = new ReadableStream({
    start: (controller) => controller.error(new Error('asd')),
  });

  const onFinished = common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  });
  finished(erroredStream, onFinished);
}
38
{
  // Callback form: an async start() that throws (i.e. returns a rejected
  // promise) is expected to surface the same error via finished().
  const throwingSource = {
    async start() {
      throw new Error('asd');
    },
  };
  const failedStream = new ReadableStream(throwingSource);

  const onFinished = common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  });
  finished(failedStream, onFinished);
}
50
{
  // Promise form: finished() from stream/promises is expected to fulfil once
  // a ReadableStream that enqueued one chunk and closed has been drained.
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    }
  });

  // Drains the stream through async iteration and checks the single chunk.
  async function test() {
    const values = [];
    for await (const chunk of rs) {
      values.push(chunk);
    }
    assert.deepStrictEqual(values, ['asd']);
  }

  // mustCall (not the error-first mustSucceed helper) is the right guard for
  // a promise fulfilment handler: the first argument is the resolved value,
  // not an error.
  finishedPromise(rs).then(common.mustCall());

  // Require the consumer to settle successfully so that a stalled iteration
  // is reported as a failure instead of being silently ignored.
  test().then(common.mustCall());
}
71
{
  // Promise form: an error signalled through the controller inside start()
  // is expected to reject the promise returned by finished().
  const erroredStream = new ReadableStream({
    start: (controller) => controller.error(new Error('asd')),
  });

  const pending = finishedPromise(erroredStream);
  const unexpectedFulfilment = common.mustNotCall();
  const expectedRejection = common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  });
  pending.then(unexpectedFulfilment).catch(expectedRejection);
}
83
{
  // Promise form: an async start() that throws is expected to reject the
  // promise returned by finished() with the same error.
  const throwingSource = {
    async start() {
      throw new Error('asd');
    },
  };
  const failedStream = new ReadableStream(throwingSource);

  const pending = finishedPromise(failedStream);
  const unexpectedFulfilment = common.mustNotCall();
  const expectedRejection = common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  });
  pending.then(unexpectedFulfilment).catch(expectedRejection);
}
95
{
  // Both branches produced by tee() are expected to report a clean finish
  // once each of them has been fully consumed.
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    },
  });

  const [firstBranch, secondBranch] = source.tee();
  const branches = [firstBranch, secondBranch];

  for (const branch of branches) {
    finished(branch, common.mustSucceed());
  }

  // Collects every chunk of one branch and verifies the content.
  const drain = async (stream) => {
    const collected = [];
    for await (const piece of stream) {
      collected.push(piece);
    }
    assert.deepStrictEqual(collected, ['asd']);
  };

  const consumers = branches.map((branch) => drain(branch));
  Promise.all(consumers).then(common.mustCall());
}
122
{
  // When the source stream errors, both tee() branches are expected to
  // report that error through finished().
  const source = new ReadableStream({
    start: (controller) => controller.error(new Error('asd')),
  });

  const [firstBranch, secondBranch] = source.tee();

  for (const branch of [firstBranch, secondBranch]) {
    const onFinished = common.mustCall((err) => {
      assert.strictEqual(err?.message, 'asd');
    });
    finished(branch, onFinished);
  }
}
140
{
  // Cancelling a readable stream is expected to count as a successful finish.
  const underlyingSource = {
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    },
  };
  const cancelledStream = new ReadableStream(underlyingSource);

  const onFinished = common.mustSucceed();
  finished(cancelledStream, onFinished);

  cancelledStream.cancel();
}
153
{
  // Callback form: a WritableStream that is written to and then closed is
  // expected to finish cleanly, with the sink having seen the data by then.
  let written = '';
  const sink = {
    write(chunk) {
      written += chunk;
    },
  };
  const ws = new WritableStream(sink);

  const onFinished = common.mustSucceed(() => {
    assert.strictEqual(written, 'asd');
  });
  finished(ws, onFinished);

  const streamWriter = ws.getWriter();
  streamWriter.write('asd');
  streamWriter.close();
}
170
{
  // Callback form: an error thrown by the sink's async write() is expected
  // to be reported through finished().
  const ws = new WritableStream({
    async write(chunk) {
      throw new Error('asd');
    }
  });

  finished(ws, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));

  const writer = ws.getWriter();
  // Guard both outcomes of the write promise: it must reject with the sink
  // error, and the assertion must actually run rather than being skipped.
  writer.write('asd').then(common.mustNotCall(), common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}
187
{
  // Promise form: finished() from stream/promises is expected to fulfil once
  // the WritableStream has been written to and closed.
  let str = '';
  const ws = new WritableStream({
    write(chunk) {
      str += chunk;
    }
  });

  // mustCall (not the error-first mustSucceed helper) is the right guard for
  // a promise fulfilment handler: the first argument is the resolved value,
  // not an error.
  finishedPromise(ws).then(common.mustCall(() => {
    assert.strictEqual(str, 'asd');
  }));

  const writer = ws.getWriter();
  writer.write('asd');
  writer.close();
}
204
{
  // Aborting the writer with a reason is expected to deliver that reason to
  // the finished() callback.
  const abortedStream = new WritableStream({
    write() {},
  });

  const onFinished = common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  });
  finished(abortedStream, onFinished);

  const abortReason = new Error('asd');
  abortedStream.getWriter().abort(abortReason);
}
216
{
  // Promise form: an error thrown by the sink's async write() is expected to
  // reject the promise returned by finished().
  const ws = new WritableStream({
    async write(chunk) {
      throw new Error('asd');
    }
  });

  finishedPromise(ws).then(common.mustNotCall()).catch(common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));

  const writer = ws.getWriter();
  // Guard both outcomes of the write promise: it must reject with the sink
  // error, and the assertion must actually run rather than being skipped.
  writer.write('asd').then(common.mustNotCall(), common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}
233
{
  // A signal that already reports `aborted` when finished() is called is
  // expected to produce an AbortError. The signal here is a bare EventTarget
  // with an `aborted` property rather than a real AbortSignal.
  const signal = Object.assign(new EventTarget(), { aborted: true });

  const idleStream = new ReadableStream({ start() {} });

  const onFinished = common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  });
  finished(idleStream, { signal }, onFinished);
}
246
{
  // Aborting synchronously, right after finished() has been attached and
  // before the stream ends, is expected to produce an AbortError.
  const abortController = new AbortController();

  const idleStream = new ReadableStream({ start() {} });

  const onFinished = common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  });
  finished(idleStream, { signal: abortController.signal }, onFinished);

  abortController.abort();
}
261
{
  // Aborting from a timer, while the stream is still open, is expected to
  // produce an AbortError.
  const abortController = new AbortController();

  const idleStream = new ReadableStream({ start() {} });

  setTimeout(() => {
    abortController.abort();
  }, 1);

  const onFinished = common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  });
  finished(idleStream, { signal: abortController.signal }, onFinished);
}
275
{
  // Aborting the signal after the chunk has been read must not turn the
  // result into an error: the callback is still expected to succeed.
  const abortController = new AbortController();

  const underlyingSource = {
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    },
  };
  const rs = new ReadableStream(underlyingSource);
  finished(rs, { signal: abortController.signal }, common.mustSucceed());

  const reader = rs.getReader();
  reader.read().then(common.mustCall(({ value }) => {
    assert.strictEqual(value, 'asd');
    // Abort on a later turn of the event loop, after the read has resolved.
    setImmediate(() => {
      abortController.abort();
    });
  }));
}
294
{
  // Promise form: aborting while the stream is still open is expected to
  // reject with an AbortError.
  const run = async () => {
    const abortController = new AbortController();
    const idleStream = new ReadableStream({ start() {} });
    setImmediate(() => {
      abortController.abort();
    });
    await finishedPromise(idleStream, { signal: abortController.signal });
  };

  const expectedError = { name: 'AbortError' };
  assert.rejects(run, expectedError).then(common.mustCall());
}
309
{
  // Promise form: a signal that already reports `aborted` is expected to
  // reject with an AbortError. The signal here is a bare EventTarget with an
  // `aborted` property rather than a real AbortSignal.
  const run = async () => {
    const signal = Object.assign(new EventTarget(), { aborted: true });
    const idleStream = new ReadableStream({ start() {} });
    await finishedPromise(idleStream, { signal });
  };

  const expectedError = { name: 'AbortError' };
  assert.rejects(run, expectedError).then(common.mustCall());
}
323