1// Flags: --no-warnings --expose-internals 2'use strict'; 3 4const common = require('../common'); 5 6const assert = require('assert'); 7 8const { 9 WritableStream, 10} = require('stream/web'); 11 12const { 13 newStreamWritableFromWritableStream, 14} = require('internal/webstreams/adapters'); 15 16const { 17 finished, 18 pipeline, 19 Readable, 20} = require('stream'); 21 22const { 23 kState, 24} = require('internal/webstreams/util'); 25 26class TestSource { 27 constructor() { 28 this.chunks = []; 29 } 30 31 start(c) { 32 this.controller = c; 33 this.started = true; 34 } 35 36 write(chunk) { 37 this.chunks.push(chunk); 38 } 39 40 close() { 41 this.closed = true; 42 } 43 44 abort(reason) { 45 this.abortReason = reason; 46 } 47} 48 49[1, {}, false, []].forEach((arg) => { 50 assert.throws(() => newStreamWritableFromWritableStream(arg), { 51 code: 'ERR_INVALID_ARG_TYPE', 52 }); 53}); 54 55{ 56 // Ending the stream.Writable should close the writableStream 57 const source = new TestSource(); 58 const writableStream = new WritableStream(source); 59 const writable = newStreamWritableFromWritableStream(writableStream); 60 61 assert(writableStream.locked); 62 63 writable.end('chunk'); 64 65 writable.on('close', common.mustCall(() => { 66 assert(writableStream.locked); 67 assert.strictEqual(writableStream[kState].state, 'closed'); 68 assert.strictEqual(source.chunks.length, 1); 69 assert.deepStrictEqual(source.chunks[0], Buffer.from('chunk')); 70 })); 71} 72 73{ 74 // Destroying the stream.Writable without an error should close 75 // the writableStream with no error. 76 const source = new TestSource(); 77 const writableStream = new WritableStream(source); 78 const writable = newStreamWritableFromWritableStream(writableStream); 79 80 assert(writableStream.locked); 81 82 writable.destroy(); 83 84 writable.on('close', common.mustCall(() => { 85 assert(writableStream.locked); 86 assert.strictEqual(writableStream[kState].state, 'closed'); 87 assert.strictEqual(source.chunks.length, 0); 88 })); 89} 90 91{ 92 // Destroying the stream.Writable with an error should error 93 // the writableStream 94 const error = new Error('boom'); 95 const source = new TestSource(); 96 const writableStream = new WritableStream(source); 97 const writable = newStreamWritableFromWritableStream(writableStream); 98 99 assert(writableStream.locked); 100 101 writable.destroy(error); 102 103 writable.on('error', common.mustCall((reason) => { 104 assert.strictEqual(reason, error); 105 })); 106 107 writable.on('close', common.mustCall(() => { 108 assert(writableStream.locked); 109 assert.strictEqual(writableStream[kState].state, 'errored'); 110 assert.strictEqual(writableStream[kState].storedError, error); 111 assert.strictEqual(source.chunks.length, 0); 112 })); 113} 114 115{ 116 // Attempting to close, abort, or getWriter on writableStream 117 // should fail because it is locked. An internal error in 118 // writableStream should error the writable. 119 const error = new Error('boom'); 120 const source = new TestSource(); 121 const writableStream = new WritableStream(source); 122 const writable = newStreamWritableFromWritableStream(writableStream); 123 124 assert(writableStream.locked); 125 126 assert.rejects(writableStream.close(), { 127 code: 'ERR_INVALID_STATE', 128 }); 129 130 assert.rejects(writableStream.abort(), { 131 code: 'ERR_INVALID_STATE', 132 }); 133 134 assert.throws(() => writableStream.getWriter(), { 135 code: 'ERR_INVALID_STATE', 136 }); 137 138 writable.on('error', common.mustCall((reason) => { 139 assert.strictEqual(error, reason); 140 })); 141 142 source.controller.error(error); 143} 144 145{ 146 const source = new TestSource(); 147 const writableStream = new WritableStream(source); 148 const writable = newStreamWritableFromWritableStream(writableStream); 149 150 writable.on('error', common.mustNotCall()); 151 writable.on('finish', common.mustCall()); 152 writable.on('close', common.mustCall(() => { 153 assert.strictEqual(source.chunks.length, 1); 154 assert.deepStrictEqual(source.chunks[0], Buffer.from('hello')); 155 })); 156 157 writable.write('hello', common.mustCall()); 158 writable.end(); 159} 160 161{ 162 const source = new TestSource(); 163 const writableStream = new WritableStream(source); 164 const writable = 165 newStreamWritableFromWritableStream(writableStream, { 166 decodeStrings: false, 167 }); 168 169 writable.on('error', common.mustNotCall()); 170 writable.on('finish', common.mustCall()); 171 writable.on('close', common.mustCall(() => { 172 assert.strictEqual(source.chunks.length, 1); 173 assert.strictEqual(source.chunks[0], 'hello'); 174 })); 175 176 writable.write('hello', common.mustCall()); 177 writable.end(); 178} 179 180{ 181 const source = new TestSource(); 182 const writableStream = new WritableStream(source); 183 const writable = 184 newStreamWritableFromWritableStream( 185 writableStream, { 186 objectMode: true 187 }); 188 assert(writable.writableObjectMode); 189 190 writable.on('error', common.mustNotCall()); 191 writable.on('finish', common.mustCall()); 192 writable.on('close', common.mustCall(() => { 193 assert.strictEqual(source.chunks.length, 1); 194 assert.strictEqual(source.chunks[0], 'hello'); 195 })); 196 197 writable.write('hello', common.mustCall()); 198 writable.end(); 199} 200 201{ 202 const writableStream = new WritableStream({ 203 write: common.mustCall(5), 204 close: common.mustCall(), 205 }); 206 const writable = newStreamWritableFromWritableStream(writableStream); 207 208 finished(writable, common.mustCall()); 209 210 writable.write('hello'); 211 writable.write('hello'); 212 writable.write('hello'); 213 writable.write('world'); 214 writable.write('world'); 215 writable.end(); 216} 217 218{ 219 const writableStream = new WritableStream({ 220 write: common.mustCall(2), 221 close: common.mustCall(), 222 }); 223 const writable = newStreamWritableFromWritableStream(writableStream); 224 225 const readable = new Readable({ 226 read() { 227 readable.push(Buffer.from('hello')); 228 readable.push(Buffer.from('world')); 229 readable.push(null); 230 } 231 }); 232 233 pipeline(readable, writable, common.mustCall()); 234} 235