
View on GitHub


55 mins
Test Coverage
import type { Fn, Fn0 } from "";
import { State } from "./api.js";
import { __nextID } from "./idgen.js";
import { LOGGER } from "./logger.js";
import { Subscription } from "./subscription.js";
import { defWorker } from "./defworker.js";

/**
 * Configuration options for {@link tunnel} / {@link Tunnel}.
 */
export interface TunnelOpts<A> {
    /**
     * Tunnelled worker instance, source blob or script URL.
     * If `interrupt` is enabled, the worker MUST be given as blob or URL.
     */
    src: Worker | Blob | string | Fn0<Worker>;
    /**
     * Max. number of worker instances to use. Only useful if
     * `interrupt` is disabled. If more than one worker is used,
     * incoming stream values will be assigned in a round robin manner
     * and result value ordering will be non-deterministic. Workers will
     * be instantiated on demand.
     *
     * @defaultValue 1
     */
    maxWorkers?: number;
    /**
     * Optional subscription ID to use.
     */
    id?: string;
    /**
     * Optional function to extract transferables from incoming stream
     * values, e.g. ArrayBuffers. The returned array is passed as the
     * transfer list (2nd argument) of `Worker.postMessage()`.
     */
    transferables?: Fn<A, any[]>;
    /**
     * If given and greater than zero, all workers will be terminated
     * after given period (in millis) after the parent stream is done.
     *
     * Note: a value of `0` is treated as "not given" and falls back to
     * the default.
     *
     * @defaultValue 1000
     */
    terminate?: number;
    /**
     * If true, the worker will be terminated and restarted for each new
     * stream value. This is useful to avoid executing extraneous work
     * and ensures only the most recent stream value is being processed.
     *
     * @defaultValue false
     */
    interrupt?: boolean;
}

 * Returns a {@link Subscription} which processes received values via
 * the configured worker(s) and then passes values received back from
 * the worker(s) downstream, thereby allowing workers to be used
 * transparently for stream processing.
 * @remarks
 * Multiple worker instances are supported for concurrent processing.
 * See the {@link TunnelOpts} for details.
 * Also see {@link forkJoin} and {@link postWorker}.
 * @param opts -
export const tunnel = <A, B>(opts: TunnelOpts<A>) => new Tunnel<A, B>(opts);

/**
 * @see {@link tunnel} for reference & examples.
 */
export class Tunnel<A, B> extends Subscription<A, B> {
    workers: Worker[];
    src: Worker | Blob | string | Fn0<Worker>;
    transferables?: Fn<A, any[]>;
    terminate: number;
    interrupt: boolean;

    index: number;

    constructor(opts: TunnelOpts<A>) {
        super(undefined, { id: || `tunnel-${__nextID()}` });
        this.src = opts.src;
        this.workers = new Array(opts.maxWorkers || 1);
        this.transferables = opts.transferables;
        this.terminate = opts.terminate || 1000;
        this.interrupt = opts.interrupt || false;
        this.index = 0;

    next(x: A) {
        if (this.state < State.DONE) {
            let tx;
            if (this.transferables) {
                tx = this.transferables(x);
            let worker: Worker | null = this.workers[this.index];
            if (this.interrupt && worker) {
                worker = null;
            if (!worker) {
                this.workers[this.index++] = worker = defWorker(this.src);
                this.index %= this.workers.length;
                worker.addEventListener("message", (e: MessageEvent) =>
                worker.addEventListener("error", (e: ErrorEvent) =>
            worker.postMessage(x, tx || []);

    done() {
        if (this.terminate > 0) {
            setTimeout(() => {
      "terminating workers...");
                this.workers.forEach((worker) => worker && worker.terminate());
                delete (<any>this).workers;
            }, this.terminate);