thi-ng/umbrella

View on GitHub
packages/transducers-async/src/merge.ts

Summary

Maintainability
B
4 hrs
Test Coverage
/**
 * Async iterator version of [thi.ng/rstream's merge()
 * construct](https://docs.thi.ng/umbrella/rstream/functions/merge.html).
 *
 * @remarks
 * Obtains an async iterator from each given source, keeps exactly one pending
 * `next()` request per source and yields each value as soon as any of these
 * requests resolves. Values of a single source keep their relative order, but
 * the interleaving between different sources depends on timing.
 *
 * The generator completes once all sources are exhausted. If `src` is empty,
 * it completes immediately without yielding anything. If any pending `next()`
 * request rejects, the error is re-thrown to the consumer.
 *
 * Note: The source iterators are not explicitly closed (no `return()` call)
 * if the consumer stops iterating early.
 *
 * Also see {@link sync} for an alternative way of merging.
 *
 * @param src - async iterables to merge
 * @returns async iterator of the merged values
 */
export async function* merge<T>(
    src: AsyncIterable<T>[]
): AsyncIterableIterator<T> {
    // `Promise.race([])` never settles, so without this guard an empty input
    // would leave the consumer waiting forever
    if (!src.length) return;
    type Entry = { id: number; iter: AsyncIterator<T> };
    type Pending = Promise<{ iter: Entry; res: IteratorResult<T> }>;
    // `id` is the entry's current index in both `iters` and `promises`
    const iters: Entry[] = src.map((v, id) => ({
        id,
        iter: v[Symbol.asyncIterator](),
    }));
    // requests the next value and tags the result with its originating entry,
    // so the winner of the race can be mapped back to its slot
    const $next = (entry: Entry): Pending =>
        entry.iter.next().then((res) => ({ iter: entry, res }));
    // array of in-flight promises (one per active source)
    const promises = iters.map($next);
    while (true) {
        const { iter, res } = await Promise.race(promises);
        if (res.done) {
            // drop the exhausted source from both parallel arrays
            promises.splice(iter.id, 1);
            iters.splice(iter.id, 1);
            if (!iters.length) return;
            // entries are shared by reference with their in-flight promises,
            // so shifting the ids here keeps later lookups consistent
            for (let i = iter.id; i < iters.length; i++) iters[i].id--;
        } else {
            yield res.value;
            // only request the next value after the current one was consumed
            promises[iter.id] = $next(iter);
        }
    }
}