AkashBabu/supervised-emitter

View on GitHub
src/supervisedEmitter.ts

Summary

Maintainability
A
2 hrs
Test Coverage
import LFUCache from '@akashbabu/lfu-cache';
import { DLL, DLLItem } from '@akashbabu/node-dll';
import Logger from './lib/logger';
import { doesPatternMatch, isPatternEvent, sanitizeEvent } from './pattern';
import TaskQueue, { ITaskQueue } from './lib/taskQueue';
import { pipe } from './lib/pipe';
import getMapKeys from './lib/getMapKeys';
import patternHandler from './patternHandler';
import { InternalEvents, InternalEventsRev, IInternalEvents } from './internalEvents';

import {
  ISupervisedEmitter, IState, IGetScope, IMiddleware,
  IHandler, IOptions, ISubscription, ISubPipeline, IOptionsInt,
} from './interfaces';
import mergeOptions from './lib/mergeOptions';

/**
 * SupervisedEmitter is an event emitter library
 * which supports middlewares, event-tracing, glob subscriptions etc
 *
 * It's main applications can be found in
 * State management, sagas, communication between
 * component irrespective of whereever it is in the DOM tree
 */
export default class SupervisedEmitter implements ISupervisedEmitter {
  public static patternHandler = patternHandler;
  public static InternalEvents: IInternalEvents = InternalEvents;

  private state: IState;
  private logger = new Logger(Logger.LEVEL.DEBUG, false);

  // ThreadRunner to run publish pipelines from task queue
  private taskQueue: ITaskQueue<Promise<any>>;

  // Pre-compiling regex for efficiency
  private scopeReg = new RegExp('^__scope_[0-9]+_/(.+)$');

  // Options for internal usage after merging the default options
  private options: IOptionsInt;

  // Middlewares
  private middlewares: IMiddleware;

  /**
   * Creates a new instance of SupervisedEmitter
   *
   * **Example**
   *
   * Initializing with no middlewares:
   * ```JS
   * import SupervisedEmitter from 'supervised-emitter';
   *
   * const SE = new SupervisedEmitter();
   * ```
   *
   * Initializing with middlewares and options:
   * ```JS
   * const SE = new SupervisedEmitter(
   *   [eventTraceMiddleware],
   *   {debug: true, lfu: {max: 50}}
   * );
   * ```
   *
   * @param middlewares List of middlewares. Remember that all these
   *     middlewares will be piped to form a pipeline. i.e. the output of
   *     each of the middleware is passed in `data`(in the context) to the next
   *     middleware in the pipeline (top-down execution)
   * @param options Options for debugging and LFU
   */
  constructor(
    middlewares: IMiddleware[] = [({ data }) => data],
    options: IOptions = {},
  ) {
    this.middlewares = pipe(...middlewares);

    this.options = mergeOptions(options, {
      debug: false,
      lfu: {
        max: 100,
        evictCount: 10,
        maxAge: 10 * 60 * 1000,
      },
      publishConcurrency: 100,
      lifeCycleEvents: false,
    });

    this.logger[this.options.debug ? 'enable' : 'disable']();

    // initialize state
    this.state = this.getFreshState();

    this.taskQueue = new TaskQueue<Promise<any>>(
      this.publisher.bind(this),
      { maxRunners: this.options.publishConcurrency },
    );

    this.logger.debug('INITIALIZED');

    this.publishInternalEvents(InternalEvents.ON_INIT, {});
  }

