packages/event-store/src/EventStore.ts

Summary

Maintainability
A
0 mins
Test Coverage
import { IndexedDatabase } from "@valkyr/db";

import {
  createEventRecord,
  Empty,
  Event as LedgerEvent,
  EventRecord,
  EventStatus,
  EventToRecord,
  getTimestamp
} from "./Event/mod.js";
import { Projector } from "./Projector/Projector.js";
import { ReduceHandler } from "./Reducer.js";
import { Remote, RemoteAdapter, RemoteSubscription } from "./Remote.js";
import { Validator } from "./Validator.js";

type EventList<Event extends LedgerEvent> = Set<Event["type"]>;

type Options<Record extends EventRecord> = {
  remote: RemoteAdapter;
  events: EventList<Record>;
  validator?: Validator<Record>;
  projector?: Projector<Record>;
};

export class EventStore<Event extends LedgerEvent = LedgerEvent, Record extends EventRecord = EventToRecord<Event>> {
  readonly #remote: Remote;
  readonly #events: EventList<Event>;
  readonly #db: IndexedDatabase<{
    events: EventRecord;
  }>;

  readonly #validator: Validator<Record>;
  readonly #projector: Projector<Record>;

  constructor(readonly name: string, options: Options<Record>) {
    this.#remote = new Remote(options.remote);
    this.#events = options.events;
    this.#db = new IndexedDatabase({
      name: `event-store:${name}`,
      version: 1,
      registrars: [
        {
          name: "events",
          indexes: [
            ["tenant", { unique: false }],
            ["stream", { unique: false }],
            ["created", { unique: false }],
            ["recorded", { unique: false }]
          ]
        }
      ]
    });

    this.#validator = options.validator ?? new Validator<Record>();
    this.#projector = options.projector ?? new Projector<Record>();

    this.#remote.subject.subscribe(([record, hydrated]) => {
      this.insert(record as any, hydrated);
    });

    this.push = this.push.bind(this);
    this.insert = this.insert.bind(this);
  }

  /*
   |--------------------------------------------------------------------------------
   | Validation
   |--------------------------------------------------------------------------------
   */

  get validate() {
    return this.#validator.validate.bind(this.#validator);
  }

  /*
   |--------------------------------------------------------------------------------
   | Projections
   |--------------------------------------------------------------------------------
   */

  get project() {
    return this.#projector.project.bind(this.#projector);
  }

  get on() {
    return this.#projector.on.bind(this.#projector);
  }

  get once() {
    return this.#projector.once.bind(this.#projector);
  }

  get all() {
    return this.#projector.all.bind(this.#projector);
  }

  /*
   |--------------------------------------------------------------------------------
   | Collections
   |--------------------------------------------------------------------------------
   */

  get events() {
    return this.#db.collection("events");
  }

  /*
   |--------------------------------------------------------------------------------
   | Subscriber
   |--------------------------------------------------------------------------------
   */

  subscribeToTenant(tenantId: string): RemoteSubscription {
    return this.#remote.subscribe("tenant", tenantId);
  }

  subscribeToStream(streamId: string): RemoteSubscription {
    return this.#remote.subscribe("stream", streamId);
  }

  /*
   |--------------------------------------------------------------------------------
   | Write Utilities
   |--------------------------------------------------------------------------------
   */

  /**
   * Push a new event onto the local event store database.
   *
   * @remarks Push is meant to take events from the local services and insert them as new event
   * records as non hydrated events.
   *
   * @param tenant - Tenant the event belongs to.
   * @param stream - Stream the event belongs to.
   * @param event  - Event data to record.
   */
  async push<T extends Event["type"]>(
    tenant: string,
    stream: string,
    event: ExcludeEmptyFields<Extract<Event, { type: T }>>
  ) {
    if (this.#events.has((event as any).type) === false) {
      throw new Error(`Event '${(event as any).type}' is not registered with the event store!`);
    }
    const record = createEventRecord(tenant, stream, event as any);
    await this.insert(record as any, false);
  }

  /**
   * Insert a new event to the local event store database.
   *
   * @remarks This method triggers event validation and projection. If validation fails the event will
   * not be inserted. If the projection fails the projection itself should be handling the error based
   * on its own business logic.
   *
   * @remarks When hydration is true the event will be recorded with a new locally generated timestamp
   * as its being recorded locally but is not the originator of the event creation.
   *
   * @param record   - EventRecord to insert.
   * @param hydrated - Whether the event is hydrated or not. (Optional)
   */
  async insert(record: Record, hydrated = true): Promise<Record | undefined> {
    if (this.#events.has(record.type) === false) {
      return; // event record not supported by this event store
    }

    const status = await this.status(record);
    if (status.exists === true) {
      return record; // event already exists, no further actions required
    }

    if (hydrated === true) {
      record = {
        ...record,
        recorded: getTimestamp() // set locally recorded timestamp
      };
    }

    await this.validate(record);
    await this.events.insertOne(record as any);
    await this.project(record, { hydrated, outdated: status.outdated }).catch(console.log);

    if (hydrated === false) {
      this.#remote.push(record);
    }

    return record;
  }

  /**
   * Retrieves events from the local ledger and projects them against the
   * running publisher instance.
   *
   * @param stream - Stream to hydrate. (Optional)
   * @param from   - Get events starting at a specific time position. (Optional)
   */
  async rehydrate(stream?: string, from?: string) {
    const events = stream ? await this.stream(stream, from) : await this.events.find({}, { sort: { created: 1 } });
    for (const event of events) {
      await this.project(event as any, { hydrated: true, outdated: false });
    }
  }

  /**
   * Enable the ability to check an incoming events status in relation to
   * the local ledger. This is to determine what actions to take upon the
   * ledger based on the current status.
   *
   * **Exists**
   *
   * References the existence of the event in the local ledger. It is
   * determined by looking at the recorded event id which should be unique
   * to the entirety of the ledger.
   *
   * **Outdated**
   *
   * References the events created relationship to the same event type in
   * the hosted stream. If another event of the same type in the stream
   * is newer than the provided event, the provided event is considered
   * outdated.
   */
  async status({ id, stream, type, created }: Record): Promise<EventStatus> {
    const record = await this.events.findOne({ id });
    if (record) {
      return { exists: true, outdated: true };
    }
    const count = await this.events.count({
      stream,
      type,
      created: {
        $gt: created
      }
    });
    return { exists: false, outdated: count > 0 };
  }

  /*
   |--------------------------------------------------------------------------------
   | Stream Utilities
   |--------------------------------------------------------------------------------
   */

  /**
   * An event reducer aims to create an aggregate state that is as close
   * to up to date as possible. This is handy when we want to perform
   * things such as business logic on the command/action layer of the event
   * creation lifecycle.
   *
   * By default the state is as close as possible since we are operating
   * in a distributed system without a central authority or sequential
   * event bus. As such developers is advised to build with failure at a
   * later date as an option.
   *
   * This method operates by pulling all the latest known events of an event
   * stream and reduces them into a single current state representing of
   * the event stream.
   */
  async reduce<Reduce extends ReduceHandler>(stream: string, reduce: Reduce): Promise<ReturnType<Reduce> | undefined> {
    const events = await this.stream(stream);
    if (events.length === 0) {
      return undefined;
    }
    return reduce(events);
  }

  /**
   * Retrieve all events for a given stream. Provided timestamp allows for
   * providing a specific point in time to retrieve before or after based
   * on a provided sort direction.
   *
   * To ensure that we have the latest events in the stream at the time
   * of the request, we send a pull request to the attached remote service
   * before executing the local event query.
   *
   * @param stream    - Stream to retrieve events for.
   * @param cursor    - Get events from a specific point in time.
   * @param direction - Get the events in ascending or descending order.
   */
  async stream(stream: string, cursor?: string, direction: 1 | -1 = 1) {
    const filter: any = { stream };
    if (cursor !== undefined) {
      filter.created = {
        [direction === 1 ? "$gt" : "$lt"]: cursor
      };
    }
    return this.events.find(filter, { sort: { created: 1 } });
  }

  /**
   * Pull all events in order of locally recorded timestamp.
   *
   * @param stream - Stream to retrieve events for.
   * @param cursor - Get events from a specific point in time.
   */
  async pull(stream: string, cursor?: string) {
    const filter: any = { stream };
    if (cursor !== undefined) {
      filter.recorded = {
        $gt: cursor
      };
    }
    return this.events.find(filter, { sort: { recorded: 1 } });
  }
}

/*
 |--------------------------------------------------------------------------------
 | Types
 |--------------------------------------------------------------------------------
 */

type ExcludeEmptyFields<T> = {
  [K in keyof T as T[K] extends Empty ? never : K]: T[K];
};