'use strict';

// Built-ins captured via `primordials` rather than read from the globals.
const {
  Array,
  Symbol,
} = primordials;

const { Buffer } = require('buffer');
const { FastBuffer } = require('internal/buffer');
// `streamBaseState` is an indexable state object shared with the native
// binding; the `k*` names below are the indices this file reads from it.
const {
  WriteWrap,
  kReadBytesOrError,
  kArrayBufferOffset,
  kBytesWritten,
  kLastWriteWasAsync,
  streamBaseState,
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
const {
  errnoException,
} = require('internal/errors');
const { owner_symbol } = require('internal/async_hooks').symbols;
const {
  kTimeout,
  setUnrefTimeout,
  getTimerDuration,
} = require('internal/timers');
const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers');
const { validateFunction } = require('internal/validators');

// Symbol keys used as property names on stream objects. The helpers in this
// file look them up on the stream (e.g. `stream[kUpdateTimer]()`,
// `self[kHandle]`, `this[kSession]`); all of them are exported.
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');

// `debug` starts out as the value returned by debuglog('stream', cb); the
// callback replaces it with the function it is handed.
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
  debug = fn;
});

// Symbol keys read by onStreamRead() for the user-supplied read buffer:
// the current buffer, a generator for the next one, and the data callback.
const kBuffer = Symbol('kBuffer');
const kBufferGen = Symbol('kBufferGen');
const kBufferCb = Symbol('kBufferCb');

/**
 * Issues a single write on `req.handle`, choosing the handle method that
 * matches `encoding`.
 *
 * The string encodings listed in the switch are forwarded to the matching
 * `write*String` method. `'buffer'` is written as-is. Any other encoding is
 * first converted with `Buffer.from(data, encoding)` and written as a buffer.
 *
 * @param {object} req Write request; `req.handle` provides the `write*`
 *   methods called below.
 * @param {Buffer|string} data Payload to write.
 * @param {string} encoding `'buffer'` or the name of a string encoding.
 * @returns {*} The value returned by the selected handle method. Callers in
 *   this file treat it as an error code where `0` means success.
 */
function handleWriteReq(req, data, encoding) {
  const { handle } = req;

  switch (encoding) {
    case 'buffer':
    {
      const ret = handle.writeBuffer(req, data);
      // Hold on to the buffer on the request when the write was flagged as
      // asynchronous (presumably so it stays referenced until completion).
      if (streamBaseState[kLastWriteWasAsync])
        req.buffer = data;
      return ret;
    }
    case 'latin1':
    case 'binary':
      return handle.writeLatin1String(req, data);
    case 'utf8':
    case 'utf-8':
      return handle.writeUtf8String(req, data);
    case 'ascii':
      return handle.writeAsciiString(req, data);
    case 'ucs2':
    case 'ucs-2':
    case 'utf16le':
    case 'utf-16le':
      return handle.writeUcs2String(req, data);
    default:
    {
      // No dedicated string writer for this encoding: convert first, then
      // take the same path as the 'buffer' case.
      const buffer = Buffer.from(data, encoding);
      const ret = handle.writeBuffer(req, buffer);
      if (streamBaseState[kLastWriteWasAsync])
        req.buffer = buffer;
      return ret;
    }
  }
}

/**
 * Completion handler for a write request (installed as `req.oncomplete` by
 * createWriteWrap()). Invoked with `this` set to the write request.
 *
 * - If the owning stream is already destroyed, the request callback (if any)
 *   is called with `null` and nothing else happens.
 * - If `status` is negative, an errno exception is built and passed to the
 *   request callback, or used to destroy the stream when there is no
 *   callback.
 * - Otherwise the stream's timer and async-write hooks run, then the request
 *   callback (if any) is called with `null`.
 *
 * @param {number} status Result of the write; negative values are errors.
 */
function onWriteComplete(status) {
  debug('onWriteComplete', status, this.error);

  const stream = this.handle[owner_symbol];

  if (stream.destroyed) {
    if (typeof this.callback === 'function')
      this.callback(null);
    return;
  }

  // TODO (ronag): This should be moved before if(stream.destroyed)
  // in order to avoid swallowing error.
  if (status < 0) {
    const ex = errnoException(status, 'write', this.error);
    if (typeof this.callback === 'function')
      this.callback(ex);
    else
      stream.destroy(ex);
    return;
  }

  stream[kUpdateTimer]();
  stream[kAfterAsyncWrite](this);

  if (typeof this.callback === 'function')
    this.callback(null);
}

/**
 * Creates a new `WriteWrap` request bound to `handle` and initialises the
 * bookkeeping fields the write helpers in this file rely on.
 *
 * @param {object} handle Handle the write will be issued on.
 * @param {Function} [callback] Stored as `req.callback`; invoked when the
 *   write finishes.
 * @returns {WriteWrap} The initialised request.
 */
