'use strict';

const common = require('../common');
const assert = require('assert');
const { ReadableStream, WritableStream } = require('stream/web');
const { finished } = require('stream');
const { finished: finishedPromise } = require('stream/promises');

/**
 * Drains a web ReadableStream through async iteration.
 * @param {ReadableStream} stream Stream to consume until iteration ends.
 * @returns {Promise<any[]>} Every chunk yielded by the stream, in order.
 */
async function readAll(stream) {
  const chunks = [];
  for await (const chunk of stream) {
    chunks.push(chunk);
  }
  return chunks;
}

{
  // Callback API: a readable that is closed and fully consumed must succeed.
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    },
  });
  finished(rs, common.mustSucceed());

  // Guard the consumer with mustCall so a stalled iteration fails the test.
  readAll(rs).then(common.mustCall((chunks) => {
    assert.deepStrictEqual(chunks, ['asd']);
  }));
}

{
  // Callback API: an error raised through the controller must be reported.
  const rs = new ReadableStream({
    start(controller) {
      controller.error(new Error('asd'));
    }
  });

  finished(rs, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}

{
  // Callback API: an async start() that throws must be reported.
  const rs = new ReadableStream({
    async start(controller) {
      throw new Error('asd');
    }
  });

  finished(rs, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}

{
  // Promise API: a readable that is closed and fully consumed must fulfill.
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    }
  });

  // mustCall, not mustSucceed: a .then() handler receives the fulfillment
  // value, not an error-first argument list.
  finishedPromise(rs).then(common.mustCall());

  readAll(rs).then(common.mustCall((chunks) => {
    assert.deepStrictEqual(chunks, ['asd']);
  }));
}

{
  // Promise API: an error raised through the controller must reject.
  const rs = new ReadableStream({
    start(controller) {
      controller.error(new Error('asd'));
    }
  });

  finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}

{
  // Promise API: an async start() that throws must reject.
  const rs = new ReadableStream({
    async start(controller) {
      throw new Error('asd');
    }
  });

  finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}

{
  // Both branches of a tee()'d stream must succeed once each is consumed.
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    }
  });

  const [s1, s2] = rs.tee();

  finished(s1, common.mustSucceed());
  finished(s2, common.mustSucceed());

  Promise.all([
    readAll(s1),
    readAll(s2),
  ]).then(common.mustCall(([first, second]) => {
    assert.deepStrictEqual(first, ['asd']);
    assert.deepStrictEqual(second, ['asd']);
  }));
}

{
  // Both branches of a tee()'d stream must report the source error.
  const rs = new ReadableStream({
    start(controller) {
      controller.error(new Error('asd'));
    }
  });

  const [s1, s2] = rs.tee();

  finished(s1, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));

  finished(s2, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}

{
  // Cancelling a readable is expected to complete finished() without error.
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    }
  });

  finished(rs, common.mustSucceed());

  rs.cancel();
}

{
  // Callback API: a writable must succeed after its writer is closed, and
  // the written chunk must have reached the sink by then.
  let str = '';
  const ws = new WritableStream({
    write(chunk) {
      str += chunk;
    }
  });

  finished(ws, common.mustSucceed(() => {
    assert.strictEqual(str, 'asd');
  }));

  const writer = ws.getWriter();
  writer.write('asd');
  writer.close();
}

{
  // Callback API: a sink whose write() throws must be reported.
  const ws = new WritableStream({
    async write(chunk) {
      throw new Error('asd');
    }
  });

  finished(ws, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));

  const writer = ws.getWriter();
  // mustCall ensures the rejection is actually observed; a bare handler
  // would let the test pass if write() never rejected.
  writer.write('asd').catch(common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}

{
  // Promise API: a writable must fulfill after its writer is closed, and
  // the written chunk must have reached the sink by then.
  let str = '';
  const ws = new WritableStream({
    write(chunk) {
      str += chunk;
    }
  });

  // mustCall, not mustSucceed: a .then() handler receives the fulfillment
  // value, not an error-first argument list.
  finishedPromise(ws).then(common.mustCall(() => {
    assert.strictEqual(str, 'asd');
  }));

  const writer = ws.getWriter();
  writer.write('asd');
  writer.close();
}
{
  // Callback API: aborting through the writer must report the abort reason.
  const ws = new WritableStream({
    write(chunk) { }
  });
  finished(ws, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));

  const writer = ws.getWriter();
  writer.abort(new Error('asd'));
}

{
  // Promise API: a sink whose write() throws must reject.
  const ws = new WritableStream({
    async write(chunk) {
      throw new Error('asd');
    }
  });

  finishedPromise(ws).then(common.mustNotCall()).catch(common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));

  const writer = ws.getWriter();
  // mustCall ensures the rejection is actually observed; a bare handler
  // would let the test pass if write() never rejected.
  writer.write('asd').catch(common.mustCall((err) => {
    assert.strictEqual(err?.message, 'asd');
  }));
}

{
  // Check pre-cancelled.
  // The signal is a duck-typed EventTarget carrying `aborted = true`.
  const signal = new EventTarget();
  signal.aborted = true;

  const rs = new ReadableStream({
    start() {}
  });
  finished(rs, { signal }, common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  }));
}

{
  // Check cancelled before the stream ends sync.
  const ac = new AbortController();
  const { signal } = ac;

  const rs = new ReadableStream({
    start() {}
  });
  finished(rs, { signal }, common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  }));

  ac.abort();
}

{
  // Check cancelled before the stream ends async.
  const ac = new AbortController();
  const { signal } = ac;

  const rs = new ReadableStream({
    start() {}
  });
  setTimeout(() => ac.abort(), 1);
  finished(rs, { signal }, common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  }));
}

{
  // Check cancelled after doesn't throw.
  const ac = new AbortController();
  const { signal } = ac;

  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('asd');
      controller.close();
    }
  });
  finished(rs, { signal }, common.mustSucceed());

  rs.getReader().read().then(common.mustCall((chunk) => {
    assert.strictEqual(chunk.value, 'asd');
    // Abort only after the chunk has been read; the mustSucceed callback
    // above must still be the one and only completion.
    setImmediate(() => ac.abort());
  }));
}

{
  // Promisified abort works.
  async function run() {
    const ac = new AbortController();
    const { signal } = ac;
    const rs = new ReadableStream({
      start() {}
    });
    setImmediate(() => ac.abort());
    await finishedPromise(rs, { signal });
  }

  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}

{
  // Promisified pre-aborted works.
  // The signal is a duck-typed EventTarget carrying `aborted = true`.
  async function run() {
    const signal = new EventTarget();
    signal.aborted = true;
    const rs = new ReadableStream({
      start() {}
    });
    await finishedPromise(rs, { signal });
  }

  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}