trufflesuite/truffle

View on GitHub
packages/dashboard-message-bus-client/lib/connection/index.ts

Summary

Maintainability
C
1 day
Test Coverage
import WebSocket from "isomorphic-ws";

// must polyfill AbortController to use axios >=0.20.0, <=0.27.2 on node <= v14.x
import "../polyfill";
import axios from "axios";
import {
  base64ToJson,
  jsonToBase64,
  Message
} from "@truffle/dashboard-message-bus-common";

import debugModule from "debug";

import { MessageBusConnectionError } from "../errors";
import {
  DashboardMessageBusConnectionEvents,
  DashboardMessageBusConnectionOptions,
  SocketEventHandlerMap
} from "./types";

import { TypedEmitter } from "tiny-typed-emitter";

import { tracked } from "@truffle/promise-tracker";
import delay from "delay";

const debug = debugModule("dashboard-message-bus-client:connection");
const debugMessage = debugModule("dashboard-message-bus-client:message");

type HandlerFactory<T> = (
  resolve: (result: T) => void,
  reject: (err?: any) => void
) => SocketEventHandlerMap;

export class DashboardMessageBusConnection extends TypedEmitter<DashboardMessageBusConnectionEvents> {
  private _connectionType: "publish" | "subscribe";
  private _socket: WebSocket | undefined;
  private _host: string;
  private _port: number;
  private _publishPort: number | undefined;
  private _subscribePort: number | undefined;
  private _connecting: boolean;

  constructor({
    host,
    port,
    publishPort,
    subscribePort,
    connectionType: socketType
  }: DashboardMessageBusConnectionOptions) {
    super();
    this._host = host;
    this._port = port;
    this._publishPort = publishPort;
    this._subscribePort = subscribePort;
    this._connectionType = socketType;
  }

  get isConnected() {
    return this._socket && this._socket.readyState === WebSocket.OPEN;
  }

  get isConnecting() {
    return this._socket && this._socket.readyState === WebSocket.CONNECTING;
  }

  get isClosing() {
    return this._socket && this._socket.readyState === WebSocket.CLOSING;
  }

  async connect(): Promise<void> {
    if (this._socket) {
      switch (this._socket.readyState) {
        case WebSocket.CONNECTING:
          debug(
            "connect: %s already attempting to connect (readyState switch)",
            this._connectionType
          );
          await delay(10);
          return this.connect();

        case WebSocket.OPEN:
          // we're already connected, just return
          debug("connect: %s already connected", this._connectionType);
          return;

        case WebSocket.CLOSING:
        case WebSocket.CLOSED:
          debug(
            "connect: %s was previously connected but has been closed",
            this._connectionType
          );
          // already closed or on our way there, so we'll just recreate it in a
          // moment
          delete this._socket;
      }
    }

    try {
      if (this._connecting) {
        debug(
          "connect: %s already attempting to connect (_connecting flag)",
          this._connectionType
        );
        await delay(10);
        return this.connect();
      }

      this._connecting = true;

      const port = await this._getMessageBusPort();

      const url = `ws://${this._host}:${port}`;

      debug(
        "connect: %s is attempting to connect to %s",
        this._connectionType,
        url
      );

      this._socket = new WebSocket(url);

      this._socket?.addEventListener(
        "message",
        ((event: WebSocket.MessageEvent) => {
          if (typeof event.data !== "string") {
            event.data = event.data.toString();
          }

          const message = base64ToJson(event.data);
          debugMessage(
            "%s connection received message %o",
            this._connectionType,
            message
          );
          this.emit("message", message);
        }).bind(this)
      );

      // connecting

      // we now have a socket that's in the process of opening, so return a
      // promise that resolves when it opens, or fails to open
      const connectPromise = this._createEventWrapperPromise<void>(
        (resolve, reject) => {
          return {
            open: () => {
              debug(
                "connect: %s connection succeeded to url %s",
                this._connectionType,
                this._socket?.url
              );
              if (this._connectionType === "subscribe") {
                this._socket?.send("ready");
              }
              resolve();
              this._connecting = false;
            },

            error: (event: WebSocket.ErrorEvent) => {
              debug(
                "connect: %s connection to url %s failed due to error %s",
                this._connectionType,
                this._socket?.url,
                event.error
              );
              reject(
                new MessageBusConnectionError({
                  message: event.error.message,
                  cause: event.error
                })
              );
              this._connecting = false;
            },

            close: (event: WebSocket.CloseEvent) => {
              debug(
                "connect: %s connection to url %s closed before successfully connecting due to code %s and reason %s",
                this._connectionType,
                this._socket?.url,
                event.code,
                event.reason
              );
              reject(
                new MessageBusConnectionError({
                  message: `Socket connection closed with code '${event.code}' and reason '${event.reason}'`
                })
              );
              this._connecting = false;
            }
          };
        }
      );

      let timedout = false;
      await Promise.race([
        connectPromise,
        async () => {
          await delay(350);
          timedout = true;
        }
      ]);
      if (timedout) {
        debug(
          "connect: %s connection to url %s timed out",
          this._connectionType,
          url
        );
      }
    } catch {
      this._connecting = false;
    }
  }