function createWriteWrap(handle, callback) {
  const req = new WriteWrap();

  req.handle = handle;
  req.oncomplete = onWriteComplete;
  // Filled in by afterWriteDispatched() once the write has been issued.
  req.async = false;
  req.bytes = 0;
  // Set by handleWriteReq() for buffer writes flagged as asynchronous.
  req.buffer = null;
  req.callback = callback;

  return req;
}

/**
 * Issues a vectored write of `data` on `self[kHandle]`.
 *
 * @param {object} self Stream whose `kHandle` property is the target handle.
 * @param {Array<{chunk: *, encoding: string}>} data Entries to write. When
 *   `data.allBuffers` is truthy the array is rewritten IN PLACE so that each
 *   slot holds the entry's `chunk`.
 * @param {Function} cb Completion callback for the write.
 * @returns {object} The write request that was created.
 */
function writevGeneric(self, data, cb) {
  const req = createWriteWrap(self[kHandle], cb);
  const allBuffers = data.allBuffers;
  let chunks;
  if (allBuffers) {
    // Reuse the caller's array: replace every entry with its chunk.
    chunks = data;
    for (let i = 0; i < data.length; i++)
      data[i] = data[i].chunk;
  } else {
    // Flatten into [chunk0, encoding0, chunk1, encoding1, ...].
    chunks = new Array(data.length << 1);
    for (let i = 0; i < data.length; i++) {
      const entry = data[i];
      chunks[i * 2] = entry.chunk;
      chunks[i * 2 + 1] = entry.encoding;
    }
  }
  const err = req.handle.writev(req, chunks, allBuffers);

  // Retain chunks
  if (err === 0) req._chunks = chunks;

  afterWriteDispatched(req, err, cb);
  return req;
}

/**
 * Issues a single write of `data` on `self[kHandle]`.
 *
 * @param {object} self Stream whose `kHandle` property is the target handle.
 * @param {Buffer|string} data Payload to write.
 * @param {string} encoding `'buffer'` or a string encoding name; selects the
 *   handle method used by handleWriteReq().
 * @param {Function} cb Completion callback for the write.
 * @returns {object} The write request that was created.
 */
function writeGeneric(self, data, encoding, cb) {
  const req = createWriteWrap(self[kHandle], cb);
  const err = handleWriteReq(req, data, encoding);

  afterWriteDispatched(req, err, cb);
  return req;
}

/**
 * Post-processing run right after a write has been handed to the handle.
 *
 * Copies the byte count and the "was async" flag from `streamBaseState` onto
 * the request, reports a dispatch error through `cb`, and completes the
 * request immediately when the write was not asynchronous.
 *
 * @param {object} req Write request that was just dispatched.
 * @param {number} err Dispatch result; `0` means success.
 * @param {Function} cb Callback that receives the errno exception on failure.
 * @returns {*} The return value of `cb` on failure, otherwise `undefined`.
 */
function afterWriteDispatched(req, err, cb) {
  req.bytes = streamBaseState[kBytesWritten];
  req.async = !!streamBaseState[kLastWriteWasAsync];

  if (err !== 0)
    return cb(errnoException(err, 'write', req.error));

  // A synchronous write is already finished, so its callback runs here.
  // Asynchronous writes are expected to complete through `req.oncomplete`.
  if (!req.async && typeof req.callback === 'function') {
    req.callback();
  }
}

/**
 * Read callback for a stream handle. Invoked with `this` set to the handle;
 * the owning stream is `this[owner_symbol]`. The number of bytes read (or a
 * negative error code) is taken from `streamBaseState[kReadBytesOrError]`.
 *
 * - `nread > 0`: the data is delivered either through the user-supplied
 *   buffer callback (`stream[kBufferCb]`) or by pushing a `FastBuffer` view
 *   of `arrayBuffer` into the stream. Reading is stopped on the handle when
 *   the consumer signals it does not want more data.
 * - `nread === 0`: nothing to do.
 * - `nread === UV_EOF`: end of stream is signalled with `push(null)`.
 * - any other negative `nread`: the stream is destroyed with an errno
 *   exception.
 *
 * @param {ArrayBuffer} [arrayBuffer] Backing store of the bytes read; only
 *   used when no user buffer is set on the stream.
 * @returns {Uint8Array|undefined} The next user buffer when the buffer
 *   generator produced one, otherwise `undefined`.
 */
