deanius/polyrhythm

View on GitHub
src/utils.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import {
  of,
  from,
  defer,
  NEVER,
  timer,
  Observable,
  throwError,
  empty,
  concat,
} from 'rxjs';
import {
  map,
  mergeMap,
  concatMap,
  exhaustMap,
  switchMap,
} from 'rxjs/operators';
import { AwaitableObservable, ConcurrencyMode, Listener, Thunk } from './types';
import { toggleMap } from './toggleMap';
export { toggleMap } from './toggleMap';
export { concat } from 'rxjs';
export { map, tap, scan } from 'rxjs/operators';

/**
 * Returns a random lowercase hex string of exactly `length` characters,
 * like an abbreviated Git SHA. Not guaranteed to be unique - at the
 * default length, just to within about 1 in 10,000.
 *
 * Digits are generated one at a time, so the result stays well-formed for
 * any length: a single floating-point number cannot carry more than about
 * 13 random hex digits, and overflows to Infinity for very large lengths.
 * @param length Number of hex digits to produce (default 7).
 * @returns A string of `length` hex digits; empty when `length` is 0 or less.
 */
export const randomId = (length: number = 7) => {
  let id = '';
  for (let i = 0; i < length; i++) {
    // One independent digit in the range 0-f per iteration.
    id += Math.floor(Math.random() * 16).toString(16);
  }
  return id;
};

/**
 * Builds a lazy, cancelable Observable that produces a value once `ms`
 * milliseconds have passed. Nothing is scheduled until the result is
 * subscribed to - explicitly via `.subscribe`, or implicitly via
 * `.toPromise()` or `await`.
 *
 * - A delay of 0 (or less) emits synchronously upon subscription.
 * - A delay of `Infinity` never emits.
 *
 * @param ms The delay in milliseconds.
 * @param objOrFn What to produce after the delay: a plain value (emitted
 * as-is), a function (its return value is emitted), or an Observable
 * (subscribed to after the delay, its notifications passed along).
 * @returns An Observable of the value or function result. It also carries a
 * `then` method, making it 'thenable' so it may be awaited directly.
 * ```
 * // Examples:
 * // awaited Promise
 * await after(100, () => new Date())
 * // unawaited Promise
 * after(100, () => new Date()).toPromise()
 * // unresolving Promise
 * after(Infinity, () => new Date()).toPromise()
 * ```
 */
export function after<T>(
  ms: number,
  objOrFn?: T | Thunk<T> | Observable<T>
): AwaitableObservable<T> {
  // Pick the source of the single "time is up" notification.
  let delay: Observable<number>;
  if (ms <= 0) {
    delay = of(0);
  } else if (ms === Infinity) {
    delay = NEVER;
  } else {
    delay = timer(ms);
  }

  // Turn that notification into the caller's value(s).
  let resultObs: Observable<T>;
  if (isObservable<T>(objOrFn)) {
    const inner = objOrFn;
    resultObs = delay.pipe(mergeMap(() => inner));
  } else if (typeof objOrFn === 'function') {
    resultObs = delay.pipe(map(objOrFn as (value: number) => any));
  } else {
    resultObs = delay.pipe(map(() => objOrFn as T));
  }

  // Grafting on `then` makes the Observable a 'thenable', and so usable with await.
  // ref: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await
  // @ts-ignore
  resultObs.then = function(onFulfilled, onRejected) {
    return resultObs.toPromise().then(onFulfilled, onRejected);
  };

  return resultObs as AwaitableObservable<T>;
}

/**
 * Type guard: reports whether `obj` can be subscribed to like an Observable.
 * Only a callable `subscribe` member counts - a `subscribe` property holding
 * `null` or some other non-function value does not make an object an Observable.
 * @param obj Any value, including `null` and `undefined`.
 * @returns `true` when `obj` has a `subscribe` method.
 */
function isObservable<T>(obj: any): obj is Observable<T> {
  return typeof obj?.subscribe === 'function';
}

/** Schedules the given function to run on the microtask queue.
 * Microtasks are flushed before any macrotask gets to run.
 * @param fn The function to run.
 * @returns A Promise for the function's return value
 * @see https://developer.mozilla.org/en-US/docs/Web/API/HTML_DOM_API/Microtask_guide
 * @see queueMicrotask, Promise.resolve
 */
export function microq(fn: Function) {
  const alreadyResolved = Promise.resolve();
  // @ts-ignore
  return alreadyResolved.then(fn);
}

/** Executes the given function on the macrotask queue.
 * The macrotask queue flushes after the microtask queue.
 * @param fn The function to run via `setTimeout(..., 0)`.
 * @returns A Promise for its return value. If the function throws, the
 * Promise rejects with the thrown value instead of leaving an uncaught
 * exception in the timer callback and a Promise that never settles.
 * @see https://developer.mozilla.org/en-US/docs/Web/API/HTML_DOM_API/Microtask_guide
 * @see setTimeout
 */
