'use strict';

// Exercises stream.pipeline() and the promise-based pipeline() from
// 'stream/promises' with WHATWG web streams (ReadableStream, WritableStream,
// TransformStream), on their own and mixed with Node.js streams, HTTP
// request/response objects and async (generator) functions.

const common = require('../common');
const assert = require('assert');
const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
const http = require('http');

{
  // ReadableStream -> WritableStream: every enqueued chunk reaches the sink
  // in order and the callback reports success.
  const values = [];
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });
  const ws = new WritableStream({
    write(chunk) {
      values.push(chunk);
    }
  });

  pipeline(rs, ws, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['hello', 'world']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // ReadableStream -> WritableStream: an error raised on the source
  // controller is handed to the pipeline callback.
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ws = new WritableStream({
    write() { }
  });

  pipeline(rs, ws, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'kaboom');
  }));

  c.error(new Error('kaboom'));
}

{
  // ReadableStream -> TransformStream -> WritableStream: chunks are
  // upper-cased by the transform before they reach the sink.
  let c;
  const values = [];
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ts = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk?.toString().toUpperCase());
    }
  });

  const ws = new WritableStream({
    write(chunk) {
      values.push(chunk?.toString());
    }
  });

  pipeline(rs, ts, ws, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // A source error still reaches the callback when several TransformStreams
  // sit between the source and the sink.
  function makeTransformStream() {
    return new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(chunk?.toString());
      }
    });
  }

  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ws = new WritableStream({
    write() { }
  });

  pipeline(rs,
           makeTransformStream(),
           makeTransformStream(),
           makeTransformStream(),
           makeTransformStream(),
           ws,
           common.mustCall((err) => {
             assert.strictEqual(err?.message, 'kaboom');
           }));

  c.error(new Error('kaboom'));
}

{
  // Node.js Readable -> WritableStream. The two pushes are expected to reach
  // the sink as a single concatenated chunk.
  const values = [];

  const r = new Readable({
    read() { }
  });

  const ws = new WritableStream({
    write(chunk) {
      values.push(chunk?.toString());
    }
  });

  pipeline(r, ws, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['helloworld']);
  }));

  r.push('hello');
  r.push('world');
  r.push(null);
}

{
  // ReadableStream -> Node.js Writable: chunks are delivered one by one.
  const values = [];
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const w = new Writable({
    write(chunk, encoding, callback) {
      values.push(chunk?.toString());
      callback();
    }
  });

  pipeline(rs, w, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['hello', 'world']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // ReadableStream -> Node.js Transform -> WritableStream. The sink is
  // expected to receive the upper-cased data as one concatenated chunk.
  const values = [];
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ws = new WritableStream({
    write(chunk) {
      values.push(chunk?.toString());
    }
  });

  const t = new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk?.toString().toUpperCase());
    }
  });

  pipeline(rs, t, ws, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['HELLOWORLD']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // ReadableStream piped into an HTTP server response.
  const server = http.createServer((req, res) => {
    const rs = new ReadableStream({
      start(controller) {
        controller.enqueue('hello');
        controller.enqueue('world');
        controller.close();
      }
    });
    pipeline(rs, res, common.mustSucceed(() => {}));
  });

  server.listen(0, common.mustCall(() => {
    const req = http.request({
      port: server.address().port
    });
    req.end();
    const values = [];
    // mustCall: without a response the 'end' assertion below would never be
    // registered, so make the missing response itself a failure.
    req.on('response', common.mustCall((res) => {
      res.on('data', (chunk) => {
        values.push(chunk?.toString());
      });
      res.on('end', common.mustCall(() => {
        assert.deepStrictEqual(values, ['hello', 'world']);
        server.close();
      }));
    }));
  }));
}

{
  // Server side: HTTP request -> TransformStream -> HTTP response.
  // Client side: ReadableStream -> HTTP client request.
  const values = [];
  const server = http.createServer((req, res) => {
    const ts = new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(chunk?.toString().toUpperCase());
      }
    });
    pipeline(req, ts, res, common.mustSucceed());
  });

  server.listen(0, common.mustCall(() => {
    const req = http.request({
      port: server.address().port,
      method: 'POST',
    });

    const rs = new ReadableStream({
      start(controller) {
        controller.enqueue('hello');
        controller.close();
      }
    });

    pipeline(rs, req, common.mustSucceed());

    // mustCall: see the previous case; a missing response must fail the test.
    req.on('response', common.mustCall((res) => {
      res.on('data', (chunk) => {
        values.push(chunk?.toString());
      });
      res.on('end', common.mustCall(() => {
        assert.deepStrictEqual(values, ['HELLO']);
        server.close();
      }));
    }));
  }));
}

{
  // Promise API: ReadableStream -> WritableStream resolves once all chunks
  // have been written.
  const values = [];
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });
  const ws = new WritableStream({
    write(chunk) {
      values.push(chunk?.toString());
    }
  });

  pipelinePromise(rs, ws).then(common.mustCall(() => {
    assert.deepStrictEqual(values, ['hello', 'world']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // Promise API: a source error rejects the returned promise.
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ws = new WritableStream({
    write() { }
  });

  pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => {
    assert.strictEqual(err?.message, 'kaboom');
  }));

  c.error(new Error('kaboom'));
}

{
  // ReadableStream consumed by an async function used as the final stage.
  const values = [];
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  pipeline(rs, async function(source) {
    for await (const chunk of source) {
      values.push(chunk?.toString());
    }
  }, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['hello', 'world']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // An async function stage that throws must surface its error through the
  // callback. The callback is wrapped in mustCall so that the test fails,
  // rather than silently passing, if the callback is never invoked.
  const rs = new ReadableStream({
    start() {}
  });

  pipeline(rs, async function(source) {
    throw new Error('kaboom');
  }, common.mustCall((err) => {
    assert.strictEqual(err?.message, 'kaboom');
  }));
}

{
  // ReadableStream -> TransformStream -> async function as the final stage.
  const values = [];
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ts = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk?.toString().toUpperCase());
    }
  });

  pipeline(rs, ts, async function(source) {
    for await (const chunk of source) {
      values.push(chunk?.toString());
    }
  }, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // ReadableStream -> async generator (middle stage) -> WritableStream.
  const values = [];
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ws = new WritableStream({
    write(chunk) {
      values.push(chunk?.toString());
    }
  });

  pipeline(rs, async function* (source) {
    for await (const chunk of source) {
      yield chunk?.toString().toUpperCase();
    }
  }, ws, common.mustSucceed(() => {
    assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
  }));

  c.enqueue('hello');
  c.enqueue('world');
  c.close();
}

{
  // WritableStream created with a highWaterMark of 0: this case asserts that
  // the pipeline callback is never invoked in that configuration.
  let c;
  const rs = new ReadableStream({
    start(controller) {
      c = controller;
    }
  });

  const ws = new WritableStream({
    write(chunk) { }
  }, { highWaterMark: 0 });

  pipeline(rs, ws, common.mustNotCall());

  for (let i = 0; i < 10; i++) {
    c.enqueue(`${i}`);
  }
  c.close();
}

{
  // An already-closed ReadableStream piped into a PassThrough completes
  // without error.
  const rs = new ReadableStream({
    start(controller) {
      controller.close();
    }
  });

  pipeline(rs, new PassThrough(), common.mustSucceed());
}