thi-ng/umbrella

View on GitHub
packages/rstream/src/subscription.ts

Summary

Maintainability
A
0 mins
Test Coverage
import type { Fn, Maybe } from "@thi.ng/api";
import { SEMAPHORE } from "@thi.ng/api/api";
import { peek } from "@thi.ng/arrays/peek";
import { isPlainObject } from "@thi.ng/checks/is-plain-object";
import { assert } from "@thi.ng/errors/assert";
import { illegalState } from "@thi.ng/errors/illegal-state";
import { NULL_LOGGER } from "@thi.ng/logger/null";
import type { Reduced, Reducer, Transducer } from "@thi.ng/transducers";
import { comp } from "@thi.ng/transducers/comp";
import { map } from "@thi.ng/transducers/map";
import { push } from "@thi.ng/transducers/push";
import { isReduced, unreduced } from "@thi.ng/transducers/reduced";
import {
    CloseMode,
    State,
    type CommonOpts,
    type ISubscriber,
    type ISubscription,
    type SubscriptionOpts,
    type TransformableOpts,
    type WithErrorHandlerOpts,
    type WithTransform,
} from "./api.js";
import { __optsWithID } from "./idgen.js";
import { LOGGER } from "./logger.js";

/**
 * Factory function for {@link Subscription}, the core datatype and building
 * block of this package. Equivalent to calling `new Subscription(sub, opts)`.
 *
 * @remarks
 * Most other rstream types (incl. {@link Stream}s) are `Subscription`s
 * themselves. All of them:
 *
 * - can be connected into directed graphs (sync or async & not necessarily
 *   DAGs)
 * - can be transformed using transducers (incl. support for early
 *   termination)
 * - can have any number of subscribers (each optionally w/ their own
 *   transducers)
 * - recursively unsubscribe themselves from their parent once their last
 *   subscriber has unsubscribed (configurable)
 * - go into a non-recoverable error state if none of the subscribers has an
 *   error handler itself
 * - implement the
 *   [`IDeref`](https://docs.thi.ng/umbrella/api/interfaces/IDeref.html)
 *   interface
 *
 * If a transducer is given (via the `xform` option), every received value is
 * first processed by that transducer and only the transformed result(s) (if
 * any) are passed on to downstream subscribers. Any uncaught error thrown
 * *inside* the transducer causes this subscription's error handler to be
 * called and (by default, unless overridden) stops the subscription from
 * receiving any further values.
 *
 * Behavior can be customized via the optional `opts` arg. See
 * {@link CommonOpts} and {@link SubscriptionOpts} for further details.
 *
 * @example
 * ```ts
 * import { subscription, trace } from "@thi.ng/rstream";
 * import { filter } from "@thi.ng/transducers";
 *
 * // use as reactive value mechanism
 * const s = subscription<number, number>();
 * s.subscribe(trace("s1"));
 * s.subscribe(trace("s2"), { xform: filter((x) => x > 25) });
 *
 * // external trigger
 * s.next(23);
 * // s1 23
 * s.next(42);
 * // s1 42
 * // s2 42
 * ```
 *
 * @param sub - optional (partial) subscriber to be wrapped
 * @param opts - optional subscription options
 */
export const subscription = <A, B>(
    sub?: Partial<ISubscriber<B>>,
    opts?: Partial<SubscriptionOpts<A, B>>
): Subscription<A, B> => {
    return new Subscription<A, B>(sub, opts);
};

/**
 * Fundamental subscription type: receives values of type `A` (via
 * {@link Subscription.next}), optionally transforms them with a transducer
 * into values of type `B`, and forwards the results to an optional wrapped
 * subscriber and to any number of child subscriptions.
 *
 * @remarks
 * State comparisons in this class (e.g. `state >= State.DONE`) rely on the
 * numeric ordering of the `State` enum declared in `./api.js` (not visible
 * here; presumably IDLE < ACTIVE < DONE < UNSUBSCRIBED < ERROR).
 */
export class Subscription<A, B> implements ISubscription<A, B> {
    id: string;
    closeIn: CloseMode;
    closeOut: CloseMode;
    parent?: ISubscription<any, A>;
    __owner?: ISubscription<any, any>;

    /** Reducer obtained by applying the `xform` option to `push()` (if given) */
    protected xform?: Reducer<A, B[]>;
    /** If true, the most recently dispatched value is kept in `last` */
    protected cacheLast: boolean;
    /** Last dispatched value or `SEMAPHORE` sentinel if none is cached yet */
    protected last: any = SEMAPHORE;
    protected state = State.IDLE;
    /** Child subscriptions (processed in reverse order during dispatch) */
    protected subs: Partial<ISubscriber<B>>[] = [];

