lts/lib/internal/streams/async_iterator.js
'use strict';
const {
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromiseReject,
PromiseResolve,
Symbol,
} = primordials;
const finished = require('internal/streams/end-of-stream');
const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');
function createIterResult(value, done) {
return { value, done };
}
function readAndResolve(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
const data = iter[kStream].read();
// We defer if data is null. We can be expecting either 'end' or 'error'.
if (data !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(createIterResult(data, false));
}
}
}
function onReadable(iter) {
// We wait for the next tick, because it might
// emit an error with `process.nextTick()`.
process.nextTick(readAndResolve, iter);
}
function wrapForNext(lastPromise, iter) {
return (resolve, reject) => {
lastPromise.then(() => {
if (iter[kEnded]) {
resolve(createIterResult(undefined, true));
return;
}
iter[kHandlePromise](resolve, reject);
}, reject);
};
}
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);
const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
get stream() {
return this[kStream];
},
next() {
// If we have detected an error in the meanwhile
// reject straight away.
const error = this[kError];
if (error !== null) {
return PromiseReject(error);
}
if (this[kEnded]) {
return PromiseResolve(createIterResult(undefined, true));
}
if (this[kStream].destroyed) {
// We need to defer via nextTick because if .destroy(err) is
// called, the error will be emitted via nextTick, and
// we cannot guarantee that there is no error lingering around
// waiting to be emitted.
return new Promise((resolve, reject) => {
process.nextTick(() => {
if (this[kError]) {
reject(this[kError]);
} else {
resolve(createIterResult(undefined, true));
}
});
});
}
// If we have multiple next() calls we will wait for the previous Promise to
// finish. This logic is optimized to support for await loops, where next()
// is only called once at a time.
const lastPromise = this[kLastPromise];
let promise;
if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// Fast path needed to support multiple this.push()
// without triggering the next() queue.
const data = this[kStream].read();
if (data !== null) {
return PromiseResolve(createIterResult(data, false));
}
promise = new Promise(this[kHandlePromise]);
}
this[kLastPromise] = promise;
return promise;
},
return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];
// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}
finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
stream.destroy();
});
},
}, AsyncIteratorPrototype);
const createReadableStreamAsyncIterator = (stream) => {
const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
[kStream]: { value: stream, writable: true },
[kLastResolve]: { value: null, writable: true },
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: {
value: stream.readableEnded || stream._readableState.endEmitted,
writable: true
},
// The function passed to new Promise is cached so we avoid allocating a new
// closure at every run.
[kHandlePromise]: {
value: (resolve, reject) => {
const data = iterator[kStream].read();
if (data) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(data, false));
} else {
iterator[kLastResolve] = resolve;
iterator[kLastReject] = reject;
}
},
writable: true,
},
});
iterator[kLastPromise] = null;
finished(stream, { writable: false }, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
const reject = iterator[kLastReject];
// Reject if we are waiting for data in the Promise returned by next() and
// store the error.
if (reject !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
reject(err);
}
iterator[kError] = err;
return;
}
const resolve = iterator[kLastResolve];
if (resolve !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(undefined, true));
}
iterator[kEnded] = true;
});
stream.on('readable', onReadable.bind(null, iterator));
return iterator;
};
module.exports = createReadableStreamAsyncIterator;