  /**
   * Subscribes to a given event and pipes all the
   * handlers passed for this event.
   *
   * Please note that each handler must pass on the
   * data that must be handled by the next handler, as
   * all these handlers will be piped (compose in reverse direction).
   *
   * Chaining subscriptions is also possible. Please see the
   * example below for more details.
   *
   * For more info on `pipe` visit:
   * https://medium.com/free-code-camp/pipe-and-compose-in-javascript-5b04004ac937
   *
   * **Example**
   *
   * ```JS
   * const subscription = SE.subscribe('foo/bar',
   *   ({data}) => {
   *     console.log(data); //=> 1
   *     return data + 1,
   *   },
   *   ({data}) => {
   *     console.log(data); //=> 2
   *   }
   * ).subscribe('foo/*',
   *   ({data}) => console.log(data) //=> 1
   * );
   *
   * await SE.publish('/foo/bar', 1);
   *
   * subscription.unsubscribe();
   * ```
   *
   * @param event Subscription event
   * @param handlers List of handlers
   *
   * @returns Subscription for chaining more subscriptions or
   *    for unsubscribing from all the subscriptions
   */
  public subscribe(event: string, ...handlers: IHandler[]): ISubscription {
    event = sanitizeEvent(event) as string;

    // Check if this is a new or existing event
    if (!this.state.subscribers.get(event)) {
      // Cache has to be updated if the
      // new pattern event matches with the
      // publish event in cache
      if (isPatternEvent(event)) {
        this.addPatternEventToCache(event);
        this.state.patternEvents.push(event);
      } else {
        this.addNormalEventToCache(event);
      }

      this.state.subscribers.set(event, new DLL<ISubPipeline>());
    }

    // We're using dll for maintaining a list
    // of handlers for the ease of splicing
    // handlers during unsubscription
    const eventHandler = (this.state.subscribers.get(event) as DLL<ISubPipeline>).push({
      // Compose all the subscribers passed at once.
      // Users can use this feature if needed, else
      // can choose to go with the classical approach
      // of subscribing as many times as the handlers.
      pipeline: pipe(...handlers),
    });

    this.logger.debug(`SUBSCRIBED => ${event}`);

    // This checks avoid infinite publishing of internal eventss
    if (!this.isInternalEvent(event)) {
      this.publishInternalEvents(InternalEvents.ON_SUBSCRIBE, { subEvent: event });
    }

    const self = this;
    return {
      /**
       * Unsubscribes from this subscription
       */
      unsubscribe() {
        self.unsubscribe(event, eventHandler);
      },

      /**
       * This method allows chaining subscription to
       * multiple events via the same subscription
       */
      subscribe(cEvent: string, ...cHandlers: IHandler[]): ISubscription {
        const subscription = self.subscribe(cEvent, ...cHandlers);
        const prevSubscription = this;

        return {
          ...subscription,
          unsubscribe() {
            prevSubscription.unsubscribe();
            subscription.unsubscribe();
          },
        };
      },
    };
  }

  /**
   * Similar to [[subscribe]], but it listens only to
   * the first event and unsubscribes itself thereafter.
   *
   * **Example**
   *
   * ```JS
   * let calls = 0;
   * const subscription = SE.subscribeOnce('foo/bar', () => calls++)
   *
   * await SE.publish('/foo/bar', 'test');
   * await SE.publish('/foo/bar', 'test');
   *
   * console.log(calls) //=> 1
   *
   * subscription.unsubscribe();
   * ```
   *
   * @param event Subscription event
   * @param handlers List of handlers
   *
   * @returns Subscription for chaining more subscriptions or
   *    for unsubscribing from all the subscriptions
   */
  public subscribeOnce(event: string, ...handlers: IHandler[]): Promise<any> {
    return new Promise((resolve, reject) => {
      const unsubscriber: IHandler = ({ pipelinePromise, data }) => {
        pipelinePromise?.then((sData) => {
          subscription.unsubscribe();
          resolve(sData);
        }).catch((err) => {
          subscription.unsubscribe();
          reject(err);
        });

        return data;
      };

      const subscription = this.subscribe(event, unsubscriber, ...handlers);

      this.logger.debug(`SUBSCRIBED_ONCE => ${event}`);
    });
  }

  /**
   * Waits untill the required event is received.
   * This is especially useful when writing flows of
   * execution.
   *
   * **Example**
   * In a request life-cycle
   * if some action needs to be take post a response
   * has been received, then it can be written as follow
   * ```JS
   * SE.publish('req/profiles/load')
   * await SE.waitTill('req/profiles/success')
   *
   * SE.publish('req/profiles/sort')
   * ```
   *
   * @param event Subscription event
   *
   * @returns a Promise that resolves to data
   */
  public waitTill(event: string): Promise<any> {
    return this.subscribeOnce(event);
  }

