RocketChat/Rocket.Chat

View on GitHub
apps/meteor/server/services/federation/service.ts

Summary

Maintainability
C
1 day
Test Coverage
import { ServiceClassInternal } from '@rocket.chat/core-services';
import type { IFederationService } from '@rocket.chat/core-services';

import type { FederationRoomServiceSender } from './application/room/sender/RoomServiceSender';
import type { FederationUserServiceSender } from './application/user/sender/UserServiceSender';
import type { IFederationBridge } from './domain/IFederationBridge';
import { FederationFactory } from './infrastructure/Factory';
import type { InMemoryQueue } from './infrastructure/queue/InMemoryQueue';
import type { RocketChatFileAdapter } from './infrastructure/rocket-chat/adapters/File';
import type { RocketChatMessageAdapter } from './infrastructure/rocket-chat/adapters/Message';
import type { RocketChatNotificationAdapter } from './infrastructure/rocket-chat/adapters/Notification';
import type { RocketChatRoomAdapter } from './infrastructure/rocket-chat/adapters/Room';
import type { RocketChatSettingsAdapter } from './infrastructure/rocket-chat/adapters/Settings';
import type { RocketChatUserAdapter } from './infrastructure/rocket-chat/adapters/User';
import { FederationRoomSenderConverter } from './infrastructure/rocket-chat/converters/RoomSender';
import { FederationHooks } from './infrastructure/rocket-chat/hooks';

import './infrastructure/rocket-chat/well-known';

export abstract class AbstractFederationService extends ServiceClassInternal {
    private cancelSettingsObserver: () => void;

    private internalQueueInstance: InMemoryQueue;

    private internalSettingsAdapter: RocketChatSettingsAdapter;

    private internalRoomServiceSender: FederationRoomServiceSender;

    private internalUserServiceSender: FederationUserServiceSender;

    private internalRoomAdapter: RocketChatRoomAdapter;

    private internalUserAdapter: RocketChatUserAdapter;

    private internalFileAdapter: RocketChatFileAdapter;

    private internalMessageAdapter: RocketChatMessageAdapter;

    private internalNotificationAdapter: RocketChatNotificationAdapter;

    private isRunning = false;

    protected PROCESSING_CONCURRENCY = 1;

    protected bridge: IFederationBridge;

    protected abstract setupInternalEphemeralListeners(): Promise<void>;

    protected abstract setupInternalValidators(): Promise<void>;

    protected abstract setupInternalActionListeners(): Promise<void>;

    protected abstract onEnableFederation(): Promise<void>;

    protected abstract onDisableFederation(): Promise<void>;

    constructor(
        federationBridge: IFederationBridge,
        internalQueueInstance: InMemoryQueue,
        internalSettingsAdapter: RocketChatSettingsAdapter,
    ) {
        super();
        this.internalQueueInstance = internalQueueInstance;
        this.internalSettingsAdapter = internalSettingsAdapter;
        this.bridge = federationBridge;
        this.internalFileAdapter = FederationFactory.buildInternalFileAdapter();
        this.internalRoomAdapter = FederationFactory.buildInternalRoomAdapter();
        this.internalUserAdapter = FederationFactory.buildInternalUserAdapter();
        this.internalMessageAdapter = FederationFactory.buildInternalMessageAdapter();
        this.internalNotificationAdapter = FederationFactory.buildInternalNotificationAdapter();
        this.internalRoomServiceSender = FederationFactory.buildRoomServiceSender(
            this.internalRoomAdapter,
            this.internalUserAdapter,
            this.internalFileAdapter,
            this.internalMessageAdapter,
            this.internalSettingsAdapter,
            this.internalNotificationAdapter,
            this.bridge,
        );
        this.internalUserServiceSender = FederationFactory.buildUserServiceSender(
            this.internalRoomAdapter,
            this.internalUserAdapter,
            this.internalFileAdapter,
            this.internalSettingsAdapter,
            this.bridge,
        );
        this.setEventListeners();
    }

