neet/masto.js

View on GitHub
src/adapters/ws/web-socket-subscription.ts

Summary

Maintainability
A
0 mins
Test Coverage
import type WebSocket from "isomorphic-ws";

import {
  type Logger,
  type Serializer,
  type WebSocketConnector,
} from "../../interfaces";
import { type mastodon } from "../../mastodon";
import { MastoUnexpectedError } from "../errors";
import { toAsyncIterable } from "./async-iterable";

export class WebSocketSubscription implements mastodon.streaming.Subscription {
  private connection?: WebSocket;

  constructor(
    private readonly connector: WebSocketConnector,
    private readonly serializer: Serializer,
    private readonly stream: string,
    private readonly logger?: Logger,
    private readonly params?: Record<string, unknown>,
  ) {}

  async *values(): AsyncIterableIterator<mastodon.streaming.Event> {
    this.logger?.log("info", "Subscribing to stream", this.stream);

    while (this.connector.canAcquire()) {
      this.connection = await this.connector.acquire();

      const data = this.serializer.serialize("json", {
        type: "subscribe",
        stream: this.stream,
        ...this.params,
      });

      this.logger?.log("debug", "↑ WEBSOCKET", data);
      this.connection.send(data);

      const messages = toAsyncIterable(this.connection);

      for await (const message of messages) {
        const event = await this.parseMessage(message.data as string);

        if (!this.test(event)) {
          continue;
        }

        this.logger?.log("debug", "↓ WEBSOCKET", event);
        yield event;
      }
    }
  }

  unsubscribe(): void {
    if (this.connection == undefined) {
      return;
    }

    const data = this.serializer.serialize("json", {
      type: "unsubscribe",
      stream: this.stream,
      ...this.params,
    });

    this.connection.send(data);
  }

  [Symbol.asyncIterator](): AsyncIterableIterator<mastodon.streaming.Event> {
    return this.values();
  }

  /**
   * @experimental This is an experimental API.
   */
  [Symbol.dispose](): void {
    this.unsubscribe();
  }

  private test(event: mastodon.streaming.Event): boolean {
    // subscribe("hashtag", { tag: "foo" }) -> ["hashtag", "foo"]
    // subscribe("list", { list: "foo" })   -> ["list", "foo"]
    const params = this.params ?? {};
    const extra = Object.values(params) as string[];
    const stream = [this.stream, ...extra];
    return stream.every((s) => event.stream.includes(s));
  }

  private async parseMessage(
    rawEvent: string,
  ): Promise<mastodon.streaming.Event> {
    const data = this.serializer.deserialize<mastodon.streaming.RawEvent>(
      "json",
      rawEvent,
    );

    if ("error" in data) {
      throw new MastoUnexpectedError(data.error);
    }

    const payload =
      data.event === "delete" || data.payload == undefined
        ? data.payload
        : this.serializer.deserialize("json", data.payload);

    return {
      stream: data.stream,
      event: data.event,
      payload: payload,
    } as mastodon.streaming.Event;
  }
}