  /**
   * Publishes the given event to all the matching
   * subscribers.
   *
   * NOTE: This is an asynchronous call, so if you want to
   * publish events one after the other, then you will have
   * to `await` on each publish call.
   * Please see the example below for more details.
   *
   * **Example**
   *
   * Simple publish (fire and forget):
   * ```JS
   * SE.publish('foo/bar', 1);
   *
   * SE.publish('foo/bar', 'hello world');
   * ```
   *
   * Publish one after the other (execute all the subscription pipelines before moving to next publish):
   * ```JS
   * await SE.publish('publish/first', 'first');
   *
   * // This will be published only after all the
   * // matching subscription pipelines of the above
   * // publish events have been completed
   * await SE.publish('publish/second', 'second');
   * ```
   *
   * @param pubEvent Event to publish the given data
   * @param data Any data that need to be published
   *
   * @returns Awaitable publish
   */
  public publish(pubEvent: string, data: any = null): Promise<any> {
    return this.taskQueue.add(pubEvent, data);
  }

  /**
   * Adds scope to a event by prefixing
   * it with a incrementing counter string(__scope_<counter>_/),
   * such that everytime this is called the
   * subscribers can listen only on scoped events.
   * This is especially useful when you don't want
   * other subscribers to listen to this event.
   * Then this behaves more like a camouflage event,
   * which is visible only to scoped subscribers.
   *
   * This is especially useful when multiple
   * instances of the same class is listening and
   * is interested only in events of its own instance.
   *
   * **Example**
   *
   * In React, if you're using the same component in
   * multiple places but your actions(Show popup, make a request etc)
   * are different in each place, then you may achieve it like this:
   * ```JSX
   * /// container.jsx
   * const [{scope}] = useState({scope: SE.getScope()});
   *
   * SE.subscribe(scope('btn/click'), ({data}) => {
   *   // ...
   * });
   *
   * <ChildComponent scope={scope} />
   *
   *
   * /// In ChildComponent.jsx
   * SE.publish(this.props.scope('btn/click'),  data)
   * ```
   *
   * @returns Function(Closure) that can add scope to events
   */
  public getScope(): IGetScope {
    const rand = this.state.scopeId++;
    return (event: string) => `__scope_${rand}_/${event}`;
  }

  /**
   * Strips out the scope part in the given
   * scoped event.
   *
   * i.e, it converts __scope_<number>_/foo/bar => foo/bar
   *
   * This method can be used in your middlewares
   * to unshell the scope part in the topic and run
   * your logics.
   *
   * @param event Scoped event
   *
   * @returns Event without scope part
   */
  public unScope(event: string): string {
    const match = event.match(this.scopeReg);

    return match ? match[1] : event;
  }

  /**
   * Returns a fresh / new checkout of
   * state
   */
  private getFreshState(): IState {
    return {
      subscribers: new Map(), // map of event vs pipelines (DLL)
      patternEvents: [], // list of pattern events
      subEventsCache: new LFUCache(this.options.lfu), // cache for maintaining pubEvents vs matching subEvents
      scopeId: 0, // use to generate a new scope part
    };
  }

  /**
   * Goes through every publish event in cache
   * and checks if it matches the given patterns
   * and if match found, then add this pattern
   * to cached events
   *
   * @param patternEvent Pattern event
   */
  private addPatternEventToCache(patternEvent: string) {
    this.state.subEventsCache.forEach(([pubEvent, cachedEvents]) => {
      if (doesPatternMatch(pubEvent, patternEvent)) { cachedEvents.set(patternEvent, true); }
    });
  }

  /**
   * Checks if the matching pubEvent is present
   * in the cache, if so then it adds this event
   *
   * @param event Normal event (w/o pattern)
   */
  private addNormalEventToCache(event: string) {
    const matchingEvents = this.state.subEventsCache.peek(event);

    if (matchingEvents instanceof Map) {
      matchingEvents.set(event, true);
    }
  }