export function macroq(fn: Function) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      try {
        resolve(fn());
      } catch (ex) {
        // Surface the failure to whoever holds the Promise.
        reject(ex);
      }
    }, 0);
  });
}

// The current time, in milliseconds since the Unix epoch.
const getTimestamp = () => Date.now();

/**
 * Waits for the microtasks queued so far (e.g. Promise.resolve()) to complete.
 * @returns A Promise for the timestamp, in milliseconds, taken at that point.
 */
export function microflush(): Promise<number> {
  const alreadyResolved = Promise.resolve();
  return alreadyResolved.then(getTimestamp);
}

/**
 * Waits for the macrotasks queued so far (e.g. setTimeout 0) to complete.
 * @returns A Promise for the timestamp, in milliseconds, taken at that point.
 */
export function macroflush(): Promise<number> {
  return new Promise<number>(resolve => {
    const reportTime = () => resolve(getTimestamp());
    setTimeout(reportTime, 0);
  });
}

/**
 * Derives an Observable from `o` by running `listener` on each of its items,
 * and combining the per-item results according to the given ConcurrencyMode.
 * An exception thrown synchronously while handling an item becomes an
 * Observable error notification rather than propagating to the caller.
 *
 * @param o The source Observable whose items are handed to the listener.
 * @param listener Called with each item; its return value is converted to an Observable.
 * @param mode Selects the operator that combines the per-item Observables.
 * @param individualPipes Operators piped onto each item's result Observable.
 * @param individualEnder An Observable concatenated after each item's results.
 * @param individualStarter Called with each item; the Observable it returns is
 * concatenated before that item's results.
 * @returns The combined Observable.
 */
export function combineWithConcurrency<T, U>(
  o: Observable<T>,
  listener: Listener<T, U>,
  mode: ConcurrencyMode,
  individualPipes = [],
  individualEnder = empty(),
  individualStarter = () => empty()
) {
  // Builds the starter + results + ender sequence for a single item.
  const handleOne = (item: T): Observable<U> => {
    try {
      const returned = listener(item);
      // @ts-ignore
      const prologue = individualStarter(item);
      // @ts-ignore
      const body = toObservable<U>(returned).pipe(...individualPipes);
      // @ts-ignore
      return concat(prologue, body, individualEnder);
    } catch (ex) {
      return throwError(ex);
    }
  };

  // @ts-ignore
  return o.pipe(operatorForMode(mode)(handleOne));
}

/** Controls what types can be returned from an `on` handler, by converting
    each into an Observable:
    `undefined` or `null`: an empty Observable
    Functions: with exactly one declared parameter, used as the subscribe
      function of a new Observable; otherwise wrapped in `defer()`
    Observables (anything with `subscribe`): pass-through
    Subscriptions (anything with `unsubscribe`): an Observable that emits
      nothing, and unsubscribes the Subscription when it is torn down
    Promises (anything with `then`): `from()`
    Iterables other than strings: `from()`
    Everything else, including strings: `of()`
*/
function toObservable<T>(_results: any): Observable<T> {
  // No result at all. null is handled here too, since the property
  // lookups below would throw a TypeError on it.
  if (_results === undefined || _results === null) return empty();

  if (typeof _results === 'function') {
    return _results.length === 1 ? new Observable(_results) : defer(_results);
  }

  // An Observable is preferred
  if (_results.subscribe) return _results;

  // A Subscription is ok - can be canceled but not awaited
  if (_results.unsubscribe)
    return new Observable(() => {
      // an Observable's return value is its cleanup function
      return () => _results.unsubscribe();
    });

  // A Promise is acceptable
  if (_results.then) return from(_results as Promise<T>);

  // All but string iterables will be expanded (generators, arrays)
  if (
    typeof _results[Symbol.iterator] === 'function' &&
    typeof _results !== 'string'
  )
    return from(_results as Generator<T>);

  // otherwise we convert it to a single-item Observable
  return of(_results);
}

/**
 * Looks up the flattening operator that implements a ConcurrencyMode:
 * `ignore` - exhaustMap, `parallel` - mergeMap, `serial` - concatMap,
 * `toggle` - toggleMap, `replace` - switchMap.
 * @param mode The mode to look up; `parallel` when omitted.
 * @returns The operator for the mode. Any unrecognized mode gets switchMap.
 */
export function operatorForMode(
  mode: ConcurrencyMode = ConcurrencyMode.parallel
) {
  if (mode === ConcurrencyMode.ignore) return exhaustMap;
  if (mode === ConcurrencyMode.parallel) return mergeMap;
  if (mode === ConcurrencyMode.serial) return concatMap;
  if (mode === ConcurrencyMode.toggle) return toggleMap;

  // ConcurrencyMode.replace, as well as anything unrecognized
  return switchMap;
}