deanius/polyrhythm

View on GitHub
src/EffectManager.ts

Summary

Maintainability
A
2 hrs
Test Coverage
import { Observable, ObservableInput, Subscription, Observer } from 'rxjs';
import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';

/**
 * A function returning an Observable representing an effect.
 * Also may return Promises, generators, per RxJS' ObservableInput type.
 */
interface EffectFactory<T, U> {
  (item: T): ObservableInput<U>;
}

/**
 * Defines an extended Observer whose next methods themselves return Observables.
 */
interface EffectFactoryObserver<T, U> {
  next: EffectFactory<T, U>;
  complete?: Observer<T>['complete'];
  error?: Observer<T>['error'];
}

/** Defines an Effect execution container, applies a given concurrency strategy
 * to the Observables yielded by a given EffectFactory Function.
 * @example: const serialAjaxSub = new QueuingEffectManager(from([1,2,3])).subscribe({ next(v){ return ajax(url, {v}) }});

 */
interface EffectManager<T> {
  subscribe<U>(effectObserver: EffectFactoryObserver<T, U>): Subscription;
}

class EffectManagerBase<T> implements EffectManager<T> {
  constructor(
    protected stream: Observable<T>,
    protected combiner: typeof switchMap
  ) {}

  subscribe<U>(effectObserver: EffectFactoryObserver<T, U>) {
    //prettier-ignore
    return this.stream
          .pipe(this.combiner(effectObserver.next))
          .subscribe({
            complete: effectObserver.complete,
            error: effectObserver.error || (() => null),
          });
  }
}

/** An EffectManager that replaces any existing effect execution by
 * canceling it, and starting a new one. switchMap.
 * */
export class ReplacingEffectManager<T> extends EffectManagerBase<T> {
  constructor(stream: Observable<T>) {
    super(stream, switchMap);
  }
}

/** An EffectManager that begins effects ASAP with unbounded concurrency. mergeMap.
 * */
export class ASAPEffectManager<T> extends EffectManagerBase<T> {
  constructor(stream: Observable<T>) {
    super(stream, mergeMap);
  }
}

/** An EffectManager that queues effects without bound. concatMap.
 * */
export class QueuingEffectManager<T> extends EffectManagerBase<T> {
  constructor(stream: Observable<T>) {
    super(stream, concatMap);
  }
}

/** An EffectManager that does not begin effects if one was executing. exhaustMap.
 * */
export class ThrottlingEffectManager<T> extends EffectManagerBase<T> {
  constructor(stream: Observable<T>) {
    super(stream, exhaustMap);
  }
}