11cb0ef41Sopenharmony_ci// Copyright Joyent, Inc. and other Node contributors. 21cb0ef41Sopenharmony_ci// 31cb0ef41Sopenharmony_ci// Permission is hereby granted, free of charge, to any person obtaining a 41cb0ef41Sopenharmony_ci// copy of this software and associated documentation files (the 51cb0ef41Sopenharmony_ci// "Software"), to deal in the Software without restriction, including 61cb0ef41Sopenharmony_ci// without limitation the rights to use, copy, modify, merge, publish, 71cb0ef41Sopenharmony_ci// distribute, sublicense, and/or sell copies of the Software, and to permit 81cb0ef41Sopenharmony_ci// persons to whom the Software is furnished to do so, subject to the 91cb0ef41Sopenharmony_ci// following conditions: 101cb0ef41Sopenharmony_ci// 111cb0ef41Sopenharmony_ci// The above copyright notice and this permission notice shall be included 121cb0ef41Sopenharmony_ci// in all copies or substantial portions of the Software. 131cb0ef41Sopenharmony_ci// 141cb0ef41Sopenharmony_ci// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 151cb0ef41Sopenharmony_ci// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 161cb0ef41Sopenharmony_ci// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 171cb0ef41Sopenharmony_ci// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 181cb0ef41Sopenharmony_ci// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 191cb0ef41Sopenharmony_ci// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 201cb0ef41Sopenharmony_ci// USE OR OTHER DEALINGS IN THE SOFTWARE. 211cb0ef41Sopenharmony_ci 221cb0ef41Sopenharmony_ci// a transform stream is a readable/writable stream where you do 231cb0ef41Sopenharmony_ci// something with the data. Sometimes it's called a "filter", 241cb0ef41Sopenharmony_ci// but that's not a great name for it, since that implies a thing where 251cb0ef41Sopenharmony_ci// some bits pass through, and others are simply ignored. (That would 261cb0ef41Sopenharmony_ci// be a valid example of a transform, of course.) 271cb0ef41Sopenharmony_ci// 281cb0ef41Sopenharmony_ci// While the output is causally related to the input, it's not a 291cb0ef41Sopenharmony_ci// necessarily symmetric or synchronous transformation. For example, 301cb0ef41Sopenharmony_ci// a zlib stream might take multiple plain-text writes(), and then 311cb0ef41Sopenharmony_ci// emit a single compressed chunk some time in the future. 321cb0ef41Sopenharmony_ci// 331cb0ef41Sopenharmony_ci// Here's how this works: 341cb0ef41Sopenharmony_ci// 351cb0ef41Sopenharmony_ci// The Transform stream has all the aspects of the readable and writable 361cb0ef41Sopenharmony_ci// stream classes. When you write(chunk), that calls _write(chunk,cb) 371cb0ef41Sopenharmony_ci// internally, and returns false if there's a lot of pending writes 381cb0ef41Sopenharmony_ci// buffered up. When you call read(), that calls _read(n) until 391cb0ef41Sopenharmony_ci// there's enough pending readable data buffered up. 401cb0ef41Sopenharmony_ci// 411cb0ef41Sopenharmony_ci// In a transform stream, the written data is placed in a buffer. When 421cb0ef41Sopenharmony_ci// _read(n) is called, it transforms the queued up data, calling the 431cb0ef41Sopenharmony_ci// buffered _write cb's as it consumes chunks. If consuming a single 441cb0ef41Sopenharmony_ci// written chunk would result in multiple output chunks, then the first 451cb0ef41Sopenharmony_ci// outputted bit calls the readcb, and subsequent chunks just go into 461cb0ef41Sopenharmony_ci// the read buffer, and will cause it to emit 'readable' if necessary. 471cb0ef41Sopenharmony_ci// 481cb0ef41Sopenharmony_ci// This way, back-pressure is actually determined by the reading side, 491cb0ef41Sopenharmony_ci// since _read has to be called to start processing a new chunk. However, 501cb0ef41Sopenharmony_ci// a pathological inflate type of transform can cause excessive buffering 511cb0ef41Sopenharmony_ci// here. For example, imagine a stream where every byte of input is 521cb0ef41Sopenharmony_ci// interpreted as an integer from 0-255, and then results in that many 531cb0ef41Sopenharmony_ci// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in 541cb0ef41Sopenharmony_ci// 1kb of data being output. In this case, you could write a very small 551cb0ef41Sopenharmony_ci// amount of input, and end up with a very large amount of output. In 561cb0ef41Sopenharmony_ci// such a pathological inflating mechanism, there'd be no way to tell 571cb0ef41Sopenharmony_ci// the system to stop doing the transform. A single 4MB write could 581cb0ef41Sopenharmony_ci// cause the system to run out of memory. 591cb0ef41Sopenharmony_ci// 601cb0ef41Sopenharmony_ci// However, even in such a pathological case, only a single written chunk 611cb0ef41Sopenharmony_ci// would be consumed, and then the rest would wait (un-transformed) until 621cb0ef41Sopenharmony_ci// the results of the previous transformed chunk were consumed. 631cb0ef41Sopenharmony_ci 641cb0ef41Sopenharmony_ci'use strict'; 651cb0ef41Sopenharmony_ci 661cb0ef41Sopenharmony_ciconst { 671cb0ef41Sopenharmony_ci ObjectSetPrototypeOf, 681cb0ef41Sopenharmony_ci Symbol, 691cb0ef41Sopenharmony_ci} = primordials; 701cb0ef41Sopenharmony_ci 711cb0ef41Sopenharmony_cimodule.exports = Transform; 721cb0ef41Sopenharmony_ciconst { 731cb0ef41Sopenharmony_ci ERR_METHOD_NOT_IMPLEMENTED, 741cb0ef41Sopenharmony_ci} = require('internal/errors').codes; 751cb0ef41Sopenharmony_ciconst Duplex = require('internal/streams/duplex'); 761cb0ef41Sopenharmony_ciconst { getHighWaterMark } = require('internal/streams/state'); 771cb0ef41Sopenharmony_ciObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); 781cb0ef41Sopenharmony_ciObjectSetPrototypeOf(Transform, Duplex); 791cb0ef41Sopenharmony_ci 801cb0ef41Sopenharmony_ciconst kCallback = Symbol('kCallback'); 811cb0ef41Sopenharmony_ci 821cb0ef41Sopenharmony_cifunction Transform(options) { 831cb0ef41Sopenharmony_ci if (!(this instanceof Transform)) 841cb0ef41Sopenharmony_ci return new Transform(options); 851cb0ef41Sopenharmony_ci 861cb0ef41Sopenharmony_ci // TODO (ronag): This should preferably always be 871cb0ef41Sopenharmony_ci // applied but would be semver-major. Or even better; 881cb0ef41Sopenharmony_ci // make Transform a Readable with the Writable interface. 891cb0ef41Sopenharmony_ci const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null; 901cb0ef41Sopenharmony_ci if (readableHighWaterMark === 0) { 911cb0ef41Sopenharmony_ci // A Duplex will buffer both on the writable and readable side while 921cb0ef41Sopenharmony_ci // a Transform just wants to buffer hwm number of elements. To avoid 931cb0ef41Sopenharmony_ci // buffering twice we disable buffering on the writable side. 941cb0ef41Sopenharmony_ci options = { 951cb0ef41Sopenharmony_ci ...options, 961cb0ef41Sopenharmony_ci highWaterMark: null, 971cb0ef41Sopenharmony_ci readableHighWaterMark, 981cb0ef41Sopenharmony_ci // TODO (ronag): 0 is not optimal since we have 991cb0ef41Sopenharmony_ci // a "bug" where we check needDrain before calling _write and not after. 1001cb0ef41Sopenharmony_ci // Refs: https://github.com/nodejs/node/pull/32887 1011cb0ef41Sopenharmony_ci // Refs: https://github.com/nodejs/node/pull/35941 1021cb0ef41Sopenharmony_ci writableHighWaterMark: options.writableHighWaterMark || 0, 1031cb0ef41Sopenharmony_ci }; 1041cb0ef41Sopenharmony_ci } 1051cb0ef41Sopenharmony_ci 1061cb0ef41Sopenharmony_ci Duplex.call(this, options); 1071cb0ef41Sopenharmony_ci 1081cb0ef41Sopenharmony_ci // We have implemented the _read method, and done the other things 1091cb0ef41Sopenharmony_ci // that Readable wants before the first _read call, so unset the 1101cb0ef41Sopenharmony_ci // sync guard flag. 1111cb0ef41Sopenharmony_ci this._readableState.sync = false; 1121cb0ef41Sopenharmony_ci 1131cb0ef41Sopenharmony_ci this[kCallback] = null; 1141cb0ef41Sopenharmony_ci 1151cb0ef41Sopenharmony_ci if (options) { 1161cb0ef41Sopenharmony_ci if (typeof options.transform === 'function') 1171cb0ef41Sopenharmony_ci this._transform = options.transform; 1181cb0ef41Sopenharmony_ci 1191cb0ef41Sopenharmony_ci if (typeof options.flush === 'function') 1201cb0ef41Sopenharmony_ci this._flush = options.flush; 1211cb0ef41Sopenharmony_ci } 1221cb0ef41Sopenharmony_ci 1231cb0ef41Sopenharmony_ci // When the writable side finishes, then flush out anything remaining. 1241cb0ef41Sopenharmony_ci // Backwards compat. Some Transform streams incorrectly implement _final 1251cb0ef41Sopenharmony_ci // instead of or in addition to _flush. By using 'prefinish' instead of 1261cb0ef41Sopenharmony_ci // implementing _final we continue supporting this unfortunate use case. 1271cb0ef41Sopenharmony_ci this.on('prefinish', prefinish); 1281cb0ef41Sopenharmony_ci} 1291cb0ef41Sopenharmony_ci 1301cb0ef41Sopenharmony_cifunction final(cb) { 1311cb0ef41Sopenharmony_ci if (typeof this._flush === 'function' && !this.destroyed) { 1321cb0ef41Sopenharmony_ci this._flush((er, data) => { 1331cb0ef41Sopenharmony_ci if (er) { 1341cb0ef41Sopenharmony_ci if (cb) { 1351cb0ef41Sopenharmony_ci cb(er); 1361cb0ef41Sopenharmony_ci } else { 1371cb0ef41Sopenharmony_ci this.destroy(er); 1381cb0ef41Sopenharmony_ci } 1391cb0ef41Sopenharmony_ci return; 1401cb0ef41Sopenharmony_ci } 1411cb0ef41Sopenharmony_ci 1421cb0ef41Sopenharmony_ci if (data != null) { 1431cb0ef41Sopenharmony_ci this.push(data); 1441cb0ef41Sopenharmony_ci } 1451cb0ef41Sopenharmony_ci this.push(null); 1461cb0ef41Sopenharmony_ci if (cb) { 1471cb0ef41Sopenharmony_ci cb(); 1481cb0ef41Sopenharmony_ci } 1491cb0ef41Sopenharmony_ci }); 1501cb0ef41Sopenharmony_ci } else { 1511cb0ef41Sopenharmony_ci this.push(null); 1521cb0ef41Sopenharmony_ci if (cb) { 1531cb0ef41Sopenharmony_ci cb(); 1541cb0ef41Sopenharmony_ci } 1551cb0ef41Sopenharmony_ci } 1561cb0ef41Sopenharmony_ci} 1571cb0ef41Sopenharmony_ci 1581cb0ef41Sopenharmony_cifunction prefinish() { 1591cb0ef41Sopenharmony_ci if (this._final !== final) { 1601cb0ef41Sopenharmony_ci final.call(this); 1611cb0ef41Sopenharmony_ci } 1621cb0ef41Sopenharmony_ci} 1631cb0ef41Sopenharmony_ci 1641cb0ef41Sopenharmony_ciTransform.prototype._final = final; 1651cb0ef41Sopenharmony_ci 1661cb0ef41Sopenharmony_ciTransform.prototype._transform = function(chunk, encoding, callback) { 1671cb0ef41Sopenharmony_ci throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()'); 1681cb0ef41Sopenharmony_ci}; 1691cb0ef41Sopenharmony_ci 1701cb0ef41Sopenharmony_ciTransform.prototype._write = function(chunk, encoding, callback) { 1711cb0ef41Sopenharmony_ci const rState = this._readableState; 1721cb0ef41Sopenharmony_ci const wState = this._writableState; 1731cb0ef41Sopenharmony_ci const length = rState.length; 1741cb0ef41Sopenharmony_ci 1751cb0ef41Sopenharmony_ci this._transform(chunk, encoding, (err, val) => { 1761cb0ef41Sopenharmony_ci if (err) { 1771cb0ef41Sopenharmony_ci callback(err); 1781cb0ef41Sopenharmony_ci return; 1791cb0ef41Sopenharmony_ci } 1801cb0ef41Sopenharmony_ci 1811cb0ef41Sopenharmony_ci if (val != null) { 1821cb0ef41Sopenharmony_ci this.push(val); 1831cb0ef41Sopenharmony_ci } 1841cb0ef41Sopenharmony_ci 1851cb0ef41Sopenharmony_ci if ( 1861cb0ef41Sopenharmony_ci wState.ended || // Backwards compat. 1871cb0ef41Sopenharmony_ci length === rState.length || // Backwards compat. 1881cb0ef41Sopenharmony_ci rState.length < rState.highWaterMark 1891cb0ef41Sopenharmony_ci ) { 1901cb0ef41Sopenharmony_ci callback(); 1911cb0ef41Sopenharmony_ci } else { 1921cb0ef41Sopenharmony_ci this[kCallback] = callback; 1931cb0ef41Sopenharmony_ci } 1941cb0ef41Sopenharmony_ci }); 1951cb0ef41Sopenharmony_ci}; 1961cb0ef41Sopenharmony_ci 1971cb0ef41Sopenharmony_ciTransform.prototype._read = function() { 1981cb0ef41Sopenharmony_ci if (this[kCallback]) { 1991cb0ef41Sopenharmony_ci const callback = this[kCallback]; 2001cb0ef41Sopenharmony_ci this[kCallback] = null; 2011cb0ef41Sopenharmony_ci callback(); 2021cb0ef41Sopenharmony_ci } 2031cb0ef41Sopenharmony_ci}; 204