11cb0ef41Sopenharmony_ci'use strict'; 21cb0ef41Sopenharmony_ci 31cb0ef41Sopenharmony_ciconst { 41cb0ef41Sopenharmony_ci Array, 51cb0ef41Sopenharmony_ci Symbol, 61cb0ef41Sopenharmony_ci} = primordials; 71cb0ef41Sopenharmony_ci 81cb0ef41Sopenharmony_ciconst { Buffer } = require('buffer'); 91cb0ef41Sopenharmony_ciconst { FastBuffer } = require('internal/buffer'); 101cb0ef41Sopenharmony_ciconst { 111cb0ef41Sopenharmony_ci WriteWrap, 121cb0ef41Sopenharmony_ci kReadBytesOrError, 131cb0ef41Sopenharmony_ci kArrayBufferOffset, 141cb0ef41Sopenharmony_ci kBytesWritten, 151cb0ef41Sopenharmony_ci kLastWriteWasAsync, 161cb0ef41Sopenharmony_ci streamBaseState, 171cb0ef41Sopenharmony_ci} = internalBinding('stream_wrap'); 181cb0ef41Sopenharmony_ciconst { UV_EOF } = internalBinding('uv'); 191cb0ef41Sopenharmony_ciconst { 201cb0ef41Sopenharmony_ci errnoException, 211cb0ef41Sopenharmony_ci} = require('internal/errors'); 221cb0ef41Sopenharmony_ciconst { owner_symbol } = require('internal/async_hooks').symbols; 231cb0ef41Sopenharmony_ciconst { 241cb0ef41Sopenharmony_ci kTimeout, 251cb0ef41Sopenharmony_ci setUnrefTimeout, 261cb0ef41Sopenharmony_ci getTimerDuration, 271cb0ef41Sopenharmony_ci} = require('internal/timers'); 281cb0ef41Sopenharmony_ciconst { isUint8Array } = require('internal/util/types'); 291cb0ef41Sopenharmony_ciconst { clearTimeout } = require('timers'); 301cb0ef41Sopenharmony_ciconst { validateFunction } = require('internal/validators'); 311cb0ef41Sopenharmony_ci 321cb0ef41Sopenharmony_ciconst kMaybeDestroy = Symbol('kMaybeDestroy'); 331cb0ef41Sopenharmony_ciconst kUpdateTimer = Symbol('kUpdateTimer'); 341cb0ef41Sopenharmony_ciconst kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); 351cb0ef41Sopenharmony_ciconst kHandle = Symbol('kHandle'); 361cb0ef41Sopenharmony_ciconst kSession = Symbol('kSession'); 371cb0ef41Sopenharmony_ci 381cb0ef41Sopenharmony_cilet debug = require('internal/util/debuglog').debuglog('stream', (fn) => { 391cb0ef41Sopenharmony_ci debug = fn; 401cb0ef41Sopenharmony_ci}); 411cb0ef41Sopenharmony_ciconst kBuffer = Symbol('kBuffer'); 421cb0ef41Sopenharmony_ciconst kBufferGen = Symbol('kBufferGen'); 431cb0ef41Sopenharmony_ciconst kBufferCb = Symbol('kBufferCb'); 441cb0ef41Sopenharmony_ci 451cb0ef41Sopenharmony_cifunction handleWriteReq(req, data, encoding) { 461cb0ef41Sopenharmony_ci const { handle } = req; 471cb0ef41Sopenharmony_ci 481cb0ef41Sopenharmony_ci switch (encoding) { 491cb0ef41Sopenharmony_ci case 'buffer': 501cb0ef41Sopenharmony_ci { 511cb0ef41Sopenharmony_ci const ret = handle.writeBuffer(req, data); 521cb0ef41Sopenharmony_ci if (streamBaseState[kLastWriteWasAsync]) 531cb0ef41Sopenharmony_ci req.buffer = data; 541cb0ef41Sopenharmony_ci return ret; 551cb0ef41Sopenharmony_ci } 561cb0ef41Sopenharmony_ci case 'latin1': 571cb0ef41Sopenharmony_ci case 'binary': 581cb0ef41Sopenharmony_ci return handle.writeLatin1String(req, data); 591cb0ef41Sopenharmony_ci case 'utf8': 601cb0ef41Sopenharmony_ci case 'utf-8': 611cb0ef41Sopenharmony_ci return handle.writeUtf8String(req, data); 621cb0ef41Sopenharmony_ci case 'ascii': 631cb0ef41Sopenharmony_ci return handle.writeAsciiString(req, data); 641cb0ef41Sopenharmony_ci case 'ucs2': 651cb0ef41Sopenharmony_ci case 'ucs-2': 661cb0ef41Sopenharmony_ci case 'utf16le': 671cb0ef41Sopenharmony_ci case 'utf-16le': 681cb0ef41Sopenharmony_ci return handle.writeUcs2String(req, data); 691cb0ef41Sopenharmony_ci default: 701cb0ef41Sopenharmony_ci { 711cb0ef41Sopenharmony_ci const buffer = Buffer.from(data, encoding); 721cb0ef41Sopenharmony_ci const ret = handle.writeBuffer(req, buffer); 731cb0ef41Sopenharmony_ci if (streamBaseState[kLastWriteWasAsync]) 741cb0ef41Sopenharmony_ci req.buffer = buffer; 751cb0ef41Sopenharmony_ci return ret; 761cb0ef41Sopenharmony_ci } 771cb0ef41Sopenharmony_ci } 781cb0ef41Sopenharmony_ci} 791cb0ef41Sopenharmony_ci 801cb0ef41Sopenharmony_cifunction onWriteComplete(status) { 811cb0ef41Sopenharmony_ci debug('onWriteComplete', status, this.error); 821cb0ef41Sopenharmony_ci 831cb0ef41Sopenharmony_ci const stream = this.handle[owner_symbol]; 841cb0ef41Sopenharmony_ci 851cb0ef41Sopenharmony_ci if (stream.destroyed) { 861cb0ef41Sopenharmony_ci if (typeof this.callback === 'function') 871cb0ef41Sopenharmony_ci this.callback(null); 881cb0ef41Sopenharmony_ci return; 891cb0ef41Sopenharmony_ci } 901cb0ef41Sopenharmony_ci 911cb0ef41Sopenharmony_ci // TODO (ronag): This should be moved before if(stream.destroyed) 921cb0ef41Sopenharmony_ci // in order to avoid swallowing error. 931cb0ef41Sopenharmony_ci if (status < 0) { 941cb0ef41Sopenharmony_ci const ex = errnoException(status, 'write', this.error); 951cb0ef41Sopenharmony_ci if (typeof this.callback === 'function') 961cb0ef41Sopenharmony_ci this.callback(ex); 971cb0ef41Sopenharmony_ci else 981cb0ef41Sopenharmony_ci stream.destroy(ex); 991cb0ef41Sopenharmony_ci return; 1001cb0ef41Sopenharmony_ci } 1011cb0ef41Sopenharmony_ci 1021cb0ef41Sopenharmony_ci stream[kUpdateTimer](); 1031cb0ef41Sopenharmony_ci stream[kAfterAsyncWrite](this); 1041cb0ef41Sopenharmony_ci 1051cb0ef41Sopenharmony_ci if (typeof this.callback === 'function') 1061cb0ef41Sopenharmony_ci this.callback(null); 1071cb0ef41Sopenharmony_ci} 1081cb0ef41Sopenharmony_ci 1091cb0ef41Sopenharmony_cifunction createWriteWrap(handle, callback) { 1101cb0ef41Sopenharmony_ci const req = new WriteWrap(); 1111cb0ef41Sopenharmony_ci 1121cb0ef41Sopenharmony_ci req.handle = handle; 1131cb0ef41Sopenharmony_ci req.oncomplete = onWriteComplete; 1141cb0ef41Sopenharmony_ci req.async = false; 1151cb0ef41Sopenharmony_ci req.bytes = 0; 1161cb0ef41Sopenharmony_ci req.buffer = null; 1171cb0ef41Sopenharmony_ci req.callback = callback; 1181cb0ef41Sopenharmony_ci 1191cb0ef41Sopenharmony_ci return req; 1201cb0ef41Sopenharmony_ci} 1211cb0ef41Sopenharmony_ci 1221cb0ef41Sopenharmony_cifunction writevGeneric(self, data, cb) { 1231cb0ef41Sopenharmony_ci const req = createWriteWrap(self[kHandle], cb); 1241cb0ef41Sopenharmony_ci const allBuffers = data.allBuffers; 1251cb0ef41Sopenharmony_ci let chunks; 1261cb0ef41Sopenharmony_ci if (allBuffers) { 1271cb0ef41Sopenharmony_ci chunks = data; 1281cb0ef41Sopenharmony_ci for (let i = 0; i < data.length; i++) 1291cb0ef41Sopenharmony_ci data[i] = data[i].chunk; 1301cb0ef41Sopenharmony_ci } else { 1311cb0ef41Sopenharmony_ci chunks = new Array(data.length << 1); 1321cb0ef41Sopenharmony_ci for (let i = 0; i < data.length; i++) { 1331cb0ef41Sopenharmony_ci const entry = data[i]; 1341cb0ef41Sopenharmony_ci chunks[i * 2] = entry.chunk; 1351cb0ef41Sopenharmony_ci chunks[i * 2 + 1] = entry.encoding; 1361cb0ef41Sopenharmony_ci } 1371cb0ef41Sopenharmony_ci } 1381cb0ef41Sopenharmony_ci const err = req.handle.writev(req, chunks, allBuffers); 1391cb0ef41Sopenharmony_ci 1401cb0ef41Sopenharmony_ci // Retain chunks 1411cb0ef41Sopenharmony_ci if (err === 0) req._chunks = chunks; 1421cb0ef41Sopenharmony_ci 1431cb0ef41Sopenharmony_ci afterWriteDispatched(req, err, cb); 1441cb0ef41Sopenharmony_ci return req; 1451cb0ef41Sopenharmony_ci} 1461cb0ef41Sopenharmony_ci 1471cb0ef41Sopenharmony_cifunction writeGeneric(self, data, encoding, cb) { 1481cb0ef41Sopenharmony_ci const req = createWriteWrap(self[kHandle], cb); 1491cb0ef41Sopenharmony_ci const err = handleWriteReq(req, data, encoding); 1501cb0ef41Sopenharmony_ci 1511cb0ef41Sopenharmony_ci afterWriteDispatched(req, err, cb); 1521cb0ef41Sopenharmony_ci return req; 1531cb0ef41Sopenharmony_ci} 1541cb0ef41Sopenharmony_ci 1551cb0ef41Sopenharmony_cifunction afterWriteDispatched(req, err, cb) { 1561cb0ef41Sopenharmony_ci req.bytes = streamBaseState[kBytesWritten]; 1571cb0ef41Sopenharmony_ci req.async = !!streamBaseState[kLastWriteWasAsync]; 1581cb0ef41Sopenharmony_ci 1591cb0ef41Sopenharmony_ci if (err !== 0) 1601cb0ef41Sopenharmony_ci return cb(errnoException(err, 'write', req.error)); 1611cb0ef41Sopenharmony_ci 1621cb0ef41Sopenharmony_ci if (!req.async && typeof req.callback === 'function') { 1631cb0ef41Sopenharmony_ci req.callback(); 1641cb0ef41Sopenharmony_ci } 1651cb0ef41Sopenharmony_ci} 1661cb0ef41Sopenharmony_ci 1671cb0ef41Sopenharmony_cifunction onStreamRead(arrayBuffer) { 1681cb0ef41Sopenharmony_ci const nread = streamBaseState[kReadBytesOrError]; 1691cb0ef41Sopenharmony_ci 1701cb0ef41Sopenharmony_ci const handle = this; 1711cb0ef41Sopenharmony_ci const stream = this[owner_symbol]; 1721cb0ef41Sopenharmony_ci 1731cb0ef41Sopenharmony_ci stream[kUpdateTimer](); 1741cb0ef41Sopenharmony_ci 1751cb0ef41Sopenharmony_ci if (nread > 0 && !stream.destroyed) { 1761cb0ef41Sopenharmony_ci let ret; 1771cb0ef41Sopenharmony_ci let result; 1781cb0ef41Sopenharmony_ci const userBuf = stream[kBuffer]; 1791cb0ef41Sopenharmony_ci if (userBuf) { 1801cb0ef41Sopenharmony_ci result = (stream[kBufferCb](nread, userBuf) !== false); 1811cb0ef41Sopenharmony_ci const bufGen = stream[kBufferGen]; 1821cb0ef41Sopenharmony_ci if (bufGen !== null) { 1831cb0ef41Sopenharmony_ci const nextBuf = bufGen(); 1841cb0ef41Sopenharmony_ci if (isUint8Array(nextBuf)) 1851cb0ef41Sopenharmony_ci stream[kBuffer] = ret = nextBuf; 1861cb0ef41Sopenharmony_ci } 1871cb0ef41Sopenharmony_ci } else { 1881cb0ef41Sopenharmony_ci const offset = streamBaseState[kArrayBufferOffset]; 1891cb0ef41Sopenharmony_ci const buf = new FastBuffer(arrayBuffer, offset, nread); 1901cb0ef41Sopenharmony_ci result = stream.push(buf); 1911cb0ef41Sopenharmony_ci } 1921cb0ef41Sopenharmony_ci if (!result) { 1931cb0ef41Sopenharmony_ci handle.reading = false; 1941cb0ef41Sopenharmony_ci if (!stream.destroyed) { 1951cb0ef41Sopenharmony_ci const err = handle.readStop(); 1961cb0ef41Sopenharmony_ci if (err) 1971cb0ef41Sopenharmony_ci stream.destroy(errnoException(err, 'read')); 1981cb0ef41Sopenharmony_ci } 1991cb0ef41Sopenharmony_ci } 2001cb0ef41Sopenharmony_ci 2011cb0ef41Sopenharmony_ci return ret; 2021cb0ef41Sopenharmony_ci } 2031cb0ef41Sopenharmony_ci 2041cb0ef41Sopenharmony_ci if (nread === 0) { 2051cb0ef41Sopenharmony_ci return; 2061cb0ef41Sopenharmony_ci } 2071cb0ef41Sopenharmony_ci 2081cb0ef41Sopenharmony_ci // After seeing EOF, most streams will be closed permanently, 2091cb0ef41Sopenharmony_ci // and will not deliver any more read events after this point. 2101cb0ef41Sopenharmony_ci // (equivalently, it should have called readStop on itself already). 2111cb0ef41Sopenharmony_ci // Some streams may be reset and explicitly started again with a call 2121cb0ef41Sopenharmony_ci // to readStart, such as TTY. 2131cb0ef41Sopenharmony_ci 2141cb0ef41Sopenharmony_ci if (nread !== UV_EOF) { 2151cb0ef41Sopenharmony_ci // CallJSOnreadMethod expects the return value to be a buffer. 2161cb0ef41Sopenharmony_ci // Ref: https://github.com/nodejs/node/pull/34375 2171cb0ef41Sopenharmony_ci stream.destroy(errnoException(nread, 'read')); 2181cb0ef41Sopenharmony_ci return; 2191cb0ef41Sopenharmony_ci } 2201cb0ef41Sopenharmony_ci 2211cb0ef41Sopenharmony_ci // Defer this until we actually emit end 2221cb0ef41Sopenharmony_ci if (stream._readableState.endEmitted) { 2231cb0ef41Sopenharmony_ci if (stream[kMaybeDestroy]) 2241cb0ef41Sopenharmony_ci stream[kMaybeDestroy](); 2251cb0ef41Sopenharmony_ci } else { 2261cb0ef41Sopenharmony_ci if (stream[kMaybeDestroy]) 2271cb0ef41Sopenharmony_ci stream.on('end', stream[kMaybeDestroy]); 2281cb0ef41Sopenharmony_ci 2291cb0ef41Sopenharmony_ci // Push a null to signal the end of data. 2301cb0ef41Sopenharmony_ci // Do it before `maybeDestroy` for correct order of events: 2311cb0ef41Sopenharmony_ci // `end` -> `close` 2321cb0ef41Sopenharmony_ci stream.push(null); 2331cb0ef41Sopenharmony_ci stream.read(0); 2341cb0ef41Sopenharmony_ci } 2351cb0ef41Sopenharmony_ci} 2361cb0ef41Sopenharmony_ci 2371cb0ef41Sopenharmony_cifunction setStreamTimeout(msecs, callback) { 2381cb0ef41Sopenharmony_ci if (this.destroyed) 2391cb0ef41Sopenharmony_ci return this; 2401cb0ef41Sopenharmony_ci 2411cb0ef41Sopenharmony_ci this.timeout = msecs; 2421cb0ef41Sopenharmony_ci 2431cb0ef41Sopenharmony_ci // Type checking identical to timers.enroll() 2441cb0ef41Sopenharmony_ci msecs = getTimerDuration(msecs, 'msecs'); 2451cb0ef41Sopenharmony_ci 2461cb0ef41Sopenharmony_ci // Attempt to clear an existing timer in both cases - 2471cb0ef41Sopenharmony_ci // even if it will be rescheduled we don't want to leak an existing timer. 2481cb0ef41Sopenharmony_ci clearTimeout(this[kTimeout]); 2491cb0ef41Sopenharmony_ci 2501cb0ef41Sopenharmony_ci if (msecs === 0) { 2511cb0ef41Sopenharmony_ci if (callback !== undefined) { 2521cb0ef41Sopenharmony_ci validateFunction(callback, 'callback'); 2531cb0ef41Sopenharmony_ci this.removeListener('timeout', callback); 2541cb0ef41Sopenharmony_ci } 2551cb0ef41Sopenharmony_ci } else { 2561cb0ef41Sopenharmony_ci this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs); 2571cb0ef41Sopenharmony_ci if (this[kSession]) this[kSession][kUpdateTimer](); 2581cb0ef41Sopenharmony_ci 2591cb0ef41Sopenharmony_ci if (callback !== undefined) { 2601cb0ef41Sopenharmony_ci validateFunction(callback, 'callback'); 2611cb0ef41Sopenharmony_ci this.once('timeout', callback); 2621cb0ef41Sopenharmony_ci } 2631cb0ef41Sopenharmony_ci } 2641cb0ef41Sopenharmony_ci return this; 2651cb0ef41Sopenharmony_ci} 2661cb0ef41Sopenharmony_ci 2671cb0ef41Sopenharmony_cimodule.exports = { 2681cb0ef41Sopenharmony_ci writevGeneric, 2691cb0ef41Sopenharmony_ci writeGeneric, 2701cb0ef41Sopenharmony_ci onStreamRead, 2711cb0ef41Sopenharmony_ci kAfterAsyncWrite, 2721cb0ef41Sopenharmony_ci kMaybeDestroy, 2731cb0ef41Sopenharmony_ci kUpdateTimer, 2741cb0ef41Sopenharmony_ci kHandle, 2751cb0ef41Sopenharmony_ci kSession, 2761cb0ef41Sopenharmony_ci setStreamTimeout, 2771cb0ef41Sopenharmony_ci kBuffer, 2781cb0ef41Sopenharmony_ci kBufferCb, 2791cb0ef41Sopenharmony_ci kBufferGen, 2801cb0ef41Sopenharmony_ci}; 281