packages/event-store/src/Projector/Projector.ts

Summary

Maintainability
A
0 mins
Test Coverage
import type { EventRecord } from "../Event/mod.js";
import { Queue } from "../Queue.js";
import { FILTER_ALL, FILTER_CONTINUOUS, FILTER_ONCE } from "./Filters.js";
import { Projection, ProjectionEventHandler, ProjectionHandler, ProjectionState } from "./Projection.js";

export class Projector<Record extends EventRecord> {
  #listeners: Listeners<Record> = {};
  #queue: Queue<ProjectionMessage<Record>>;

  constructor() {
    this.project = this.project.bind(this);
    this.#queue = new Queue(async ({ event, state }) => {
      return Promise.all(Array.from(this.#listeners[event.type as string] || []).map((fn) => fn(event, state)));
    });
  }

  async project(event: Record, state: ProjectionState) {
    return new Promise<boolean>((resolve, reject) => {
      this.#queue.push({ event, state }, resolve, reject);
    });
  }

  /**
   * Create a single run projection handler.
   *
   * @remarks
   *
   * This method tells the projection that an event is only ever processed when
   * the event is originating directly from the local event store. A useful
   * pattern for when you want the event handler to submit data to a third
   * party service such as sending an email or submitting third party orders.
   *
   * We disallow `hydrate` and `outdated` as these events represents events
   * that has already been processed.
   */
  once<T extends Record["type"], R extends Record = Extract<Record, { type: T }>>(
    type: T,
    handler: ProjectionEventHandler<R>
  ): Projection<R> {
    return new Projection<any>(this, { type, handler, filter: FILTER_ONCE });
  }

  /**
   * Create a continuous projection handler.
   *
   * @remarks
   *
   * This method tells the projection to allow events directly from the event
   * store as well as events coming through hydration via sync, manual or
   * automatic stream rehydration operations. This is the default pattern
   * used for most events. This is where you usually project the latest data
   * to your read side models and data stores.
   *
   * We allow `hydrate` as they serve to keep the read side up to date with
   * the latest events. We disallow `outdated` as we do not want the latest
   * data to be overridden by outdated ones.
   *
   * NOTE! The nature of this pattern means that outdated events are never
   * run by this projection. Make sure to handle `outdated` events if you
   * have processing requirements that needs to know about every unknown
   * events that has occurred in the event stream.
   */
  on<T extends Record["type"], R extends Record = Extract<Record, { type: T }>>(
    type: T,
    handler: ProjectionEventHandler<R>
  ): Projection<R> {
    return new Projection<any>(this, { type, handler, filter: FILTER_CONTINUOUS });
  }

  /**
   * Create a catch all projection handler.
   *
   * @remarks
   *
   * This method is a catch all for events that does not fall under the
   * stricter definitions of once and on patterns. This is a good place
   * to deal with data that does not depend on a strict order of events.
   */
  all<T extends Record["type"], R extends Record = Extract<Record, { type: T }>>(
    type: T,
    handler: ProjectionEventHandler<R>
  ): Projection<R> {
    return new Projection<any>(this, { type, handler, filter: FILTER_ALL });
  }

  addEventListener(type: string, fn: ProjectionHandler) {
    const listeners = this.#listeners[type];
    if (listeners) {
      listeners.add(fn);
    } else {
      this.#listeners[type] = new Set([fn]);
    }
    return () => {
      this.removeEventListener(type, fn);
    };
  }

  removeEventListener(type: string, fn: ProjectionHandler) {
    this.#listeners[type]?.delete(fn);
  }
}

/*
 |--------------------------------------------------------------------------------
 | Types
 |--------------------------------------------------------------------------------
 */

export type Listeners<R extends EventRecord> = Record<string, Set<ProjectionHandler<R>> | undefined>;

export type ProjectionMessage<Event extends EventRecord = EventRecord> = {
  event: Event;
  state: ProjectionState;
};