1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const common = require('../common');
25const { Readable: R, Writable: W } = require('stream');
26const assert = require('assert');
27
28const EE = require('events').EventEmitter;
29
30class TestReader extends R {
31  constructor(n) {
32    super();
33    this._buffer = Buffer.alloc(n || 100, 'x');
34    this._pos = 0;
35    this._bufs = 10;
36  }
37
38  _read(n) {
39    const max = this._buffer.length - this._pos;
40    n = Math.max(n, 0);
41    const toRead = Math.min(n, max);
42    if (toRead === 0) {
43      // Simulate the read buffer filling up with some more bytes some time
44      // in the future.
45      setTimeout(() => {
46        this._pos = 0;
47        this._bufs -= 1;
48        if (this._bufs <= 0) {
49          // read them all!
50          if (!this.ended)
51            this.push(null);
52        } else {
53          // now we have more.
54          // kinda cheating by calling _read, but whatever,
55          // it's just fake anyway.
56          this._read(n);
57        }
58      }, 10);
59      return;
60    }
61
62    const ret = this._buffer.slice(this._pos, this._pos + toRead);
63    this._pos += toRead;
64    this.push(ret);
65  }
66}
67
68class TestWriter extends EE {
69  constructor() {
70    super();
71    this.received = [];
72    this.flush = false;
73  }
74
75  write(c) {
76    this.received.push(c.toString());
77    this.emit('write', c);
78    return true;
79  }
80
81  end(c) {
82    if (c) this.write(c);
83    this.emit('end', this.received);
84  }
85}
86
87{
88  // Test basic functionality
89  const r = new TestReader(20);
90
91  const reads = [];
92  const expect = [ 'x',
93                   'xx',
94                   'xxx',
95                   'xxxx',
96                   'xxxxx',
97                   'xxxxxxxxx',
98                   'xxxxxxxxxx',
99                   'xxxxxxxxxxxx',
100                   'xxxxxxxxxxxxx',
101                   'xxxxxxxxxxxxxxx',
102                   'xxxxxxxxxxxxxxxxx',
103                   'xxxxxxxxxxxxxxxxxxx',
104                   'xxxxxxxxxxxxxxxxxxxxx',
105                   'xxxxxxxxxxxxxxxxxxxxxxx',
106                   'xxxxxxxxxxxxxxxxxxxxxxxxx',
107                   'xxxxxxxxxxxxxxxxxxxxx' ];
108
109  r.on('end', common.mustCall(function() {
110    assert.deepStrictEqual(reads, expect);
111  }));
112
113  let readSize = 1;
114  function flow() {
115    let res;
116    while (null !== (res = r.read(readSize++))) {
117      reads.push(res.toString());
118    }
119    r.once('readable', flow);
120  }
121
122  flow();
123}
124
125{
126  // Verify pipe
127  const r = new TestReader(5);
128
129  const expect = [ 'xxxxx',
130                   'xxxxx',
131                   'xxxxx',
132                   'xxxxx',
133                   'xxxxx',
134                   'xxxxx',
135                   'xxxxx',
136                   'xxxxx',
137                   'xxxxx',
138                   'xxxxx' ];
139
140  const w = new TestWriter();
141
142  w.on('end', common.mustCall(function(received) {
143    assert.deepStrictEqual(received, expect);
144  }));
145
146  r.pipe(w);
147}
148
149
150[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) {
151  // Verify unpipe
152  const r = new TestReader(5);
153
154  // Unpipe after 3 writes, then write to another stream instead.
155  let expect = [ 'xxxxx',
156                 'xxxxx',
157                 'xxxxx',
158                 'xxxxx',
159                 'xxxxx',
160                 'xxxxx',
161                 'xxxxx',
162                 'xxxxx',
163                 'xxxxx',
164                 'xxxxx' ];
165  expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
166
167  const w = [ new TestWriter(), new TestWriter() ];
168
169  let writes = SPLIT;
170  w[0].on('write', function() {
171    if (--writes === 0) {
172      r.unpipe();
173      assert.deepStrictEqual(r._readableState.pipes, []);
174      w[0].end();
175      r.pipe(w[1]);
176      assert.deepStrictEqual(r._readableState.pipes, [w[1]]);
177    }
178  });
179
180  let ended = 0;
181
182  w[0].on('end', common.mustCall(function(results) {
183    ended++;
184    assert.strictEqual(ended, 1);
185    assert.deepStrictEqual(results, expect[0]);
186  }));
187
188  w[1].on('end', common.mustCall(function(results) {
189    ended++;
190    assert.strictEqual(ended, 2);
191    assert.deepStrictEqual(results, expect[1]);
192  }));
193
194  r.pipe(w[0]);
195});
196
197
198{
199  // Verify both writers get the same data when piping to destinations
200  const r = new TestReader(5);
201  const w = [ new TestWriter(), new TestWriter() ];
202
203  const expect = [ 'xxxxx',
204                   'xxxxx',
205                   'xxxxx',
206                   'xxxxx',
207                   'xxxxx',
208                   'xxxxx',
209                   'xxxxx',
210                   'xxxxx',
211                   'xxxxx',
212                   'xxxxx' ];
213
214  w[0].on('end', common.mustCall(function(received) {
215    assert.deepStrictEqual(received, expect);
216  }));
217  w[1].on('end', common.mustCall(function(received) {
218    assert.deepStrictEqual(received, expect);
219  }));
220
221  r.pipe(w[0]);
222  r.pipe(w[1]);
223}
224
225
226[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) {
227  // Verify multi-unpipe
228  const r = new TestReader(5);
229
230  // Unpipe after 3 writes, then write to another stream instead.
231  let expect = [ 'xxxxx',
232                 'xxxxx',
233                 'xxxxx',
234                 'xxxxx',
235                 'xxxxx',
236                 'xxxxx',
237                 'xxxxx',
238                 'xxxxx',
239                 'xxxxx',
240                 'xxxxx' ];
241  expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ];
242
243  const w = [ new TestWriter(), new TestWriter(), new TestWriter() ];
244
245  let writes = SPLIT;
246  w[0].on('write', function() {
247    if (--writes === 0) {
248      r.unpipe();
249      w[0].end();
250      r.pipe(w[1]);
251    }
252  });
253
254  let ended = 0;
255
256  w[0].on('end', common.mustCall(function(results) {
257    ended++;
258    assert.strictEqual(ended, 1);
259    assert.deepStrictEqual(results, expect[0]);
260  }));
261
262  w[1].on('end', common.mustCall(function(results) {
263    ended++;
264    assert.strictEqual(ended, 2);
265    assert.deepStrictEqual(results, expect[1]);
266  }));
267
268  r.pipe(w[0]);
269  r.pipe(w[2]);
270});
271
272{
273  // Verify that back pressure is respected
274  const r = new R({ objectMode: true });
275  r._read = common.mustNotCall();
276  let counter = 0;
277  r.push(['one']);
278  r.push(['two']);
279  r.push(['three']);
280  r.push(['four']);
281  r.push(null);
282
283  const w1 = new R();
284  w1.write = function(chunk) {
285    assert.strictEqual(chunk[0], 'one');
286    w1.emit('close');
287    process.nextTick(function() {
288      r.pipe(w2);
289      r.pipe(w3);
290    });
291  };
292  w1.end = common.mustNotCall();
293
294  r.pipe(w1);
295
296  const expected = ['two', 'two', 'three', 'three', 'four', 'four'];
297
298  const w2 = new R();
299  w2.write = function(chunk) {
300    assert.strictEqual(chunk[0], expected.shift());
301    assert.strictEqual(counter, 0);
302
303    counter++;
304
305    if (chunk[0] === 'four') {
306      return true;
307    }
308
309    setTimeout(function() {
310      counter--;
311      w2.emit('drain');
312    }, 10);
313
314    return false;
315  };
316  w2.end = common.mustCall();
317
318  const w3 = new R();
319  w3.write = function(chunk) {
320    assert.strictEqual(chunk[0], expected.shift());
321    assert.strictEqual(counter, 1);
322
323    counter++;
324
325    if (chunk[0] === 'four') {
326      return true;
327    }
328
329    setTimeout(function() {
330      counter--;
331      w3.emit('drain');
332    }, 50);
333
334    return false;
335  };
336  w3.end = common.mustCall(function() {
337    assert.strictEqual(counter, 2);
338    assert.strictEqual(expected.length, 0);
339  });
340}
341
342{
343  // Verify read(0) behavior for ended streams
344  const r = new R();
345  let written = false;
346  let ended = false;
347  r._read = common.mustNotCall();
348
349  r.push(Buffer.from('foo'));
350  r.push(null);
351
352  const v = r.read(0);
353
354  assert.strictEqual(v, null);
355
356  const w = new R();
357  w.write = function(buffer) {
358    written = true;
359    assert.strictEqual(ended, false);
360    assert.strictEqual(buffer.toString(), 'foo');
361  };
362
363  w.end = common.mustCall(function() {
364    ended = true;
365    assert.strictEqual(written, true);
366  });
367
368  r.pipe(w);
369}
370
371{
372  // Verify synchronous _read ending
373  const r = new R();
374  let called = false;
375  r._read = function(n) {
376    r.push(null);
377  };
378
379  r.once('end', function() {
380    // Verify that this is called before the next tick
381    called = true;
382  });
383
384  r.read();
385
386  process.nextTick(function() {
387    assert.strictEqual(called, true);
388  });
389}
390
391{
392  // Verify that adding readable listeners trigger data flow
393  const r = new R({ highWaterMark: 5 });
394  let onReadable = false;
395  let readCalled = 0;
396
397  r._read = function(n) {
398    if (readCalled++ === 2)
399      r.push(null);
400    else
401      r.push(Buffer.from('asdf'));
402  };
403
404  r.on('readable', function() {
405    onReadable = true;
406    r.read();
407  });
408
409  r.on('end', common.mustCall(function() {
410    assert.strictEqual(readCalled, 3);
411    assert.ok(onReadable);
412  }));
413}
414
415{
416  // Verify that streams are chainable
417  const r = new R();
418  r._read = common.mustCall();
419  const r2 = r.setEncoding('utf8').pause().resume().pause();
420  assert.strictEqual(r, r2);
421}
422
423{
424  // Verify readableEncoding property
425  assert(Object.hasOwn(R.prototype, 'readableEncoding'));
426
427  const r = new R({ encoding: 'utf8' });
428  assert.strictEqual(r.readableEncoding, 'utf8');
429}
430
431{
432  // Verify readableObjectMode property
433  assert(Object.hasOwn(R.prototype, 'readableObjectMode'));
434
435  const r = new R({ objectMode: true });
436  assert.strictEqual(r.readableObjectMode, true);
437}
438
439{
440  // Verify writableObjectMode property
441  assert(Object.hasOwn(W.prototype, 'writableObjectMode'));
442
443  const w = new W({ objectMode: true });
444  assert.strictEqual(w.writableObjectMode, true);
445}
446