  /**
   * Unsubscribes the event pipeline from the event
   *
   * @param subEvent Subscribed Event
   * @param eventHandler Handler pipeline
   */
  private unsubscribe(subEvent: string, eventHandler: DLLItem<ISubPipeline>) {
    // This checks avoid infinite publishing of internal events
    if (!this.isInternalEvent(subEvent)) {
      this.publishInternalEvents(InternalEvents.ON_UNSUBSCRIBE, { subEvent });
    }

    const subscribers = this.state.subscribers.get(subEvent);
    // remove the handler from DLL
    if (subscribers) {
      subscribers.remove(eventHandler);

      // If there are no event pipelines
      // for this event, then remove the event from
      // subscribers list (for space optimization).
      if (subscribers.length === 0) {
        this.state.subscribers.delete(subEvent);
      }
    }

    this.logger.debug(`UNSUBSCRIBED => ${subEvent}`);
  }

  /**
   * Returns all the matching subscribed events
   * given publish event (including glob pattern)
   *
   * This is the place where you can optimize the
   * matching logic by caching the matched events.
   *
   * @param pubEvent Publish event
   *
   * @returns map of matching patterns vs this.state
   */
  private getSubEvents(pubEvent: string): Map<string, boolean> {
    let subEvents = this.state.subEventsCache.get(pubEvent);

    if (!subEvents) {
      const matchingEvents: Map<string, boolean> = new Map();

      // if normal event subscribers are present
      // then add it as well
      if (this.state.subscribers.get(pubEvent)) {
        matchingEvents.set(pubEvent, true);
      }

      // Check if any pattern matches pubEvent
      this.state.patternEvents.forEach((pattern) => {
        if (doesPatternMatch(pubEvent, pattern)) { matchingEvents.set(pattern, true); }
      });

      this.state.subEventsCache.set(pubEvent, matchingEvents);

      subEvents = matchingEvents;
    }

    return subEvents;
  }

  /**
   * Publishes the given data to all the subscribers
   * that match the event.
   *
   * Note: We pass only the second argument (one param)
   * to the subscribers. This is intentionally done
   * because when the eventHandlers are piped (i.e. the output
   * of one is used by the next), incase of which we'll not be
   * able to maintain a standard function signature,
   * if we allow more than one param to be passed to subscribers.
   *
   * @param pubEvent Event to publish the given data
   * @param data Any data that need to be published
   *
   * @returns Promise that resolves after publish pipeline completion
   */
  private async publisher(pubEvent: string, data: any): Promise<any> {
    pubEvent = sanitizeEvent(pubEvent) as string;

    const subEvents = this.getSubEvents(pubEvent);
    const subEventsArr = getMapKeys(subEvents);

    const ctx = {
      data,
      pubEvent,
      subEvents: subEventsArr,
    };

    const result = await this.middlewares(ctx);

    // If the flow was stopped in the middlewares
    // then end it w/o passing the data to any of
    // the subscription pipelines
    if (result === undefined) {
      return;
    }

    ctx.data = result;

    return Promise.all(subEventsArr.map<Promise<any>>(async (subEvent: string) => {
      // Subscription pipelines
      const subPipelines = this.state.subscribers.get(subEvent);

      // lazy pruning of removed subscription pipelines
      if (!subPipelines) {
        subEvents.delete(subEvent);
        return null;
      }

      const pipelinePromises = subPipelines.map<Promise<any>>(async ({ pipeline }) => {
        try {
          // use new ctx for every pipeline because
          // one pipeline must never affect the other
          // except middleware pipeline, else it gets
          // difficult to debug
          const output = await pipeline({ ...ctx });
          return output;
        } catch (error) {
          this.logger.error('Error during publish:', error);
          this.publishInternalEvents(InternalEvents.ON_ERROR, { error });
        }
      });

      return Promise.all(pipelinePromises);
    }));
  }

  private publishInternalEvents(pubEvent: string, data: any): Promise<any> {
    if (this.isInternalEvent(pubEvent) && this.options.lifeCycleEvents) {
      return this.publish(pubEvent, data);
    }

    return Promise.resolve();
  }

  /**
   * Predicate for determining whether the given
   * event is internal event or not
   *
   * @param event event to be checked for internal event
   *
   * @returns true if the given event is internal event
   */
  private isInternalEvent(event: string): boolean {
    return event in InternalEventsRev;
  }
}