trufflesuite/truffle

View on GitHub
packages/dashboard-message-bus-client/lib/client.ts

Summary

Maintainability
A
1 hr
Test Coverage
import {
  DashboardMessageBusClientOptions,
  SendOptions,
  ResolvedDashboardMessageBusClientOptions,
  SubscriptionOptions
} from "./types";
import {
  createMessage,
  Message,
  Response
} from "@truffle/dashboard-message-bus-common";

import { DashboardMessageBusConnection } from "./connection";
import delay from "delay";
import debugModule from "debug";
import {
  DashboardMessageBusSubscription,
  PublishMessageLifecycle
} from "./lifecycle";
import { waitForOutstandingPromises } from "@truffle/promise-tracker";

// Namespaced debug logger shared by everything in this module.
const debug = debugModule("dashboard-message-bus-client:client");

/**
 * Client for the dashboard message bus.
 *
 * The client owns two connections, one of type "publish" and one of type
 * "subscribe". Messages are sent over the publish connection; messages that
 * arrive on the subscribe connection are offered to every subscription created
 * through {@link DashboardMessageBusClient.subscribe}.
 */
export class DashboardMessageBusClient {
  private _options: ResolvedDashboardMessageBusClientOptions;

  private _publishConnection: DashboardMessageBusConnection;
  private _subscribeConnection: DashboardMessageBusConnection;
  private _subscriptions: DashboardMessageBusSubscription<Message>[] = [];

  /**
   * The resolved options (caller-supplied values merged over the defaults).
   * A shallow copy is returned so callers cannot mutate the client's state.
   */
  get options(): ResolvedDashboardMessageBusClientOptions {
    return { ...this._options };
  }

  /**
   * Creates the publish and subscribe connections. No connection attempt is
   * made here; see {@link DashboardMessageBusClient.ready}.
   *
   * @param options - Overrides for the defaults `host: "localhost"`,
   *   `port: 24012`, `maxRetries: 1` and `retryDelayMsec: 100`.
   */
  constructor(options: DashboardMessageBusClientOptions) {
    this._options = {
      host: "localhost",
      port: 24012,
      maxRetries: 1,
      retryDelayMsec: 100,
      ...(options ?? {})
    };

    const { host, port, publishPort, subscribePort } = this._options;
    this._publishConnection = new DashboardMessageBusConnection({
      host,
      port,
      publishPort,
      connectionType: "publish"
    });

    this._subscribeConnection = new DashboardMessageBusConnection({
      host,
      port,
      subscribePort,
      connectionType: "subscribe"
    });

    this._subscribeConnection.on("message", this._messageHandler.bind(this));
  }

  /**
   * Connects both the publish and the subscribe connection, retrying according
   * to the `maxRetries` / `retryDelayMsec` options.
   *
   * @returns A promise that resolves once both `connect()` calls have resolved.
   * @throws The last connection error once all attempts have failed.
   */
  async ready(): Promise<void> {
    await this._withRetriesAsync(async () => {
      // This must be awaited: otherwise the callback resolves immediately,
      // connection failures are never retried, and a failed connect surfaces
      // as an unhandled rejection instead of rejecting ready().
      await Promise.all([
        this._publishConnection.connect(),
        this._subscribeConnection.connect()
      ]);
    });
  }

  /**
   * Builds a message from `options` and sends it over the publish connection,
   * retrying the send according to the retry options.
   *
   * @param options - The `type` and `payload` of the message to publish.
   * @returns The lifecycle object created for the published message.
   * @throws The error from connecting or sending once all attempts have failed.
   */
  async publish<MessageType extends Message, ResponseType extends Response>(
    options: SendOptions
  ): Promise<PublishMessageLifecycle<MessageType, ResponseType>> {
    const { type, payload } = options;
    const message = createMessage(type, payload);
    try {
      await this.ready();

      const lifecycle = new PublishMessageLifecycle({
        message,
        connection: this._publishConnection
      });

      // The arrow function captures `this` lexically, so no bind is required.
      return await this._withRetriesAsync(async () => {
        debug("publisher sending message %o", message);
        await this._publishConnection.send(message);
        return lifecycle;
      });
    } catch (err) {
      debug("sending message %o failed due to error %s", message, err);
      throw err;
    }
  }

  /**
   * Registers a new subscription. Every message subsequently received on the
   * subscribe connection is passed to the subscription for evaluation.
   *
   * @param options - Options forwarded to the subscription's constructor.
   * @returns The newly created subscription.
   */
  subscribe<MessageType extends Message>(
    options: SubscriptionOptions
  ): DashboardMessageBusSubscription<MessageType> {
    const subscription = new DashboardMessageBusSubscription<MessageType>(
      options
    );
    this._subscriptions.push(subscription);
    return subscription;
  }

  /**
   * Ends all subscriptions and closes both connections.
   *
   * @param force - When `false` (the default), first waits for outstanding
   *   tracked promises via `waitForOutstandingPromises`; when `true`, closes
   *   immediately.
   */
  async close(force: boolean = false): Promise<void> {
    if (!force) {
      await this.waitForOutstandingPromises();
    }

    for (const sub of this._subscriptions) {
      sub._end();
    }
    this._subscriptions = [];

    await Promise.all([
      this._subscribeConnection.close(),
      this._publishConnection.close()
    ]);
  }

  /**
   * Waits for the outstanding promises tracked with this client as their
   * target (delegates to `@truffle/promise-tracker`).
   */
  async waitForOutstandingPromises(): Promise<void> {
    await waitForOutstandingPromises({ target: this });
  }

  /**
   * Handler for "message" events from the subscribe connection: offers the
   * message to every registered subscription.
   */
  private _messageHandler(message: Message) {
    for (const sub of this._subscriptions) {
      sub._evaluateMessage({ message, connection: this._subscribeConnection });
    }
  }

  /**
   * Invokes `f` (with `this` bound to the client), retrying when it throws or
   * returns a promise that rejects.
   *
   * Up to `maxRetries + 1` attempts are made, waiting `retryDelayMsec`
   * milliseconds between consecutive attempts.
   *
   * @param f - The operation to run; may be synchronous or asynchronous. The
   *   parameter is deliberately left loosely typed so the resolved value keeps
   *   flowing to callers untyped, as they currently rely on.
   * @returns The value `f` returned or resolved to on its first success.
   * @throws The error from the final attempt when every attempt has failed.
   */
  private async _withRetriesAsync(f: Function) {
    const { maxRetries, retryDelayMsec } = this._options;

    for (let tryCount = 0; tryCount <= maxRetries; tryCount++) {
      try {
        // Awaiting inside the try block ensures promise rejections are caught
        // here and counted as failures to retry. `await` passes non-promise
        // values (including undefined) straight through.
        return await f.call(this);
      } catch (err) {
        if (tryCount < maxRetries) {
          debug(
            "Attempt failed, %s of %s attempts remaining, delaying %s msec before retrying.",
            maxRetries - tryCount,
            maxRetries + 1,
            retryDelayMsec
          );
          await delay(retryDelayMsec);
          debug("Retrying failed operation now");
        } else {
          // tryCount is zero-based, so the number of attempts made is one more.
          debug("Operation failed after %s attempts", tryCount + 1);
          throw err;
        }
      }
    }
  }
}