function onStreamRead(arrayBuffer) {
  const nread = streamBaseState[kReadBytesOrError];

  const handle = this;
  const stream = this[owner_symbol];

  stream[kUpdateTimer]();

  if (nread > 0 && !stream.destroyed) {
    let ret;
    let result;
    const userBuf = stream[kBuffer];
    if (userBuf) {
      // Only an explicit `false` from the callback asks to stop reading.
      result = (stream[kBufferCb](nread, userBuf) !== false);
      const bufGen = stream[kBufferGen];
      if (bufGen !== null) {
        const nextBuf = bufGen();
        // Anything that is not a Uint8Array is ignored and the current
        // buffer stays in place.
        if (isUint8Array(nextBuf))
          stream[kBuffer] = ret = nextBuf;
      }
    } else {
      const offset = streamBaseState[kArrayBufferOffset];
      const buf = new FastBuffer(arrayBuffer, offset, nread);
      result = stream.push(buf);
    }
    if (!result) {
      handle.reading = false;
      // The callback / push above may have destroyed the stream.
      if (!stream.destroyed) {
        const err = handle.readStop();
        if (err)
          stream.destroy(errnoException(err, 'read'));
      }
    }

    return ret;
  }

  if (nread === 0) {
    return;
  }

  // After seeing EOF, most streams will be closed permanently,
  // and will not deliver any more read events after this point.
  // (equivalently, it should have called readStop on itself already).
  // Some streams may be reset and explicitly started again with a call
  // to readStart, such as TTY.

  if (nread !== UV_EOF) {
    // CallJSOnreadMethod expects the return value to be a buffer.
    // Ref: https://github.com/nodejs/node/pull/34375
    stream.destroy(errnoException(nread, 'read'));
    return;
  }

  // Defer this until we actually emit end
  if (stream._readableState.endEmitted) {
    if (stream[kMaybeDestroy])
      stream[kMaybeDestroy]();
  } else {
    if (stream[kMaybeDestroy])
      stream.on('end', stream[kMaybeDestroy]);

    // Push a null to signal the end of data.
    // Do it before `maybeDestroy` for correct order of events:
    // `end` -> `close`
    stream.push(null);
    stream.read(0);
  }
}

/**
 * Sets or clears the idle timeout of a stream. Invoked with `this` set to
 * the stream.
 *
 * A destroyed stream is left untouched. Otherwise the raw `msecs` value is
 * stored on `this.timeout`, any existing timer in `this[kTimeout]` is
 * cleared, and:
 * - for a duration of `0`, `callback` (if given) is removed from the
 *   `'timeout'` listeners;
 * - for any other duration, a new unref'd timer calling `this._onTimeout` is
 *   stored in `this[kTimeout]` and `callback` (if given) is registered as a
 *   one-time `'timeout'` listener.
 *
 * @param {number} msecs Timeout in milliseconds, validated by
 *   getTimerDuration(); `0` disables the timeout.
 * @param {Function} [callback] Optional `'timeout'` listener, checked with
 *   validateFunction() when provided.
 * @returns {this} The stream, for chaining.
 */
function setStreamTimeout(msecs, callback) {
  if (this.destroyed)
    return this;

  this.timeout = msecs;

  // Type checking identical to timers.enroll()
  msecs = getTimerDuration(msecs, 'msecs');

  // Attempt to clear an existing timer in both cases -
  //  even if it will be rescheduled we don't want to leak an existing timer.
  clearTimeout(this[kTimeout]);

  if (msecs === 0) {
    if (callback !== undefined) {
      validateFunction(callback, 'callback');
      this.removeListener('timeout', callback);
    }
  } else {
    this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
    // When the stream belongs to a session, let the session's timer hook
    // run as well.
    if (this[kSession]) this[kSession][kUpdateTimer]();

    if (callback !== undefined) {
      validateFunction(callback, 'callback');
      this.once('timeout', callback);
    }
  }
  return this;
}

2671cb0ef41Sopenharmony_cimodule.exports = {
2681cb0ef41Sopenharmony_ci  writevGeneric,
2691cb0ef41Sopenharmony_ci  writeGeneric,
2701cb0ef41Sopenharmony_ci  onStreamRead,
2711cb0ef41Sopenharmony_ci  kAfterAsyncWrite,
2721cb0ef41Sopenharmony_ci  kMaybeDestroy,
2731cb0ef41Sopenharmony_ci  kUpdateTimer,
2741cb0ef41Sopenharmony_ci  kHandle,
2751cb0ef41Sopenharmony_ci  kSession,
2761cb0ef41Sopenharmony_ci  setStreamTimeout,
2771cb0ef41Sopenharmony_ci  kBuffer,
2781cb0ef41Sopenharmony_ci  kBufferCb,
2791cb0ef41Sopenharmony_ci  kBufferGen,
2801cb0ef41Sopenharmony_ci};
281