'use strict'
const { Minipass } = require('minipass')
const EE = require('events').EventEmitter
const fs = require('fs')

const writev = fs.writev

// Symbol keys used for internal state and internal methods, so that they do
// not collide with string-named properties on the stream instances.
const _autoClose = Symbol('_autoClose')
const _close = Symbol('_close')
const _ended = Symbol('_ended')
const _fd = Symbol('_fd')
const _finished = Symbol('_finished')
const _flags = Symbol('_flags')
const _flush = Symbol('_flush')
const _handleChunk = Symbol('_handleChunk')
const _makeBuf = Symbol('_makeBuf')
const _mode = Symbol('_mode')
const _needDrain = Symbol('_needDrain')
const _onerror = Symbol('_onerror')
const _onopen = Symbol('_onopen')
const _onread = Symbol('_onread')
const _onwrite = Symbol('_onwrite')
const _open = Symbol('_open')
const _path = Symbol('_path')
const _pos = Symbol('_pos')
const _queue = Symbol('_queue')
const _read = Symbol('_read')
const _readSize = Symbol('_readSize')
const _reading = Symbol('_reading')
const _remain = Symbol('_remain')
const _size = Symbol('_size')
const _write = Symbol('_write')
const _writing = Symbol('_writing')
const _defaultFlag = Symbol('_defaultFlag')
const _errored = Symbol('_errored')

/**
 * Readable file stream built on Minipass.
 *
 * Opens `path` for reading (unless `opt.fd` is a number, in which case that
 * descriptor is used directly), then reads chunks and pushes them into the
 * underlying Minipass stream until a read returns 0 bytes or `opt.size`
 * bytes have been consumed.
 *
 * Recognized options (all optional; `opt` is also passed to Minipass):
 * - `fd` {number}: already-open file descriptor to read from.
 * - `readSize` {number}: maximum bytes per read; defaults to 16 MiB.
 * - `size` {number}: total number of bytes to read; defaults to Infinity.
 * - `autoClose` {boolean}: close the fd when done or on error; default true.
 */
class ReadStream extends Minipass {
  /**
   * @param {string} path - file path (stored even when `opt.fd` is given)
   * @param {object} [opt] - see class description
   * @throws {TypeError} if `path` is not a string
   */
  constructor (path, opt) {
    opt = opt || {}
    super(opt)

    this.readable = true
    this.writable = false

    if (typeof path !== 'string') {
      throw new TypeError('path must be a string')
    }

    this[_errored] = false
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_path] = path
    this[_readSize] = opt.readSize || 16 * 1024 * 1024
    this[_reading] = false
    this[_size] = typeof opt.size === 'number' ? opt.size : Infinity
    this[_remain] = this[_size]
    this[_autoClose] = typeof opt.autoClose === 'boolean'
      ? opt.autoClose : true

    // With a caller-supplied fd there is nothing to open; start reading.
    if (typeof this[_fd] === 'number') {
      this[_read]()
    } else {
      this[_open]()
    }
  }

  /** The current file descriptor, or null if not open / already closed. */
  get fd () {
    return this[_fd]
  }

  /** The path given to the constructor. */
  get path () {
    return this[_path]
  }

  /** Always throws: this stream cannot be written to by callers. */
  write () {
    throw new TypeError('this is a readable stream')
  }

  /** Always throws: this stream cannot be ended by callers. */
  end () {
    throw new TypeError('this is a readable stream')
  }

  // Open the file read-only and hand the result to the open handler.
  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  // On success, record the fd, emit 'open' with it and start reading.
  [_onopen] (er, fd) {
    if (er) {
      this[_onerror](er)
    } else {
      this[_fd] = fd
      this.emit('open', fd)
      this[_read]()
    }
  }

  // Allocate a buffer for the next read, never larger than what remains.
  [_makeBuf] () {
    return Buffer.allocUnsafe(Math.min(this[_readSize], this[_remain]))
  }

  // Start one asynchronous read unless one is already in progress.
  [_read] () {
    if (!this[_reading]) {
      this[_reading] = true
      const buf = this[_makeBuf]()
      /* istanbul ignore if */
      if (buf.length === 0) {
        // Nothing left to read: report a zero-byte read on the next tick.
        return process.nextTick(() => this[_onread](null, 0, buf))
      }
      // A null position reads from the current file position.
      fs.read(this[_fd], buf, 0, buf.length, null, (er, br, b) =>
        this[_onread](er, br, b))
    }
  }

  // Read completion: keep reading for as long as the chunk handler says so.
  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er) {
      this[_onerror](er)
    } else if (this[_handleChunk](br, buf)) {
      this[_read]()
    }
  }

  // Close the fd (only when autoClose is on and an fd is held), emitting
  // 'close' on success or 'error' on failure.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }

  // Error path: leaving _reading set makes every later [_read]() a no-op.
  [_onerror] (er) {
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  /**
   * Push `br` bytes of `buf` into the stream and end it when finished.
   *
   * @param {number} br - bytes actually read into `buf`
   * @param {Buffer} buf - the buffer that was read into
   * @returns {boolean} whether another read should be started right away
   */
  [_handleChunk] (br, buf) {
    let ret = false
    // no effect if infinite
    this[_remain] -= br
    if (br > 0) {
      // subarray() returns the same view that the deprecated
      // Buffer#slice() did, without triggering DEP0158.
      ret = super.write(br < buf.length ? buf.subarray(0, br) : buf)
    }

    if (br === 0 || this[_remain] <= 0) {
      ret = false
      this[_close]()
      super.end()
    }

    return ret
  }

  /**
   * Event filter: 'prefinish' and 'finish' are not forwarded, 'drain' is
   * not forwarded but restarts reading while an fd is held, 'error' is
   * forwarded only the first time, everything else is forwarded unchanged.
   */
  emit (ev, data) {
    switch (ev) {
      case 'prefinish':
      case 'finish':
        break

      case 'drain':
        if (typeof this[_fd] === 'number') {
          this[_read]()
        }
        break

      case 'error':
        if (this[_errored]) {
          return
        }
        this[_errored] = true
        return super.emit(ev, data)

      default:
        return super.emit(ev, data)
    }
  }
}

