1'use strict';
2
3const common = require('../common');
4const {
5  Readable, Transform,
6} = require('stream');
7const assert = require('assert');
8
9{
10  // with async generator
11  const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
12    let str = '';
13    for await (const chunk of stream) {
14      str += chunk;
15
16      if (str.length === 2) {
17        yield str;
18        str = '';
19      }
20    }
21  });
22  const result = ['ab', 'cd'];
23  (async () => {
24    for await (const item of stream) {
25      assert.strictEqual(item, result.shift());
26    }
27  })().then(common.mustCall());
28}
29
30{
31  // With Transformer
32  const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
33    objectMode: true,
34    transform: common.mustCall((chunk, encoding, callback) => {
35      callback(null, chunk);
36    }, 4)
37  }));
38  const result = ['a', 'b', 'c', 'd'];
39  (async () => {
40    for await (const item of stream) {
41      assert.strictEqual(item, result.shift());
42    }
43  })().then(common.mustCall());
44}
45
46{
47  // Throwing an error during `compose` (before waiting for data)
48  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
49
50    throw new Error('boom');
51  });
52
53  assert.rejects(async () => {
54    for await (const item of stream) {
55      assert.fail('should not reach here, got ' + item);
56    }
57  }, /boom/).then(common.mustCall());
58}
59
60{
61  // Throwing an error during `compose` (when waiting for data)
62  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
63    for await (const chunk of stream) {
64      if (chunk === 3) {
65        throw new Error('boom');
66      }
67      yield chunk;
68    }
69  });
70
71  assert.rejects(
72    stream.toArray(),
73    /boom/,
74  ).then(common.mustCall());
75}
76
77{
78  // Throwing an error during `compose` (after finishing all readable data)
79  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
80
81    // eslint-disable-next-line no-unused-vars,no-empty
82    for await (const chunk of stream) {
83    }
84
85    throw new Error('boom');
86  });
87  assert.rejects(
88    stream.toArray(),
89    /boom/,
90  ).then(common.mustCall());
91}
92
93{
94  // AbortSignal
95  const ac = new AbortController();
96  const stream = Readable.from([1, 2, 3, 4, 5])
97    .compose(async function *(source) {
98      // Should not reach here
99      for await (const chunk of source) {
100        yield chunk;
101      }
102    }, { signal: ac.signal });
103
104  ac.abort();
105
106  assert.rejects(async () => {
107    for await (const item of stream) {
108      assert.fail('should not reach here, got ' + item);
109    }
110  }, {
111    name: 'AbortError',
112  }).then(common.mustCall());
113}
114
115{
116  assert.throws(
117    () => Readable.from(['a']).compose(Readable.from(['b'])),
118    { code: 'ERR_INVALID_ARG_VALUE' }
119  );
120}
121
122{
123  assert.throws(
124    () => Readable.from(['a']).compose(),
125    { code: 'ERR_INVALID_ARG_TYPE' }
126  );
127}
128