const Minipass = require('minipass')
const EE = require('events')

// A value is treated as a stream when it is an EventEmitter that looks
// readable (has pipe()) or writable (has both write() and end()).
const isStream = s => s && s instanceof EE && (
  typeof s.pipe === 'function' || // readable
  (typeof s.write === 'function' && typeof s.end === 'function') // writable
)

// Symbol keys keep the internal state and helpers off the public surface.
const _head = Symbol('_head')
const _tail = Symbol('_tail')
const _linkStreams = Symbol('_linkStreams')
const _setHead = Symbol('_setHead')
const _setTail = Symbol('_setTail')
const _onError = Symbol('_onError')
const _onData = Symbol('_onData')
const _onEnd = Symbol('_onEnd')
const _onDrain = Symbol('_onDrain')
const _streams = Symbol('_streams')

/**
 * A Minipass stream that presents a chain of piped streams as one stream.
 * Writes are handed to the first stream in the chain (the head), and
 * whatever the last stream (the tail) emits is re-emitted by the pipeline.
 */
class Pipeline extends Minipass {
  /**
   * @param {object} [opts] options passed to the Minipass constructor. If a
   *   stream is given in this position it is used as the first stream of
   *   the chain and the options become {}.
   * @param {...object} streams streams to link together, in order.
   */
  constructor (opts, ...streams) {
    if (isStream(opts)) {
      streams.unshift(opts)
      opts = {}
    }

    super(opts)
    this[_streams] = []
    if (streams.length)
      this.push(...streams)
  }

  /**
   * Pipe each stream into the one after it, forwarding 'error' events
   * downstream along the way.
   *
   * @param {object[]} streams non-empty list of streams to connect.
   * @returns {object} the last stream in the list.
   */
  [_linkStreams] (streams) {
    // reduce takes (left,right), and we return right to make it the
    // new left value.
    return streams.reduce((src, dest) => {
      src.on('error', er => dest.emit('error', er))
      src.pipe(dest)
      return dest
    })
  }

  /**
   * Append streams to the end of the chain. The last one becomes the tail;
   * if the chain was empty, the first one also becomes the head.
   *
   * @param {...object} streams streams to append, in order.
   */
  push (...streams) {
    // Nothing to add. Without this guard an empty call would either make
    // reduce() throw on an empty array (no tail yet) or hand the current
    // tail back to _setTail, attaching a second set of listeners so that
    // every chunk is delivered twice.
    if (!streams.length)
      return

    this[_streams].push(...streams)
    // link the current tail to the first of the new streams
    if (this[_tail])
      streams.unshift(this[_tail])

    const linkRet = this[_linkStreams](streams)

    this[_setTail](linkRet)
    if (!this[_head])
      this[_setHead](streams[0])
  }

  /**
   * Prepend streams to the start of the chain. The first one becomes the
   * head; if the chain was empty, the last one also becomes the tail.
   *
   * @param {...object} streams streams to prepend, in order.
   */
  unshift (...streams) {
    // Nothing to add. Without this guard an empty call would either make
    // reduce() throw on an empty array (no head yet) or register a
    // duplicate 'drain' listener on the current head.
    if (!streams.length)
      return

    this[_streams].unshift(...streams)
    // link the last of the new streams to the current head
    if (this[_head])
      streams.push(this[_head])

    const linkRet = this[_linkStreams](streams)
    this[_setHead](streams[0])
    if (!this[_tail])
      this[_setTail](linkRet)
  }

  /**
   * Destroy every stream in the chain that has a destroy() method, then
   * destroy the pipeline itself.
   *
   * @param {Error} [er] passed to the superclass destroy() only; the
   *   member streams are destroyed without it.
   * @returns {*} whatever the superclass destroy() returns.
   */
  destroy (er) {
    // set fire to the whole thing.
    this[_streams].forEach(s =>
      typeof s.destroy === 'function' && s.destroy())
    return super.destroy(er)
  }

  // readable interface -> tail
  // Listeners are never removed from a previous tail; each handler checks
  // that its stream is still the current tail before acting.
  [_setTail] (stream) {
    this[_tail] = stream
    stream.on('error', er => this[_onError](stream, er))
    stream.on('data', chunk => this[_onData](stream, chunk))
    stream.on('end', () => this[_onEnd](stream))
    stream.on('finish', () => this[_onEnd](stream))
  }

  // errors proxied down the pipeline
  // they're considered part of the "read" interface
  [_onError] (stream, er) {
    if (stream === this[_tail])
      this.emit('error', er)
  }

  // data from the tail goes into the superclass write(), bypassing the
  // overridden write() below (which targets the head).
  [_onData] (stream, chunk) {
    if (stream === this[_tail])
      super.write(chunk)
  }

  // 'end' or 'finish' on the tail ends the pipeline's own output via the
  // superclass end(), bypassing the overridden end() below.
  [_onEnd] (stream) {
    if (stream === this[_tail])
      super.end()
  }

  /**
   * Pause the pipeline and, when the tail has a pause() method, the tail.
   *
   * @returns {*} the result of the tail's pause(), or a falsy value when
   *   there is no tail or it cannot be paused.
   */
  pause () {
    super.pause()
    return this[_tail] && this[_tail].pause && this[_tail].pause()
  }

  // NB: Minipass calls its internal private [RESUME] method during
  // pipe drains, to avoid hazards where stream.resume() is overridden.
  // Thus, we need to listen to the resume *event*, not override the
  // resume() method, and proxy *that* to the tail.
  emit (ev, ...args) {
    if (ev === 'resume' && this[_tail] && this[_tail].resume)
      this[_tail].resume()
    return super.emit(ev, ...args)
  }

  // writable interface -> head
  [_setHead] (stream) {
    this[_head] = stream
    stream.on('drain', () => this[_onDrain](stream))
  }

  // only the current head's 'drain' is surfaced; a replaced head is ignored
  [_onDrain] (stream) {
    if (stream === this[_head])
      this.emit('drain')
  }

  /**
   * Write a chunk into the head of the chain.
   * Throws a TypeError if no stream has been added yet, since there is no
   * head to write to.
   *
   * @returns {boolean} truthy only when the head's write() returned truthy
   *   and the pipeline is flowing or has nothing buffered.
   */
  write (chunk, enc, cb) {
    return this[_head].write(chunk, enc, cb) &&
      (this.flowing || this.buffer.length === 0)
  }

  /**
   * End the head of the chain, forwarding all arguments to its end().
   * Throws a TypeError if no stream has been added yet.
   *
   * @returns {Pipeline} this pipeline.
   */
  end (chunk, enc, cb) {
    this[_head].end(chunk, enc, cb)
    return this
  }
}

module.exports = Pipeline