1'use strict';
2
3const common = require('../common');
4const {
5  Readable,
6} = require('stream');
7const { deepStrictEqual, rejects, throws, strictEqual } = require('assert');
8
9const { from } = Readable;
10
11const fromAsync = (...args) => from(...args).map(async (x) => x);
12
13const naturals = () => from(async function*() {
14  let i = 1;
15  while (true) {
16    yield i++;
17  }
18}());
19
20{
21  // Synchronous streams
22  (async () => {
23    deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
24    deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
25    deepStrictEqual(await from([]).drop(2).toArray(), []);
26    deepStrictEqual(await from([]).take(1).toArray(), []);
27    deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
28    deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
29    deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
30  })().then(common.mustCall());
31  // Asynchronous streams
32  (async () => {
33    deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
34    deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
35    deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
36    deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
37    deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
38    deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
39    deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
40  })().then(common.mustCall());
41  // Infinite streams
42  // Asynchronous streams
43  (async () => {
44    deepStrictEqual(await naturals().take(1).toArray(), [1]);
45    deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
46    const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
47    deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
48    deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
49  })().then(common.mustCall());
50}
51
52
53// Don't wait for next item in the original stream when already consumed the requested take amount
54{
55  let reached = false;
56  let resolve;
57  const promise = new Promise((res) => resolve = res);
58
59  const stream = from((async function *() {
60    yield 1;
61    await promise;
62    reached = true;
63    yield 2;
64  })());
65
66  stream.take(1)
67    .toArray()
68    .then(common.mustCall(() => {
69      strictEqual(reached, false);
70    }))
71    .finally(() => resolve());
72}
73
74{
75  // Coercion
76  (async () => {
77    // The spec made me do this ^^
78    deepStrictEqual(await naturals().take('cat').toArray(), []);
79    deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
80    deepStrictEqual(await naturals().take(true).toArray(), [1]);
81  })().then(common.mustCall());
82}
83
84{
85  // Support for AbortSignal
86  const ac = new AbortController();
87  rejects(
88    Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
89      name: 'AbortError',
90    }).then(common.mustCall());
91  rejects(
92    Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
93      name: 'AbortError',
94    }).then(common.mustCall());
95  ac.abort();
96}
97
98{
99  // Support for AbortSignal, already aborted
100  const signal = AbortSignal.abort();
101  rejects(
102    Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
103      name: 'AbortError',
104    }).then(common.mustCall());
105}
106
107{
108  // Error cases
109  const invalidArgs = [
110    -1,
111    -Infinity,
112    -40,
113  ];
114
115  for (const example of invalidArgs) {
116    throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
117  }
118
119  throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
120  throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
121
122  throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
123  throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
124}
125