redux-observable/redux-observable

View on GitHub
src/createEpicMiddleware.ts

Summary

Maintainability
B
5 hrs
Test Coverage
import { Action, Middleware, MiddlewareAPI, Dispatch } from 'redux';
import { Subject, from, queueScheduler } from 'rxjs';
import { map, mergeMap, observeOn, subscribeOn } from 'rxjs/operators';
import { StateObservable } from './StateObservable';
import { Epic } from './epic';
import { warn } from './utils/console';

interface Options<D = any> {
  dependencies?: D;
}

export interface EpicMiddleware<
  T extends Action,
  O extends T = T,
  S = void,
  D = any
> extends Middleware<{}, S, Dispatch<any>> {
  run(rootEpic: Epic<T, O, S, D>): void;
}

export function createEpicMiddleware<
  T extends Action,
  O extends T = T,
  S = void,
  D = any
>(options: Options<D> = {}): EpicMiddleware<T, O, S, D> {
  // This isn't great. RxJS doesn't publicly export the constructor for
  // QueueScheduler nor QueueAction, so we reach in. We need to do this because
  // we don't want our internal queuing mechanism to be on the same queue as any
  // other RxJS code outside of redux-observable internals.
  const QueueScheduler: any = queueScheduler.constructor;
  const uniqueQueueScheduler: typeof queueScheduler = new QueueScheduler(
    (queueScheduler as any).SchedulerAction
  );

  if (process.env.NODE_ENV !== 'production' && typeof options === 'function') {
    throw new TypeError(
      'Providing your root Epic to `createEpicMiddleware(rootEpic)` is no longer supported, instead use `epicMiddleware.run(rootEpic)`\n\nLearn more: https://redux-observable.js.org/MIGRATION.html#setting-up-the-middleware'
    );
  }

  const epic$ = new Subject<Epic<T, O, S, D>>();
  let store: MiddlewareAPI<Dispatch<any>, S>;

  const epicMiddleware: EpicMiddleware<T, O, S, D> = _store => {
    if (process.env.NODE_ENV !== 'production' && store) {
      // https://github.com/redux-observable/redux-observable/issues/389
      warn(
        'this middleware is already associated with a store. createEpicMiddleware should be called for every store.\n\nLearn more: https://goo.gl/2GQ7Da'
      );
    }
    store = _store;
    const actionSubject$ = new Subject<T>();
    const stateSubject$ = new Subject<S>();
    const action$ = actionSubject$
      .asObservable()
      .pipe(observeOn(uniqueQueueScheduler));
    const state$ = new StateObservable(
      stateSubject$.pipe(observeOn(uniqueQueueScheduler)),
      store.getState()
    );

    const result$ = epic$.pipe(
      map(epic => {
        const output$ = epic(action$, state$, options.dependencies!);

        if (!output$) {
          throw new TypeError(
            `Your root Epic "${epic.name ||
              '<anonymous>'}" does not return a stream. Double check you\'re not missing a return statement!`
          );
        }

        return output$;
      }),
      mergeMap(output$ =>
        from(output$).pipe(
          subscribeOn(uniqueQueueScheduler),
          observeOn(uniqueQueueScheduler)
        )
      )
    );

    result$.subscribe(store.dispatch);

    return next => {
      return action => {
        // Downstream middleware gets the action first,
        // which includes their reducers, so state is
        // updated before epics receive the action
        const result = next(action);

        // It's important to update the state$ before we emit
        // the action because otherwise it would be stale
        stateSubject$.next(store.getState());
        actionSubject$.next(action);

        return result;
      };
    };
  };

  epicMiddleware.run = rootEpic => {
    if (process.env.NODE_ENV !== 'production' && !store) {
      warn(
        'epicMiddleware.run(rootEpic) called before the middleware has been setup by redux. Provide the epicMiddleware instance to createStore() first.'
      );
    }
    epic$.next(rootEpic);
  };

  return epicMiddleware;
}