  async send(message: Message): Promise<void>;
  async send(data: string): Promise<void>;
  @tracked
  async send(dataOrMessage: string | Message): Promise<void> {
    const encodedMessage =
      typeof dataOrMessage === "string"
        ? dataOrMessage
        : jsonToBase64(dataOrMessage);

    await this.connect();

    debug(
      "send: %s connection sending %o",
      this._connectionType,
      base64ToJson(encodedMessage)
    );
    this._socket?.send(encodedMessage);
  }

  async close(): Promise<void> {
    if (!this._socket) {
      return;
    }

    if (this._socket.readyState <= WebSocket.CLOSING) {
      const promise = this._createEventWrapperPromise<void>(
        (resolve, reject) => {
          return {
            error: (event: WebSocket.ErrorEvent) => {
              reject(event.error);
            },
            close: () => {
              debug("%s connection closed", this._connectionType);
              resolve();
            }
          };
        }
      );

      this._socket.close();
      return promise;
    }
  }

  private async _getMessageBusPort(): Promise<number> {
    if (this._connectionType === "subscribe" && this._subscribePort) {
      return this._subscribePort;
    }

    if (this._connectionType === "publish" && this._publishPort) {
      return this._publishPort;
    }

    // otherwise, fetch it from the server
    try {
      debug(
        "_getMessageBusPort: %s connection attempting to fetch ports",
        this._connectionType
      );
      const { data } = await axios.get(
        `http://${this._host}:${this._port}/ports`,
        {
          timeout: 350
        }
      );

      const port =
        this._connectionType === "subscribe"
          ? data.subscribePort
          : data.publishPort;

      debug(
        "_getMessageBusPort: %s connection will use port %s",
        this._connectionType,
        port
      );

      return port;
    } catch (err) {
      debug(
        "_getMessageBusPort: failed fetching ports for %s connection due to error %s",
        this._connectionType,
        err
      );
      throw err;
    }
  }

  private _createEventWrapperPromise<T>(
    handlerFactory: HandlerFactory<T>
  ): Promise<T> {
    return new Promise<T>(
      ((resolve: (result: T) => void, reject: (err?: any) => void) => {
        this._registerEventHandlers(handlerFactory.call(this, resolve, reject));
      }).bind(this)
    );
  }

  private _registerEventHandlers(handlers: SocketEventHandlerMap) {
    let wrappedHandlers: SocketEventHandlerMap = {};
    for (const eventType in handlers) {
      wrappedHandlers[eventType] = ((...args: any[]) => {
        handlers[eventType].call(this, ...args);
        this._cleanUpEventHandlers(wrappedHandlers);
      }).bind(this);
      this._socket?.addEventListener(eventType, wrappedHandlers[eventType]);
    }
  }
  private _cleanUpEventHandlers(handlers: SocketEventHandlerMap) {
    for (const eventType in handlers) {
      this._socket?.removeEventListener(eventType, handlers[eventType]);
    }
  }
}