    /**
     * @param wrapped - optional (partial) subscriber receiving values, done
     * and error notifications before any child subscriptions
     * @param opts - options; defaults are `closeIn: CloseMode.LAST`,
     * `closeOut: CloseMode.LAST`, `cache: true` and an ID produced by
     * `__optsWithID("sub", ...)`
     */
    constructor(
        protected wrapped?: Partial<ISubscriber<B>>,
        opts?: Partial<SubscriptionOpts<A, B>>
    ) {
        opts = __optsWithID(`sub`, {
            closeIn: CloseMode.LAST,
            closeOut: CloseMode.LAST,
            cache: true,
            ...opts,
        });
        this.parent = opts.parent;
        this.id = opts.id!;
        this.closeIn = opts.closeIn!;
        this.closeOut = opts.closeOut!;
        this.cacheLast = opts.cache!;
        // transducers yield 0..n results per input, hence collect into array
        opts.xform && (this.xform = opts.xform(push()));
    }

    /**
     * Returns the last cached value or `undefined` if no value has been
     * cached (yet) or the subscription has been released already.
     */
    deref(): Maybe<B> {
        return this.last !== SEMAPHORE ? this.last : undefined;
    }

    /** Returns current subscription state. */
    getState() {
        return this.state;
    }

    /** Overwrites current subscription state (no validation). */
    protected setState(state: State) {
        this.state = state;
    }

    /**
     * Creates new child subscription with given subscriber and/or
     * transducer and options.
     *
     * @remarks
     * If `sub` is already a {@link Subscription} and no `xform` option is
     * given, it is attached as child as-is (it must not have a parent yet).
     * Otherwise `sub` is wrapped in a new `Subscription`. Both this
     * subscription and the child are set to `State.ACTIVE`, and if a cached
     * value is present it is immediately passed to the child.
     *
     * Raises an illegal state error (via `ensureState()`) if this
     * subscription (or a given `Subscription` child) is in state `DONE` or
     * later.
     */
    subscribe<C>(sub: ISubscription<B, C>): ISubscription<B, C>;
    subscribe(
        sub: Partial<ISubscriber<B>>,
        opts?: Partial<CommonOpts>
    ): ISubscription<B, B>;
    subscribe<C>(
        sub: Partial<ISubscriber<C>>,
        opts?: Partial<TransformableOpts<B, C>>
    ): ISubscription<B, C>;
    subscribe(
        sub: Partial<ISubscriber<any>>,
        opts: Partial<TransformableOpts<any, any>> = {}
    ): any {
        this.ensureState();
        let $sub: ISubscriber<any>;
        if (sub instanceof Subscription && !opts.xform) {
            sub.ensureState();
            // ensure sub is still unattached
            assert(!sub.parent, `sub '${sub.id}' already has a parent`);
            sub.parent = this;
            $sub = sub;
        } else {
            $sub = new Subscription(sub, { ...opts, parent: this });
        }
        this.subs.push($sub);
        this.setState(State.ACTIVE);
        $sub.setState(State.ACTIVE);
        // Must be a strict comparison: loose `!=` against the symbol
        // sentinel would coerce object values via ToPrimitive (invoking
        // their `valueOf`/`toString`, e.g. stringifying entire arrays) and
        // throws a TypeError for null-prototype objects.
        this.last !== SEMAPHORE && $sub.next(this.last);
        return $sub;
    }

    /**
     * Creates a new child subscription using given transducers and optional
     * subscription ID. Supports up to 4 transducers and if more than one
     * transducer is given, composes them in left-to-right order using
     * [`comp()`](https://docs.thi.ng/umbrella/transducers/functions/comp.html).
     *
     * Shorthand for `subscribe(comp(xf1, xf2,...), id)`
     *
     * @remarks
     * If the last argument is a plain object it is treated as options. Its
     * `error` handler (if any) becomes the error handler of the new child's
     * wrapped subscriber. When no transducer args are given, the options
     * object itself is expected to provide the `xform`.
     */
    transform<C>(
        a: Transducer<B, C>,
        opts?: Partial<WithErrorHandlerOpts>
    ): ISubscription<B, C>;
    transform<C, D>(
        a: Transducer<B, C>,
        b: Transducer<C, D>,
        opts?: Partial<WithErrorHandlerOpts>
    ): ISubscription<B, D>;
    transform<C, D, E>(
        a: Transducer<B, C>,
        b: Transducer<C, D>,
        c: Transducer<D, E>,
        opts?: Partial<WithErrorHandlerOpts>
    ): ISubscription<B, E>;
    transform<C, D, E, F>(
        a: Transducer<B, C>,
        b: Transducer<C, D>,
        c: Transducer<D, E>,
        d: Transducer<E, F>,
        opts?: Partial<WithErrorHandlerOpts>
    ): ISubscription<B, F>;
    transform<C>(
        opts: WithTransform<B, C> & Partial<WithErrorHandlerOpts>
    ): ISubscription<B, C>;
    transform(...args: any[]) {
        let sub: Maybe<Partial<ISubscriber<B>>>;
        let opts: Maybe<Partial<SubscriptionOpts<any, any>>>;
        // trailing plain object => options (removed from transducer list)
        if (isPlainObject(peek(args))) {
            opts = args.pop();
            sub = { error: (<WithErrorHandlerOpts>opts).error };
        }
        return this.subscribe(
            <any>sub,
            __optsWithID(
                "xform",
                args.length > 0
                    ? {
                          ...opts!,
                          // @ts-ignore
                          xform: comp(...args),
                      }
                    : opts
            )
        );
    }