/**
 * Synchronous variant of ReadStream: opening, reading and closing all use
 * the blocking fs calls, and failures are thrown rather than emitted.
 */
class ReadStreamSync extends ReadStream {
  // Open synchronously. If anything in here throws, close before the
  // exception propagates (try/finally keeps the original error and stack).
  [_open] () {
    let threw = true
    try {
      this[_onopen](null, fs.openSync(this[_path], 'r'))
      threw = false
    } finally {
      if (threw) {
        this[_close]()
      }
    }
  }

  // Read in a loop until the chunk handler asks to stop. If anything
  // throws, close the fd and let the exception propagate.
  [_read] () {
    let threw = true
    try {
      if (!this[_reading]) {
        this[_reading] = true
        do {
          const buf = this[_makeBuf]()
          /* istanbul ignore next */
          const br = buf.length === 0 ? 0
            : fs.readSync(this[_fd], buf, 0, buf.length, null)
          if (!this[_handleChunk](br, buf)) {
            break
          }
        } while (true)
        this[_reading] = false
      }
      threw = false
    } finally {
      if (threw) {
        this[_close]()
      }
    }
  }

  // Close synchronously and emit 'close'; closeSync errors are thrown.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.closeSync(fd)
      this.emit('close')
    }
  }
}

/**
 * Writable file stream implemented directly on EventEmitter.
 *
 * Recognized options (all optional):
 * - `fd` {number}: already-open file descriptor to write to.
 * - `mode` {number}: mode passed to fs.open; defaults to 0o666.
 * - `start` {number}: position to start writing at; when omitted, writes
 *   are issued with a null position.
 * - `flags` {string}: open flags; defaults to 'r+' when `start` is given
 *   and 'w' otherwise.
 * - `autoClose` {boolean}: close the fd on finish or error; default true.
 *
 * Emits 'open', 'drain', 'finish', 'close' and (at most once) 'error'.
 */
class WriteStream extends EE {
  /**
   * @param {string} path - file path to write to
   * @param {object} [opt] - see class description
   */
  constructor (path, opt) {
    opt = opt || {}
    super(opt)
    this.readable = false
    this.writable = true
    this[_errored] = false
    this[_writing] = false
    this[_ended] = false
    // Initialized explicitly so every state flag exists from construction.
    this[_finished] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof opt.fd === 'number' ? opt.fd : null
    this[_mode] = opt.mode === undefined ? 0o666 : opt.mode
    this[_pos] = typeof opt.start === 'number' ? opt.start : null
    this[_autoClose] = typeof opt.autoClose === 'boolean'
      ? opt.autoClose : true

    // truncating makes no sense when writing into the middle
    const defaultFlag = this[_pos] !== null ? 'r+' : 'w'
    this[_defaultFlag] = opt.flags === undefined
    this[_flags] = this[_defaultFlag] ? defaultFlag : opt.flags

    if (this[_fd] === null) {
      this[_open]()
    }
  }

  /** Forward events to EventEmitter, letting 'error' through only once. */
  emit (ev, data) {
    if (ev === 'error') {
      if (this[_errored]) {
        return
      }
      this[_errored] = true
    }
    return super.emit(ev, data)
  }

  /** The current file descriptor, or null if not open / already closed. */
  get fd () {
    return this[_fd]
  }

  /** The path given to the constructor. */
  get path () {
    return this[_path]
  }

  // Error path: close, and leave _writing set so that later write() calls
  // are queued instead of being issued.
  [_onerror] (er) {
    this[_close]()
    this[_writing] = true
    this.emit('error', er)
  }

  // Open the file with the configured flags and mode.
  [_open] () {
    fs.open(this[_path], this[_flags], this[_mode],
      (er, fd) => this[_onopen](er, fd))
  }