    private setEventListeners(): void {
        this.onEvent('user.avatarUpdate', async ({ username }): Promise<void> => {
            if (!this.isFederationEnabled()) {
                return;
            }
            if (!username) {
                return;
            }
            await this.internalUserServiceSender.afterUserAvatarChanged(username);
        });
        this.onEvent('user.typing', async ({ isTyping, roomId, user: { username } }): Promise<void> => {
            if (!roomId || !username) {
                return;
            }

            await this.internalUserServiceSender.onUserTyping(username, roomId, isTyping);
        });
        this.onEvent('user.realNameChanged', async ({ _id, name }): Promise<void> => {
            if (!this.isFederationEnabled()) {
                return;
            }
            if (!name || !_id) {
                return;
            }
            await this.internalUserServiceSender.afterUserRealNameChanged(_id, name);
        });
        this.onEvent(
            'federation.userRoleChanged',
            async (data: Record<string, any>): Promise<void> => FederationHooks.afterRoomRoleChanged(this.internalRoomServiceSender, data),
        );
    }

    private async onFederationEnabledSettingChange(isFederationEnabled: boolean): Promise<void> {
        if (!this.isRunning) {
            return;
        }

        if (isFederationEnabled) {
            await this.onDisableFederation();
            return this.onEnableFederation();
        }

        return this.onDisableFederation();
    }

    public async initialize() {
        this.internalSettingsAdapter = FederationFactory.buildInternalSettingsAdapter();
        await this.internalSettingsAdapter.initialize();
        this.cancelSettingsObserver = this.internalSettingsAdapter.onFederationEnabledStatusChanged(
            this.onFederationEnabledSettingChange.bind(this),
        );
    }

    private async noop(): Promise<void> {
        // noop
    }

    private async setupEventHandlersForExternalEvents(): Promise<void> {
        const federationRoomServiceReceiver = FederationFactory.buildRoomServiceReceiver(
            this.internalRoomAdapter,
            this.internalUserAdapter,
            this.internalMessageAdapter,
            this.internalFileAdapter,
            this.internalSettingsAdapter,
            this.internalNotificationAdapter,
            this.internalQueueInstance,
            this.bridge,
        );
        const federationMessageServiceReceiver = FederationFactory.buildMessageServiceReceiver(
            this.internalRoomAdapter,
            this.internalUserAdapter,
            this.internalMessageAdapter,
            this.internalFileAdapter,
            this.internalSettingsAdapter,
            this.bridge,
        );
        const federationUserServiceReceiver = FederationFactory.buildUserServiceReceiver(
            this.internalRoomAdapter,
            this.internalUserAdapter,
            this.internalFileAdapter,
            this.internalNotificationAdapter,
            this.internalSettingsAdapter,
            this.bridge,
        );
        const federationEventsHandler = FederationFactory.buildFederationEventHandler(
            federationRoomServiceReceiver,
            federationMessageServiceReceiver,
            federationUserServiceReceiver,
            this.internalSettingsAdapter,
        );
        this.internalQueueInstance.setHandler(federationEventsHandler.handleEvent.bind(federationEventsHandler), this.PROCESSING_CONCURRENCY);
    }

    protected getInternalSettingsAdapter(): RocketChatSettingsAdapter {
        return this.internalSettingsAdapter;
    }

    protected getInternalRoomServiceSender(): FederationRoomServiceSender {
        return this.internalRoomServiceSender;
    }

    protected getInternalUserServiceSender(): FederationUserServiceSender {
        return this.internalUserServiceSender;
    }

    protected getInternalRoomAdapter(): RocketChatRoomAdapter {
        return this.internalRoomAdapter;
    }

    protected getInternalUserAdapter(): RocketChatUserAdapter {
        return this.internalUserAdapter;
    }

    protected getInternalMessageAdapter(): RocketChatMessageAdapter {
        return this.internalMessageAdapter;
    }

    protected getInternalNotificationAdapter(): RocketChatNotificationAdapter {
        return this.internalNotificationAdapter;
    }

    protected getInternalFileAdapter(): RocketChatFileAdapter {
        return this.internalFileAdapter;
    }

    protected isFederationEnabled(): boolean {
        return this.internalSettingsAdapter.isFederationEnabled();
    }