    /**
     * Syntax sugar for {@link Subscription.transform} when using a single
     * [`map()`](https://docs.thi.ng/umbrella/transducers/functions/map.html)
     * transducer only. The given function `fn` is used as `map`'s
     * transformation fn.
     *
     * @param fn -
     * @param opts -
     */
    map<C>(
        fn: Fn<B, C>,
        opts?: Partial<WithErrorHandlerOpts>
    ): ISubscription<B, C> {
        return this.transform(map(fn), opts || {});
    }

    /**
     * If called with a child subscription, removes that child (returns
     * false if it is not a child of this subscription). If called without
     * arg, unsubscribes this subscription itself from its parent (if any)
     * and releases internal state.
     *
     * @param sub - child subscription to remove
     */
    unsubscribe(sub?: ISubscription<B, any>) {
        return sub ? this.unsubscribeChild(sub) : this.unsubscribeSelf();
    }

    /**
     * Detaches this subscription from its parent (if any), moves to
     * `State.UNSUBSCRIBED` (unless already in that or a later state) and
     * releases internal references. Always returns true.
     */
    protected unsubscribeSelf() {
        LOGGER.debug(this.id, "unsub self");
        this.parent && this.parent.unsubscribe(this);
        this.state < State.UNSUBSCRIBED && (this.state = State.UNSUBSCRIBED);
        this.release();
        return true;
    }

    /**
     * Removes given child subscription. Depending on `closeOut` mode, also
     * unsubscribes this subscription itself: always for `CloseMode.FIRST`,
     * or once no children remain (unless mode is `CloseMode.NEVER`).
     *
     * @param sub - child to remove
     * @returns true, if `sub` was a child and has been removed
     */
    protected unsubscribeChild(sub: ISubscription<B, any>) {
        LOGGER.debug(this.id, "unsub child", sub.id);
        const idx = this.subs.indexOf(sub);
        if (idx >= 0) {
            this.subs.splice(idx, 1);
            if (
                this.closeOut === CloseMode.FIRST ||
                (!this.subs.length && this.closeOut !== CloseMode.NEVER)
            ) {
                this.unsubscribe();
            }
            return true;
        }
        return false;
    }

    /**
     * Receives a new value, passes it through the transducer (if any) and
     * dispatches the result(s). Ignored if state is `DONE` or later.
     *
     * @param x - input value
     */
    next(x: A) {
        if (this.state >= State.DONE) return;
        this.xform ? this.dispatchXform(x) : this.dispatch(<any>x);
    }

    /**
     * Signals completion: flushes remaining transducer results (if any),
     * moves to `State.DONE`, notifies wrapped subscriber and children and
     * then (if notification succeeded) unsubscribes from parent. No-op if
     * state is already `DONE` or later.
     */
    done() {
        LOGGER.debug(this.id, "entering done()");
        if (this.state >= State.DONE) return;
        if (this.xform) {
            // bail out if flushing the transducer ended in error state
            if (!this.dispatchXformDone()) return;
        }
        this.state = State.DONE;
        // attempt to call .done in wrapped sub
        if (this.dispatchTo("done")) {
            // disconnect from parent & internal cleanup
            this.state < State.UNSUBSCRIBED && this.unsubscribe();
        }
        LOGGER.debug(this.id, "exiting done()");
    }

    /**
     * Error entry point. Delegates to the wrapped subscriber's error
     * handler (if any). If there is none, or it returns a falsy value, the
     * subscription goes into error state via `unhandledError()`.
     *
     * @param e - error value
     * @returns truthy result of the wrapped handler, else false
     */
    error(e: any) {
        // only the wrapped sub's error handler gets a chance
        // to deal with the error
        const sub = this.wrapped;
        const hasErrorHandler = sub && sub.error;
        hasErrorHandler &&
            LOGGER.debug(this.id, "attempting wrapped error handler");
        // flag success if error handler returns true
        // (i.e. it could handle/recover from the error)
        // else detach this entire sub by going into error state...
        return (hasErrorHandler && sub!.error!(e)) || this.unhandledError(e);
    }