  // Open completion. When the implicit 'r+' flag fails with ENOENT, retry
  // with 'w' so the file gets created; otherwise report the error, or
  // record the fd, emit 'open' and flush whatever was queued meanwhile.
  [_onopen] (er, fd) {
    if (this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er && er.code === 'ENOENT') {
      this[_flags] = 'w'
      this[_open]()
    } else if (er) {
      this[_onerror](er)
    } else {
      this[_fd] = fd
      this.emit('open', fd)
      if (!this[_writing]) {
        this[_flush]()
      }
    }
  }

  /**
   * Optionally write a final chunk, then mark the stream as ended.
   * 'finish' is emitted once all pending data has been written.
   *
   * @param {Buffer|string} [buf] - final chunk
   * @param {string} [enc] - encoding used when `buf` is a string
   * @returns {this}
   */
  end (buf, enc) {
    if (buf) {
      this.write(buf, enc)
    }

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    if (!this[_writing] && !this[_queue].length &&
        typeof this[_fd] === 'number') {
      this[_onwrite](null, 0)
    }
    return this
  }

  /**
   * Write a chunk, or queue it if the fd is not open yet, a write is in
   * progress, or other chunks are already queued.
   *
   * @param {Buffer|string} buf - chunk to write
   * @param {string} [enc] - encoding used when `buf` is a string
   * @returns {boolean} true if the write was started immediately; false if
   *   the chunk was queued (wait for 'drain') or the stream has ended
   */
  write (buf, enc) {
    if (typeof buf === 'string') {
      buf = Buffer.from(buf, enc)
    }

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    if (this[_fd] === null || this[_writing] || this[_queue].length) {
      this[_queue].push(buf)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](buf)
    return true
  }

  // Issue a single asynchronous write of the whole buffer at _pos.
  [_write] (buf) {
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
      this[_onwrite](er, bw))
  }

  // Write completion: advance the position, then either flush more queued
  // data, finish (close + 'finish') if ended, or emit a pending 'drain'.
  [_onwrite] (er, bw) {
    if (er) {
      this[_onerror](er)
    } else {
      if (this[_pos] !== null) {
        this[_pos] += bw
      }
      if (this[_queue].length) {
        this[_flush]()
      } else {
        this[_writing] = false

        if (this[_ended] && !this[_finished]) {
          this[_finished] = true
          this[_close]()
          this.emit('finish')
        } else if (this[_needDrain]) {
          this[_needDrain] = false
          this.emit('drain')
        }
      }
    }
  }

  // Write out everything queued: one chunk via [_write], several via
  // writev. With an empty queue, run the after-write logic if ended.
  [_flush] () {
    if (this[_queue].length === 0) {
      if (this[_ended]) {
        this[_onwrite](null, 0)
      }
    } else if (this[_queue].length === 1) {
      // Mark the write as in flight. When the flush is triggered by the
      // open callback nothing else has set this flag, and without it a
      // following write() would start a second concurrent fs.write at the
      // same position, and end() would close the fd before this write
      // had completed.
      this[_writing] = true
      this[_write](this[_queue].pop())
    } else {
      const iovec = this[_queue]
      this[_queue] = []
      // Same reasoning as above: the writev is now in flight.
      this[_writing] = true
      writev(this[_fd], iovec, this[_pos],
        (er, bw) => this[_onwrite](er, bw))
    }
  }

  // Close the fd (only when autoClose is on and an fd is held), emitting
  // 'close' on success or 'error' on failure.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
    }
  }
}

/**
 * Synchronous variant of WriteStream: opening, single-chunk writes and
 * closing use the blocking fs calls, and their failures are thrown.
 */
class WriteStreamSync extends WriteStream {
  // Open synchronously, retrying with 'w' when the implicit 'r+' flag
  // fails because the file does not exist.
  [_open] () {
    let fd
    // only wrap in a try{} block if we know we'll retry, to avoid
    // the rethrow obscuring the error's source frame in most cases.
    if (this[_defaultFlag] && this[_flags] === 'r+') {
      try {
        fd = fs.openSync(this[_path], this[_flags], this[_mode])
      } catch (er) {
        if (er.code === 'ENOENT') {
          this[_flags] = 'w'
          return this[_open]()
        } else {
          throw er
        }
      }
    } else {
      fd = fs.openSync(this[_path], this[_flags], this[_mode])
    }

    this[_onopen](null, fd)
  }

  // Close synchronously and emit 'close'; closeSync errors are thrown.
  [_close] () {
    if (this[_autoClose] && typeof this[_fd] === 'number') {
      const fd = this[_fd]
      this[_fd] = null
      fs.closeSync(fd)
      this.emit('close')
    }
  }

  // Write synchronously and run the after-write logic right away.
  [_write] (buf) {
    // throw the original, but try to close if it fails
    let threw = true
    try {
      this[_onwrite](null,
        fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos]))
      threw = false
    } finally {
      if (threw) {
        try {
          this[_close]()
        } catch {
          // ok error
        }
      }
    }
  }
}

exports.ReadStream = ReadStream
exports.ReadStreamSync = ReadStreamSync

exports.WriteStream = WriteStream
exports.WriteStreamSync = WriteStreamSync