'use strict';

// Tests for the `drop` and `take` operators on Readable streams.

const common = require('../common');
const {
  Readable,
} = require('stream');
const { deepStrictEqual, rejects, throws, strictEqual } = require('assert');

const { from } = Readable;

// Same as `Readable.from`, but every item is additionally passed through an
// async identity `map`, so the resulting stream delivers its items through an
// asynchronous step.
const fromAsync = (...args) => from(...args).map(async (x) => x);

// A never-ending stream of the natural numbers: 1, 2, 3, ...
// Only safe to consume through an operator that stops early (e.g. `take`).
const naturals = () => from(async function*() {
  let i = 1;
  while (true) {
    yield i++;
  }
}());

{
  // Synchronous streams
  (async () => {
    deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
    deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
    deepStrictEqual(await from([]).drop(2).toArray(), []);
    deepStrictEqual(await from([]).take(1).toArray(), []);
    deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
    deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
    deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
  })().then(common.mustCall());

  // Asynchronous streams
  (async () => {
    deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
    deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
    deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
    deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
    deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
    deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
    deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
  })().then(common.mustCall());

  // Infinite (asynchronous) streams: each chain must end in a `take`,
  // otherwise `toArray()` would never settle.
  (async () => {
    deepStrictEqual(await naturals().take(1).toArray(), [1]);
    deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
    const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
    deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
    deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
  })().then(common.mustCall());
}


// Don't wait for next item in the original stream when already consumed the
// requested take amount
{
  let reached = false;
  let resolve;
  const promise = new Promise((res) => resolve = res);

  // The generator blocks on `promise` after its first item; `reached` flips
  // only if the source is advanced past that point.
  const stream = from((async function *() {
    yield 1;
    await promise;
    reached = true;
    yield 2;
  })());

  stream.take(1)
    .toArray()
    .then(common.mustCall(() => {
      strictEqual(reached, false);
    }))
    // Unblock the generator whatever the outcome, so it is not left pending.
    .finally(() => resolve());
}

{
  // Coercion
  (async () => {
    // The spec made me do this ^^
    // A non-numeric string yields nothing; a numeric string and a boolean
    // are used as their numeric value.
    deepStrictEqual(await naturals().take('cat').toArray(), []);
    deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
    deepStrictEqual(await naturals().take(true).toArray(), [1]);
  })().then(common.mustCall());
}

{
  // Support for AbortSignal
  const ac = new AbortController();
  rejects(
    Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
      name: 'AbortError',
    }).then(common.mustCall());
  rejects(
    Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
      name: 'AbortError',
    }).then(common.mustCall());
  // Abort only after both pipelines have been set up.
  ac.abort();
}

{
  // Support for AbortSignal, already aborted
  const signal = AbortSignal.abort();
  rejects(
    Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
      name: 'AbortError',
    }).then(common.mustCall());
}

{
  // Error cases
  const invalidArgs = [
    -1,
    -Infinity,
    -40,
  ];

  // Negative counts are expected to throw synchronously (hence `throws`,
  // not `rejects`).
  for (const example of invalidArgs) {
    throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
  }

  // The options argument must be an object, and `signal` an AbortSignal.
  throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
  throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);

  throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
  throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}