Discord-InterChat/InterChat

View on GitHub
src/services/BroadcastService.ts

Summary

Maintainability
B
4 hrs
Test Coverage
import HubManager from '#main/managers/HubManager.js';
import HubSettingsManager from '#main/managers/HubSettingsManager.js';
import MessageFormattingService from '#main/services/MessageFormattingService.js';
import Logger from '#main/utils/Logger.js';
import { BroadcastOpts, ReferredMsgData } from '#main/utils/network/Types.js';
import { generateJumpButton as getJumpButton } from '#utils/ComponentUtils.js';
import { ConnectionMode } from '#utils/Constants.js';
import { getAttachmentURL } from '#utils/ImageUtils.js';
import storeMessageData, { NetworkWebhookSendResult } from '#utils/network/storeMessageData.js';
import { getReferredContent, getReferredMsgData } from '#utils/network/utils.js';
import { censor } from '#utils/ProfanityUtils.js';
import { trimAndCensorBannedWebhookWords } from '#utils/Utils.js';
import { Connection } from '@prisma/client';
import { HexColorString, Message, WebhookClient, WebhookMessageCreateOptions } from 'discord.js';

const BATCH_SIZE = 15;
const CONCURRENCY_LIMIT = 10;

export class BroadcastService {
  private webhookClients: Map<string, WebhookClient> = new Map();

  constructor() {
    setInterval(() => this.cleanupWebhookClients(), 5 * 60 * 1000); // 5 minutes
  }

  private getWebhookClient(webhookURL: string): WebhookClient {
    let client = this.webhookClients.get(webhookURL);
    if (!client) {
      client = new WebhookClient({ url: webhookURL });
      this.webhookClients.set(webhookURL, client);
    }
    return client;
  }

  private cleanupWebhookClients() {
    this.webhookClients.forEach((client, url) => {
      client.destroy();
      this.webhookClients.delete(url);
    });
  }

  async broadcastMessage(
    message: Message<true>,
    hub: HubManager,
    hubConnections: Connection[],
    connection: Connection,
  ) {
    const attachmentURL = await this.resolveAttachmentURL(message);
    const username = this.getUsername(hub.settings, message);
    const censoredContent = censor(message.content);
    const referredMessage = await this.fetchReferredMessage(message);
    const referredMsgData = await getReferredMsgData(referredMessage);
    const referredContent = this.getReferredContent(referredMsgData);

    // Sort connections by last active first
    const sortedHubConnections = hubConnections.sort(
      (a, b) => b.lastActive.getTime() - a.lastActive.getTime(),
    );

    Logger.debug(`Broadcasting message to ${sortedHubConnections.length} connections`);

    // Split connections into batches
    const batches = this.chunkArray(sortedHubConnections, BATCH_SIZE);
    const allResults: NetworkWebhookSendResult[] = [];

    // Process batches with concurrency limit
    for (const batch of batches) {
      const batchPromises = batch.map((conn) =>
        this.sendToConnection(message, hub, conn, {
          attachmentURL,
          referredMsgData,
          embedColor: connection.embedColor as HexColorString,
          username,
          censoredContent,
          referredContent,
        }),
      );

      Logger.debug(`Sending batch of ${batch.length} messages`);
      const batchResults = await this.processWithConcurrency(batchPromises, CONCURRENCY_LIMIT);
      allResults.push(...batchResults);
      Logger.debug(`Sent batch of ${batch.length} messages`);
    }

    // Batch store message data
    await storeMessageData(message, allResults, connection.hubId, referredMsgData.dbReferrence);
  }

  private chunkArray<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }

  private async processWithConcurrency<T>(
    promises: Promise<T>[],
    concurrency: number,
  ): Promise<T[]> {
    const results: T[] = [];
    let index = 0;

    async function next(): Promise<void> {
      const currentIndex = index++;
      if (currentIndex >= promises.length) return;

      try {
        const result = await promises[currentIndex];
        results[currentIndex] = result;
      }
      catch (error) {
        results[currentIndex] = error;
      }

      await next();
    }

    // Start initial batch of promises
    const initialPromises = Array(Math.min(concurrency, promises.length))
      .fill(null)
      .map(() => next());

    await Promise.all(initialPromises);
    return results;
  }
  async resolveAttachmentURL(message: Message) {
    return (
      message.attachments.first()?.url ?? (await getAttachmentURL(message.content)) ?? undefined
    );
  }

  private async fetchReferredMessage(message: Message<true>): Promise<Message | null> {
    return message.reference ? await message.fetchReference().catch(() => null) : null;
  }

  private getReferredContent(data: ReferredMsgData) {
    if (data.referredMessage && data.dbReferrence) {
      const mode =
        data.dbReferrence.broadcastMsgs.get(data.referredMessage.channelId)?.mode ??
        ConnectionMode.Compact;
      return getReferredContent(data.referredMessage, mode);
    }
  }

  private getUsername(settings: HubSettingsManager, message: Message<true>): string {
    return trimAndCensorBannedWebhookWords(
      settings.has('UseNicknames')
        ? (message.member?.displayName ?? message.author.displayName)
        : message.author.username,
    );
  }

  private async sendToConnection(
    message: Message<true>,
    hub: HubManager,
    connection: Connection,
    opts: BroadcastOpts & {
      username: string;
      censoredContent: string;
      referredContent: string | undefined;
      referredMsgData: ReferredMsgData;
    },
  ): Promise<NetworkWebhookSendResult> {
    try {
      const messageFormat = this.getMessageFormat(message, connection, hub, opts);
      const messageRes = await this.sendMessage(connection.webhookURL, messageFormat);
      const mode = connection.compact ? ConnectionMode.Compact : ConnectionMode.Embed;

      return { messageRes, webhookURL: connection.webhookURL, mode };
    }
    catch (e) {
      Logger.error(
        `Failed to send message to ${connection.channelId} in server ${connection.serverId}`,
        e,
      );
      return { error: e.message, webhookURL: connection.webhookURL };
    }
  }

  private getMessageFormat(
    message: Message<true>,
    connection: Connection,
    hub: HubManager,
    opts: BroadcastOpts & {
      username: string;
      censoredContent: string;
      referredContent: string | undefined;
      referredMsgData: ReferredMsgData;
    },
  ): WebhookMessageCreateOptions {
    const { dbReferrence, referredAuthor } = opts.referredMsgData;
    const author = { username: opts.username, avatarURL: message.author.displayAvatarURL() };
    const jumpButton = this.getJumpButton(
      referredAuthor?.username ?? 'Unknown',
      connection,
      dbReferrence,
    );
    const servername = trimAndCensorBannedWebhookWords(message.guild.name);

    const messageFormatter = new MessageFormattingService(connection);
    return messageFormatter.format(message, {
      ...opts,
      hub: hub.data,
      author,
      servername,
      jumpButton,
    });
  }

  private getJumpButton(
    username: string,
    { channelId, serverId }: Connection,
    dbReferrence: ReferredMsgData['dbReferrence'],
  ) {
    const reply = dbReferrence?.broadcastMsgs.get(channelId) ?? dbReferrence;
    return reply?.messageId
      ? [getJumpButton(username, { channelId, serverId, messageId: reply.messageId })]
      : undefined;
  }

  private async sendMessage(webhookUrl: string, data: WebhookMessageCreateOptions) {
    const webhook = this.getWebhookClient(webhookUrl);
    return await webhook.send(data);
  }
}