thi-ng/umbrella

View on GitHub
packages/rstream/src/object.ts

Summary

Maintainability
A
1 hr
Test Coverage
import type { Keys, Predicate2 } from "@thi.ng/api";
import { dedupe } from "@thi.ng/transducers/dedupe";
import type { CommonOpts, ISubscription, SubscriptionOpts } from "./api.js";
import { __nextID } from "./idgen.js";
import { subscription } from "./subscription.js";

export type KeyStreams<T, K extends Keys<T>> = {
    [id in K]-?: ISubscription<T[id], T[id]>;
};

/**
 * Result object type for {@link fromObject}.
 */
export interface StreamObj<T, K extends Keys<T>> {
    /**
     * Object of managed & typed streams for registered keys.
     */
    streams: KeyStreams<T, K>;
    /**
     * Feeds new values from `x` to each registered key's stream.
     * Satifies {@link ISubscriber.next} interface.
     *
     * @param x -
     */
    next(x: T): void;
    /**
     * Calls {@link ISubscriber.done} for all streams created. Satifies
     * {@link ISubscriber.done} interface.
     */
    done(): void;
}

export interface StreamObjOpts<T, K extends Keys<T>> extends CommonOpts {
    /**
     * Array of selected `keys` (else selects all by default) for which
     * to create streams.
     */
    keys: K[];
    /**
     * If true (default), all created streams will be seeded with key
     * values from the source object.
     *
     * @defaultValue true
     */
    initial: boolean;
    /**
     * Default values to use for `undefined` values of registered keys.
     */
    defaults: Partial<T>;
    /**
     * If true, attaches
     * [`dedupe()`](https://docs.thi.ng/umbrella/transducers/functions/dedupe.html)
     * transducer to each key's value stream to avoid obsolete downstream
     * propagation when a key's value hasn't actually changed.
     *
     * @defaultValue true
     */
    dedupe: boolean;
    /**
     * Generic equality predicate to be used for `dedupe` (`===` by
     * default). Ignored if `dedupe` option is false.
     */
    equiv: Predicate2<any>;
}

/**
 * Takes an arbitrary object `src` and object of options (see
 * {@link StreamObjOpts}). Creates a new object and for each selected
 * key creates a new stream, optionally seeded with the key's value in
 * `src`. Returns new object of streams.
 *
 * @remarks
 * The structure of the returned object is
 * {@link StreamObj | as follows}:
 *
 * ```text
 * {
 *   streams: { ... },
 *   next(x): void;
 *   done(): void;
 * }
 * ```
 *
 * All streams will be stored under `streams`. The `next()` and `done()`
 * functions/methods allow the object itself to be used as subscriber
 * for an upstream subscribable (see 2nd example below):
 *
 * - `next()` - takes a object of same type as `src` and feeds each
 *   key's new value into its respective stream. If the `defaults`
 *   option is given, `undefined` key values are replaced with their
 *   specified default. If `dedupe` is enabled (default) only changed
 *   values (as per `equiv` predicate option) will be propagated
 *   downstream.
 * - `done()` - calls {@link ISubscriber.done} on all streams
 *
 * The optional `opts` arg is used to customize overall behavior of
 * `fromObject` and specify shared options for *all* created streams.
 *
 * @example
 * ```ts tangle:../export/from-object.ts
 * import { fromObject, trace } from "@thi.ng/rstream";
 *
 * type Foo = { a?: number; b: string; };
 *
 * const obj = fromObject(<Foo>{ a: 1, b: "foo" });
 *
 * obj.streams.a.subscribe(trace("a"));
 * // a 1
 *
 * obj.streams.b.subscribe(trace("b"));
 * // b foo
 *
 * obj.next({ b: "bar" });
 * // a undefined
 * // b bar
 * ```
 *
 * @example
 * ```ts tangle:../export/from-object-2.ts
 * import { fromObject, subscription, trace } from "@thi.ng/rstream";
 *
 * type Foo = { a?: number; b: string; };
 *
 * const obj = fromObject(<Foo>{}, { keys: ["a", "b"], initial: false });
 * obj.streams.a.subscribe(trace("a"));
 * obj.streams.b.subscribe(trace("b"));
 *
 * const src = subscription<Foo, Foo>();
 * // use as subscriber
 * src.subscribe(obj);
 *
 * src.next({ a: 1, b: "foo" });
 * // a 1
 * // b foo
 * ```
 *
 * @param src -
 * @param opts -
 */
export const fromObject = <T extends object, K extends Keys<T>>(
    src: T,
    opts: Partial<StreamObjOpts<T, K>> = {}
) => {
    const id = opts.id || `obj${__nextID()}`;
    const keys = opts.keys || <K[]>Object.keys(src);
    const _opts: Partial<SubscriptionOpts<any, any>> =
        opts.dedupe !== false
            ? {
                    xform: dedupe<any>(opts.equiv || ((a, b) => a === b)),
                    ...opts,
              }
            : opts;
    const streams: any = {};
    for (let k of keys) {
        streams[k] = subscription(undefined, {
            ..._opts,
            id: `${id}-${String(k)}`,
        });
    }
    const res = <StreamObj<T, K>>{
        streams,
        next(state) {
            for (let k of keys) {
                const val = state[k];
                streams[k].next(
                    opts.defaults && val === undefined ? opts.defaults[k] : val
                );
            }
        },
        done() {
            for (let k of keys) {
                streams[k].done();
            }
        },
    };
    opts.initial !== false && res.next(src);
    return res;
};