    /**
     * Logs the error as warning, unsubscribes this subscription and puts it
     * into `State.ERROR`. Always returns false.
     *
     * @param e - error value
     */
    protected unhandledError(e: any) {
        // ensure error is at least logged to console
        // even if default NULL_LOGGER is used...
        (LOGGER.parent !== NULL_LOGGER ? LOGGER : console).warn(
            this.id,
            "unhandled error:",
            e
        );
        this.unsubscribe();
        // set after unsubscribe(), which would only set UNSUBSCRIBED
        this.state = State.ERROR;
        return false;
    }

    /**
     * Calls the handler of given `type` first on the wrapped subscriber
     * (if any), then on all child subscriptions (in reverse order).
     *
     * @param type - handler name
     * @param x - value passed to the handler
     * @returns false if an error thrown by a handler could not be handled,
     * else true
     */
    protected dispatchTo(type: "next" | "done" | "error", x?: B) {
        let s: Maybe<Partial<ISubscriber<B>>> = this.wrapped;
        if (s) {
            try {
                s[type] && s[type]!(x!);
            } catch (e) {
                // give wrapped sub a chance to handle error
                // (if that failed then we're already in error state now & terminate)
                if (!this.error(e)) return false;
            }
        }
        // process other child subs
        // ("done"/"error" iterate over a copy, presumably since children
        // may unsubscribe in response and thereby mutate `this.subs`)
        const subs = type === "next" ? this.subs : [...this.subs];
        for (let i = subs.length; i-- > 0; ) {
            s = subs[i];
            try {
                s[type] && s[type]!(x!);
            } catch (e) {
                if (type === "error" || !s.error || !s.error(e)) {
                    // if no or failed handler, go into error state
                    return this.unhandledError(e);
                }
            }
        }
        return true;
    }

    /**
     * Caches given value (if caching is enabled) and passes it to the
     * wrapped subscriber and all children.
     *
     * @param x - (already transformed) value
     */
    protected dispatch(x: B) {
        LOGGER.debug(this.id, "dispatch", x);
        this.cacheLast && (this.last = x);
        this.dispatchTo("next", x);
    }

    /**
     * Feeds given value into the transducer's reducing step (`xform[2]`)
     * and dispatches all resulting values. If the transducer signals early
     * termination (reduced result), triggers `done()` afterwards.
     *
     * @param x - input value
     */
    protected dispatchXform(x: A) {
        let acc: B[] | Reduced<B[]>;
        try {
            acc = this.xform![2]([], x);
        } catch (e) {
            // error in transducer can only be handled by the wrapped
            // subscriber's error handler (if avail)
            this.error(e);
            // don't dispatch value(s)
            return;
        }
        if (this.dispatchXformVals(acc)) {
            isReduced(acc) && this.done();
        }
    }

    /**
     * Calls the transducer's completion step (`xform[1]`) and dispatches
     * any remaining values it produced.
     *
     * @returns result of the error handler if the completion step threw,
     * else true if the subscription is not in error state afterwards
     */
    protected dispatchXformDone() {
        let acc: B[] | Reduced<B[]>;
        try {
            // collect remaining values from transducer
            acc = this.xform![1]([]);
        } catch (e) {
            // error in transducer can only be handled by the wrapped
            // subscriber's error handler (if avail)
            return this.error(e);
        }
        return this.dispatchXformVals(acc);
    }

    /**
     * Dispatches the given transducer results one by one, stopping early
     * as soon as state becomes `DONE` or later.
     *
     * @param acc - (possibly reduced) array of transformed values
     * @returns true, if state is lower than `State.ERROR` afterwards
     */
    protected dispatchXformVals(acc: B[] | Reduced<B[]>) {
        const uacc = unreduced(acc);
        for (
            let i = 0, n = uacc.length;
            i < n && this.state < State.DONE;
            i++
        ) {
            this.dispatch(uacc[i]);
        }
        return this.state < State.ERROR;
    }

    /**
     * Raises an illegal state error if this subscription is in state
     * `DONE` or later.
     */
    protected ensureState() {
        if (this.state >= State.DONE) {
            illegalState(`operation not allowed in state ${this.state}`);
        }
    }

    /**
     * Internal cleanup: removes all children and drops references to
     * parent, transducer and cached value.
     */
    protected release() {
        this.subs.length = 0;
        delete this.parent;
        delete this.xform;
        delete this.last;
    }
}