Lines Matching refs:stream

49 const eos = require('internal/streams/end-of-stream');
51 let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
143 // Indicates whether the stream has finished destroying.
154 function ReadableState(options, stream, isDuplex) {
158 // values for the readable and the writable sides of the duplex stream.
161 isDuplex = stream instanceof Stream.Duplex;
166 // Object stream flag. Used to make read(n) ignore n and to
196 // Indicates whether the stream has errored. When true no further
199 // stream has failed.
286 function readableAddChunk(stream, chunk, encoding, addToFront) {
288 const state = stream._readableState;
316 errorOrDestroy(stream, err);
319 onEofChunk(stream, state);
323 errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
327 addChunk(stream, state, chunk, true);
329 errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
337 addChunk(stream, state, chunk, false);
339 maybeReadMore(stream, state);
341 addChunk(stream, state, chunk, false);
346 maybeReadMore(stream, state);
356 function addChunk(stream, state, chunk, addToFront) {
358 stream.listenerCount('data') > 0) {
368 stream.emit('data', chunk);
378 emitReadable(stream);
380 maybeReadMore(stream, state);
493 // synthetic stream cases, such as passthrough streams, _read
588 function onEofChunk(stream, state) {
604 emitReadable(stream);
611 emitReadable_(stream);
618 function emitReadable(stream) {
619 const state = stream._readableState;
625 process.nextTick(emitReadable_, stream);
629 function emitReadable_(stream) {
630 const state = stream._readableState;
633 stream.emit('readable');
637 // The stream needs another readable event if:
647 flow(stream);
657 function maybeReadMore(stream, state) {
660 process.nextTick(maybeReadMore_, stream, state);
664 function maybeReadMore_(stream, state) {
672 // when the stream consumer calls read() instead.
673 // - No data in the buffer, and the stream is in flowing mode. In this mode
676 // continuing the flow if the stream consumer has just subscribed to the
681 // - The stream has ended (state.ended).
683 // case where the stream has called the implementation defined _read()
693 stream.read(0);
825 // User incorrectly emitted 'error' directly on the stream.
936 // Try start flowing on next tick if stream isn't explicitly paused.
1014 // pause() and resume() are remnants of the legacy readable stream API
1030 function resume(stream, state) {
1033 process.nextTick(resume_, stream, state);
1037 function resume_(stream, state) {
1040 stream.read(0);
1044 stream.emit('resume');
1045 flow(stream);
1047 stream.read(0);
1061 function flow(stream) {
1062 const state = stream._readableState;
1064 while (state.flowing && stream.read() !== null);
1067 // Wrap an old-style stream as the async data source.
1068 // This is *not* part of the readable stream interface.
1070 Readable.prototype.wrap = function(stream) {
1074 // 'error' on the wrapped stream? Would require
1075 // a static factory method, e.g. Readable.wrap(stream).
1077 stream.on('data', (chunk) => {
1078 if (!this.push(chunk) && stream.pause) {
1080 stream.pause();
1084 stream.on('end', () => {
1088 stream.on('error', (err) => {
1092 stream.on('close', () => {
1096 stream.on('destroy', () => {
1101 if (paused && stream.resume) {
1103 stream.resume();
1108 const streamKeys = ObjectKeys(stream);
1111 if (this[i] === undefined && typeof stream[i] === 'function') {
1112 this[i] = stream[i].bind(stream);
1130 function streamToAsyncIterator(stream, options) {
1131 if (typeof stream.read !== 'function') {
1132 stream = Readable.wrap(stream, { objectMode: true });
1135 const iter = createAsyncIterator(stream, options);
1136 iter.stream = stream;
1140 async function* createAsyncIterator(stream, options) {
1144 if (this === stream) {
1152 stream.on('readable', next);
1155 const cleanup = eos(stream, { writable: false }, (err) => {
1163 const chunk = stream.destroyed ? null : stream.read();
1180 (error === undefined || stream._readableState.autoDestroy)
1182 destroyImpl.destroyer(stream, null);
1184 stream.off('readable', next);
1198 // r.readable === false means that this is part of a Duplex stream
1308 // We ignore the value if the stream
1383 function endReadable(stream) {
1384 const state = stream._readableState;
1389 process.nextTick(endReadableNT, state, stream);
1393 function endReadableNT(state, stream) {
1400 stream.emit('end');
1402 if (stream.writable && stream.allowHalfOpen === false) {
1403 process.nextTick(endWritableNT, stream);
1407 const wState = stream._writableState;
1416 stream.destroy();
1422 function endWritableNT(stream) {
1423 const writable = stream.writable && !stream.writableEnded &&
1424 !stream.destroyed;
1426 stream.end();