1'use strict';
2
3const common = require('../common');
4const assert = require('assert');
5const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream');
6const { pipeline: pipelinePromise } = require('stream/promises');
7const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
8const http = require('http');
9
10{
11  const values = [];
12  let c;
13  const rs = new ReadableStream({
14    start(controller) {
15      c = controller;
16    }
17  });
18  const ws = new WritableStream({
19    write(chunk) {
20      values.push(chunk);
21    }
22  });
23
24  pipeline(rs, ws, common.mustSucceed(() => {
25    assert.deepStrictEqual(values, ['hello', 'world']);
26  }));
27
28  c.enqueue('hello');
29  c.enqueue('world');
30  c.close();
31}
32
33{
34  let c;
35  const rs = new ReadableStream({
36    start(controller) {
37      c = controller;
38    }
39  });
40
41  const ws = new WritableStream({
42    write() { }
43  });
44
45  pipeline(rs, ws, common.mustCall((err) => {
46    assert.strictEqual(err?.message, 'kaboom');
47  }));
48
49  c.error(new Error('kaboom'));
50}
51
52{
53  let c;
54  const values = [];
55  const rs = new ReadableStream({
56    start(controller) {
57      c = controller;
58    }
59  });
60
61  const ts = new TransformStream({
62    transform(chunk, controller) {
63      controller.enqueue(chunk?.toString().toUpperCase());
64    }
65  });
66
67  const ws = new WritableStream({
68    write(chunk) {
69      values.push(chunk?.toString());
70    }
71  });
72
73  pipeline(rs, ts, ws, common.mustSucceed(() => {
74    assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
75  }));
76
77  c.enqueue('hello');
78  c.enqueue('world');
79  c.close();
80}
81
82{
83  function makeTransformStream() {
84    return new TransformStream({
85      transform(chunk, controller) {
86        controller.enqueue(chunk?.toString());
87      }
88    });
89  }
90
91  let c;
92  const rs = new ReadableStream({
93    start(controller) {
94      c = controller;
95    }
96  });
97
98  const ws = new WritableStream({
99    write() { }
100  });
101
102  pipeline(rs,
103           makeTransformStream(),
104           makeTransformStream(),
105           makeTransformStream(),
106           makeTransformStream(),
107           ws,
108           common.mustCall((err) => {
109             assert.strictEqual(err?.message, 'kaboom');
110           }));
111
112  c.error(new Error('kaboom'));
113}
114
115{
116  const values = [];
117
118  const r = new Readable({
119    read() { }
120  });
121
122  const ws = new WritableStream({
123    write(chunk) {
124      values.push(chunk?.toString());
125    }
126  });
127
128  pipeline(r, ws, common.mustSucceed(() => {
129    assert.deepStrictEqual(values, ['helloworld']);
130  }));
131
132  r.push('hello');
133  r.push('world');
134  r.push(null);
135}
136
137{
138  const values = [];
139  let c;
140  const rs = new ReadableStream({
141    start(controller) {
142      c = controller;
143    }
144  });
145
146  const w = new Writable({
147    write(chunk, encoding, callback) {
148      values.push(chunk?.toString());
149      callback();
150    }
151  });
152
153  pipeline(rs, w, common.mustSucceed(() => {
154    assert.deepStrictEqual(values, ['hello', 'world']);
155  }));
156
157  c.enqueue('hello');
158  c.enqueue('world');
159  c.close();
160}
161
162{
163  const values = [];
164  let c;
165  const rs = new ReadableStream({
166    start(controller) {
167      c = controller;
168    }
169  });
170
171  const ws = new WritableStream({
172    write(chunk) {
173      values.push(chunk?.toString());
174    }
175  });
176
177  const t = new Transform({
178    transform(chunk, encoding, callback) {
179      callback(null, chunk?.toString().toUpperCase());
180    }
181  });
182
183  pipeline(rs, t, ws, common.mustSucceed(() => {
184    assert.deepStrictEqual(values, ['HELLOWORLD']);
185  }));
186
187  c.enqueue('hello');
188  c.enqueue('world');
189  c.close();
190}
191
192{
193  const server = http.createServer((req, res) => {
194    const rs = new ReadableStream({
195      start(controller) {
196        controller.enqueue('hello');
197        controller.enqueue('world');
198        controller.close();
199      }
200    });
201    pipeline(rs, res, common.mustSucceed(() => {}));
202  });
203
204  server.listen(0, common.mustCall(() => {
205    const req = http.request({
206      port: server.address().port
207    });
208    req.end();
209    const values = [];
210    req.on('response', (res) => {
211      res.on('data', (chunk) => {
212        values.push(chunk?.toString());
213      });
214      res.on('end', common.mustCall(() => {
215        assert.deepStrictEqual(values, ['hello', 'world']);
216        server.close();
217      }));
218    });
219  }));
220}
221
222{
223  const values = [];
224  const server = http.createServer((req, res) => {
225    const ts = new TransformStream({
226      transform(chunk, controller) {
227        controller.enqueue(chunk?.toString().toUpperCase());
228      }
229    });
230    pipeline(req, ts, res, common.mustSucceed());
231  });
232
233  server.listen(0, () => {
234    const req = http.request({
235      port: server.address().port,
236      method: 'POST',
237    });
238
239
240    const rs = new ReadableStream({
241      start(controller) {
242        controller.enqueue('hello');
243        controller.close();
244      }
245    });
246
247    pipeline(rs, req, common.mustSucceed());
248
249    req.on('response', (res) => {
250      res.on('data', (chunk) => {
251        values.push(chunk?.toString());
252      }
253      );
254      res.on('end', common.mustCall(() => {
255        assert.deepStrictEqual(values, ['HELLO']);
256        server.close();
257      }));
258    });
259  });
260}
261
262{
263  const values = [];
264  let c;
265  const rs = new ReadableStream({
266    start(controller) {
267      c = controller;
268    }
269  });
270  const ws = new WritableStream({
271    write(chunk) {
272      values.push(chunk?.toString());
273    }
274  });
275
276  pipelinePromise(rs, ws).then(common.mustCall(() => {
277    assert.deepStrictEqual(values, ['hello', 'world']);
278  }));
279
280  c.enqueue('hello');
281  c.enqueue('world');
282  c.close();
283}
284
285{
286  let c;
287  const rs = new ReadableStream({
288    start(controller) {
289      c = controller;
290    }
291  });
292
293  const ws = new WritableStream({
294    write() { }
295  });
296
297  pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => {
298    assert.strictEqual(err?.message, 'kaboom');
299  }));
300
301  c.error(new Error('kaboom'));
302}
303
304{
305  const values = [];
306  let c;
307  const rs = new ReadableStream({
308    start(controller) {
309      c = controller;
310    }
311  });
312
313  pipeline(rs, async function(source) {
314    for await (const chunk of source) {
315      values.push(chunk?.toString());
316    }
317  }, common.mustSucceed(() => {
318    assert.deepStrictEqual(values, ['hello', 'world']);
319  }));
320
321  c.enqueue('hello');
322  c.enqueue('world');
323  c.close();
324}
325
326{
327  const rs = new ReadableStream({
328    start() {}
329  });
330
331  pipeline(rs, async function(source) {
332    throw new Error('kaboom');
333  }, (err) => {
334    assert.strictEqual(err?.message, 'kaboom');
335  });
336}
337
338{
339  const values = [];
340  let c;
341  const rs = new ReadableStream({
342    start(controller) {
343      c = controller;
344    }
345  });
346
347  const ts = new TransformStream({
348    transform(chunk, controller) {
349      controller.enqueue(chunk?.toString().toUpperCase());
350    }
351  });
352
353  pipeline(rs, ts, async function(source) {
354    for await (const chunk of source) {
355      values.push(chunk?.toString());
356    }
357  }, common.mustSucceed(() => {
358    assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
359  }));
360
361  c.enqueue('hello');
362  c.enqueue('world');
363  c.close();
364}
365
366{
367  const values = [];
368  let c;
369  const rs = new ReadableStream({
370    start(controller) {
371      c = controller;
372    }
373  });
374
375  const ws = new WritableStream({
376    write(chunk) {
377      values.push(chunk?.toString());
378    }
379  });
380
381  pipeline(rs, async function* (source) {
382    for await (const chunk of source) {
383      yield chunk?.toString().toUpperCase();
384    }
385  }, ws, common.mustSucceed(() => {
386    assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
387  }));
388
389  c.enqueue('hello');
390  c.enqueue('world');
391  c.close();
392}
393
394{
395  let c;
396  const rs = new ReadableStream({
397    start(controller) {
398      c = controller;
399    }
400  });
401
402  const ws = new WritableStream({
403    write(chunk) { }
404  }, { highWaterMark: 0 });
405
406  pipeline(rs, ws, common.mustNotCall());
407
408  for (let i = 0; i < 10; i++) {
409    c.enqueue(`${i}`);
410  }
411  c.close();
412}
413
414{
415  const rs = new ReadableStream({
416    start(controller) {
417      controller.close();
418    }
419  });
420
421  pipeline(rs, new PassThrough(), common.mustSucceed());
422}
423