    protected async setupFederation(): Promise<void> {
        if (this.isFederationEnabled()) {
            await this.setupEventHandlersForExternalEvents();
            await this.setupInternalValidators();
            await this.setupInternalActionListeners();
            await this.setupInternalEphemeralListeners();
        }
        this.isRunning = true;
    }

    protected async cleanUpSettingObserver(): Promise<void> {
        this.cancelSettingsObserver();
        this.isRunning = false;
    }

    protected async cleanUpHandlers(): Promise<void> {
        this.internalQueueInstance.setHandler(this.noop.bind(this), this.PROCESSING_CONCURRENCY);
    }

    protected async verifyMatrixIds(matrixIds: string[]): Promise<Map<string, string>> {
        return this.bridge.verifyInviteeIds(matrixIds);
    }
}

abstract class AbstractBaseFederationService extends AbstractFederationService {
    constructor() {
        const internalQueueInstance = FederationFactory.buildFederationQueue();
        const internalSettingsAdapter = FederationFactory.buildInternalSettingsAdapter();
        const bridge = FederationFactory.buildFederationBridge(internalSettingsAdapter, internalQueueInstance);

        super(bridge, internalQueueInstance, internalSettingsAdapter);
    }

    protected async setupInternalEphemeralListeners(): Promise<void> {
        await this.getInternalNotificationAdapter().subscribeToUserTypingEventsOnFederatedRooms(
            this.getInternalNotificationAdapter().broadcastUserTypingOnRoom.bind(this.getInternalNotificationAdapter()),
        );
    }

    protected async setupInternalValidators(): Promise<void> {
        const federationRoomInternalValidator = FederationFactory.buildRoomInternalValidator(
            this.getInternalRoomAdapter(),
            this.getInternalUserAdapter(),
            this.getInternalFileAdapter(),
            this.getInternalSettingsAdapter(),
            this.bridge,
        );
        FederationFactory.setupValidators(federationRoomInternalValidator);
    }

    protected async setupInternalActionListeners(): Promise<void> {
        const federationMessageServiceSender = FederationFactory.buildMessageServiceSender(
            this.getInternalRoomAdapter(),
            this.getInternalUserAdapter(),
            this.getInternalSettingsAdapter(),
            this.getInternalMessageAdapter(),
            this.bridge,
        );
        FederationFactory.setupListenersForLocalActions(this.getInternalRoomServiceSender(), federationMessageServiceSender);
    }

    protected async onEnableFederation(): Promise<void> {
        await super.setupFederation();
        await this.startFederation();
    }

    protected async onDisableFederation(): Promise<void> {
        await this.stopFederation();
    }

    private async startFederation(): Promise<void> {
        if (!this.isFederationEnabled()) {
            return;
        }
        await this.bridge.start();
        this.bridge.logFederationStartupInfo('Running Federation V2');
        await import('./infrastructure/rocket-chat/slash-commands');
    }

    private async stopFederation(): Promise<void> {
        FederationFactory.removeAllListeners();
        await this.bridge.stop();
        await super.cleanUpHandlers();
    }

    public async stopped(): Promise<void> {
        await this.stopFederation();
        await super.cleanUpSettingObserver();
    }

    public async created(): Promise<void> {
        await super.setupFederation();
        await this.startFederation();
    }
}

export class FederationService extends AbstractBaseFederationService implements IFederationService {
    protected name = 'federation';

    public async createDirectMessageRoomAndInviteUser(
        internalInviterId: string,
        internalRoomId: string,
        externalInviteeId: string,
    ): Promise<void> {
        return this.getInternalRoomServiceSender().createDirectMessageRoomAndInviteUser(
            FederationRoomSenderConverter.toCreateDirectMessageRoomDto(internalInviterId, internalRoomId, externalInviteeId),
        );
    }

    public async verifyMatrixIds(matrixIds: string[]): Promise<Map<string, string>> {
        return super.verifyMatrixIds(matrixIds);
    }

    static async createFederationService(): Promise<FederationService> {
        const federationService = new FederationService();
        await federationService.initialize();
        return federationService;
    }

    public async stopped(): Promise<void> {
        return super.stopped();
    }

    public async created(): Promise<void> {
        return super.created();
    }
}