// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

// A transform stream is a readable/writable stream where you do
// something with the data.  Sometimes it's called a "filter",
// but that's not a great name for it, since that implies a thing where
// some bits pass through, and others are simply ignored.  (That would
// be a valid example of a transform, of course.)
//
// While the output is causally related to the input, it's not
// necessarily a symmetric or synchronous transformation.  For example,
// a zlib stream might take multiple plain-text writes(), and then
// emit a single compressed chunk some time in the future.
//
// Here's how this works:
//
// The Transform stream has all the aspects of the readable and writable
// stream classes.  When you write(chunk), that calls _write(chunk,cb)
// internally, and returns false if there's a lot of pending writes
// buffered up.  When you call read(), that calls _read(n) until
// there's enough pending readable data buffered up.
//
// In a transform stream, the written data is placed in a buffer.  When
// _read(n) is called, it transforms the queued up data, calling the
// buffered _write cb's as it consumes chunks.  If consuming a single
// written chunk would result in multiple output chunks, then the first
// outputted bit calls the readcb, and subsequent chunks just go into
// the read buffer, and will cause it to emit 'readable' if necessary.
//
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
'use strict';

const {
  ObjectSetPrototypeOf,
  Symbol,
} = primordials;

module.exports = Transform;
const {
  ERR_METHOD_NOT_IMPLEMENTED,
} = require('internal/errors').codes;
const Duplex = require('internal/streams/duplex');
const { getHighWaterMark } = require('internal/streams/state');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);

// Property key under which _write() parks its completion callback until
// the next _read() call releases it.
const kCallback = Symbol('kCallback');

/**
 * Duplex stream whose readable output is produced by running every written
 * chunk through `_transform()`. Works with or without `new`.
 *
 * @param {object} [options] Passed on to Duplex. A function-valued
 *   `transform` is installed as `this._transform`, and a function-valued
 *   `flush` as `this._flush`.
 */
function Transform(options) {
  if (!(this instanceof Transform)) {
    return new Transform(options);
  }

  // TODO (ronag): This should preferably always be
  // applied but would be semver-major. Or even better;
  // make Transform a Readable with the Writable interface.
  let readableHighWaterMark = null;
  if (options) {
    readableHighWaterMark =
      getHighWaterMark(this, options, 'readableHighWaterMark', true);
  }

  // The options object actually handed to Duplex and consulted below; it
  // only differs from the caller's object in the zero-hwm case.
  let duplexOptions = options;
  if (readableHighWaterMark === 0) {
    // A Duplex buffers on both its writable and readable side, whereas a
    // Transform only wants hwm elements buffered in total. Turn off
    // buffering on the writable side so data is not buffered twice.
    duplexOptions = {
      ...options,
      highWaterMark: null,
      readableHighWaterMark,
      // TODO (ronag): 0 is not optimal since we have
      // a "bug" where we check needDrain before calling _write and not after.
      // Refs: https://github.com/nodejs/node/pull/32887
      // Refs: https://github.com/nodejs/node/pull/35941
      writableHighWaterMark: options.writableHighWaterMark || 0,
    };
  }

  Duplex.call(this, duplexOptions);

  // _read is implemented here and everything Readable expects before the
  // first _read call has been done, so clear the sync guard flag.
  this._readableState.sync = false;

  this[kCallback] = null;

  if (duplexOptions) {
    if (typeof duplexOptions.transform === 'function') {
      this._transform = duplexOptions.transform;
    }

    if (typeof duplexOptions.flush === 'function') {
      this._flush = duplexOptions.flush;
    }
  }

  // Flush whatever remains once the writable side finishes.
  // Backwards compat: some Transform streams incorrectly implement _final
  // instead of, or in addition to, _flush. Listening for 'prefinish'
  // rather than relying on _final keeps that unfortunate use case working.
  this.on('prefinish', prefinish);
}

/**
 * Default `_final` implementation: runs `_flush()` when one is defined and
 * the stream is not destroyed, then ends the readable side.
 *
 * @param {Function} [cb] Completion callback. It is absent when this
 *   function is invoked from the 'prefinish' handler, in which case a
 *   flush error destroys the stream instead.
 */
function final(cb) {
  // Ends the readable side and reports success to the caller, if any.
  const finish = () => {
    this.push(null);
    if (cb) {
      cb();
    }
  };

  if (typeof this._flush !== 'function' || this.destroyed) {
    finish();
    return;
  }

  this._flush((er, data) => {
    if (er) {
      if (cb) {
        cb(er);
      } else {
        this.destroy(er);
      }
      return;
    }

    if (data != null) {
      this.push(data);
    }
    finish();
  });
}

/**
 * 'prefinish' listener: performs the final/flush step by hand, but only
 * when `_final` has been replaced by something other than the default.
 */
function prefinish() {
  if (this._final === final) {
    return;
  }
  final.call(this);
}

Transform.prototype._final = final;

/**
 * Placeholder that implementations must override (or supply through
 * `options.transform`); the default always throws.
 */
Transform.prototype._transform = function(chunk, encoding, callback) {
  throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()');
};

/**
 * Writable-side hook: passes the chunk to `_transform()`, pushes any value
 * it yields, and then either completes the write right away or holds the
 * callback back until `_read()` is called.
 */
Transform.prototype._write = function(chunk, encoding, callback) {
  const readable = this._readableState;
  const writable = this._writableState;
  // Readable buffer length before the transform ran.
  const lengthBefore = readable.length;

  const onTransformed = (err, val) => {
    if (err) {
      callback(err);
      return;
    }

    if (val != null) {
      this.push(val);
    }

    const completeNow =
      writable.ended || // Backwards compat.
      lengthBefore === readable.length || // Backwards compat.
      readable.length < readable.highWaterMark;

    if (!completeNow) {
      // Defer completion until _read() is called.
      this[kCallback] = callback;
      return;
    }

    callback();
  };

  this._transform(chunk, encoding, onTransformed);
};

/**
 * Readable-side hook: releases the write callback parked by `_write()`,
 * if there is one.
 */
Transform.prototype._read = function() {
  const pending = this[kCallback];
  if (!pending) {
    return;
  }
  this[kCallback] = null;
  pending();
};