RocketChat/Rocket.Chat

View on GitHub
apps/meteor/app/statistics/server/lib/SAUMonitor.ts

Summary

Maintainability
C
1 day
Test Coverage
import type { ISession, ISessionDevice, ISocketConnectionLogged, IUser } from '@rocket.chat/core-typings';
import { cronJobs } from '@rocket.chat/cron';
import { Logger } from '@rocket.chat/logger';
import { Sessions, Users } from '@rocket.chat/models';
import mem from 'mem';
import { Meteor } from 'meteor/meteor';
import UAParser from 'ua-parser-js';

import { getMostImportantRole } from '../../../../lib/roles/getMostImportantRole';
import { getClientAddress } from '../../../../server/lib/getClientAddress';
import { aggregates } from '../../../../server/models/raw/Sessions';
import { sauEvents } from '../../../../server/services/sauMonitor/events';
import { UAParserMobile, UAParserDesktop } from './UAParserCustom';

type DateObj = { day: number; month: number; year: number };

/**
 * Splits a date into its local-time calendar components.
 *
 * @param dateTime - The date to decompose; defaults to the current moment.
 * @returns The day of month, the 1-based month and the full year of `dateTime`.
 */
const getDateObj = (dateTime = new Date()): DateObj => {
    const year = dateTime.getFullYear();
    // getMonth() is zero-based, so shift it to the 1..12 range.
    const month = dateTime.getMonth() + 1;
    const day = dateTime.getDate();

    return { day, month, year };
};

// Logger instance created with the name 'SAUMonitor'; shared by everything in this module.
const logger = new Logger('SAUMonitor');

/**
 * Looks up the roles of a user, memoized through `mem` with a `maxAge` of 5000.
 *
 * Only the `roles` field is requested from the Users model.
 *
 * @param userId - Id of the user whose roles are requested.
 * @returns The user's roles, or an empty array when the user or its roles are missing.
 */
const getUserRoles = mem(
    async (userId: string): Promise<string[]> => {
        const found = await Users.findOneById<Pick<IUser, 'roles'>>(userId, { projection: { roles: 1 } });

        if (!found?.roles) {
            return [];
        }

        return found.roles;
    },
    { maxAge: 5000 },
);

/**
 * Server Session Monitor for SAU (Simultaneously Active Users) based on Meteor server sessions.
 *
 * While running, the monitor:
 *  - stores/updates a session document on every `accounts.login` event;
 *  - calls the Sessions model to log out / close sessions on `accounts.logout`
 *    and `socket.disconnected` events;
 *  - schedules two cron jobs: one that rolls still-open sessions over to the
 *    current day and one that aggregates the sessions of the last days.
 *
 * Event listeners are registered once per instance and are never removed; they
 * simply do nothing while the monitor is stopped.
 */
export class SAUMonitorClass {
    private _started: boolean;

    private _dailyComputeJobName: string;

    private _dailyFinishSessionsJobName: string;

    private scheduler = cronJobs;

    // stop() does not unregister the `sauEvents` listeners, so these flags make sure a
    // start -> stop -> start cycle does not register (and therefore run) every handler twice.
    private _accountEventsBound = false;

    private _connectionEventsBound = false;

    constructor() {
        this._started = false;
        this._dailyComputeJobName = 'aggregate-sessions';
        this._dailyFinishSessionsJobName = 'finish-sessions';
    }

    /**
     * Starts the monitor: registers the event listeners (first start only) and
     * schedules the cron jobs. Does nothing when the monitor is already running.
     *
     * @throws Meteor.Error when the monitoring setup fails (see `_startMonitoring`).
     */
    async start(): Promise<void> {
        if (this.isRunning()) {
            return;
        }

        await this._startMonitoring();

        this._started = true;
        logger.debug('[start]');
    }

    /**
     * Stops the monitor and removes both cron jobs from the scheduler if present.
     * Event listeners stay registered but become no-ops because they check `isRunning()`.
     * Does nothing when the monitor is not running.
     */
    async stop(): Promise<void> {
        if (!this.isRunning()) {
            return;
        }

        this._started = false;

        if (await this.scheduler.has(this._dailyComputeJobName)) {
            await this.scheduler.remove(this._dailyComputeJobName);
        }
        if (await this.scheduler.has(this._dailyFinishSessionsJobName)) {
            await this.scheduler.remove(this._dailyFinishSessionsJobName);
        }

        logger.debug('[stop]');
    }

    /**
     * @returns `true` between a successful `start()` and the next `stop()`.
     */
    isRunning(): boolean {
        return this._started === true;
    }

    /**
     * Wires the event listeners and schedules the cron jobs.
     *
     * @throws Meteor.Error wrapping whatever error the setup raised.
     */
    async _startMonitoring(): Promise<void> {
        try {
            this._handleAccountEvents();
            this._handleOnConnection();
            await this._startCronjobs();
        } catch (err: any) {
            throw new Meteor.Error(err);
        }
    }

    /**
     * Registers (once) the `socket.disconnected` listener, which asks the Sessions
     * model to close the session matching the socket's instance id and session id.
     */
    private _handleOnConnection(): void {
        if (this._connectionEventsBound) {
            return;
        }

        sauEvents.on('socket.disconnected', async ({ id, instanceId }) => {
            if (!this.isRunning()) {
                return;
            }

            await Sessions.closeByInstanceIdAndSessionId(instanceId, id);
        });

        this._connectionEventsBound = true;
    }

    /**
     * Registers (once) the `accounts.login` and `accounts.logout` listeners.
     *
     * On login a session is created or updated with the user's roles and the
     * login date; on logout the Sessions model is asked to log out the session
     * identified by the connection id and the user id.
     */
    private _handleAccountEvents(): void {
        if (this._accountEventsBound) {
            return;
        }

        sauEvents.on('accounts.login', async ({ userId, connection }) => {
            if (!this.isRunning()) {
                return;
            }

            const roles = await getUserRoles(userId);

            const mostImportantRole = getMostImportantRole(roles);

            const loginAt = new Date();
            // Derive day/month/year from the very same instant as `loginAt` so both
            // always agree, even when the login happens right around midnight.
            const params = { userId, roles, mostImportantRole, loginAt, ...getDateObj(loginAt) };
            await this._handleSession(connection, params);
        });

        sauEvents.on('accounts.logout', async ({ userId, connection }) => {
            if (!this.isRunning()) {
                return;
            }
            const { id: sessionId } = connection;

            await Sessions.logoutBySessionIdAndUserId({ sessionId, userId });
        });

        this._accountEventsBound = true;
    }

    /**
     * Builds the session data for a connection and stores it through
     * `Sessions.createOrUpdate`. Nothing is stored when no connection is given.
     *
     * @param connection - The logged-in socket connection.
     * @param params - User and date related fields to merge into the session.
     */
    private async _handleSession(
        connection: ISocketConnectionLogged,
        params: Pick<ISession, 'userId' | 'mostImportantRole' | 'loginAt' | 'day' | 'month' | 'year' | 'roles'>,
    ): Promise<void> {
        const data = this._getConnectionInfo(connection, params);

        if (!data) {
            return;
        }

        const searchTerm = this._getSearchTerm(data);

        await Sessions.createOrUpdate({ ...data, searchTerm });
    }

    /**
     * Rolls the sessions returned by `Sessions.findSessionsNotClosedByDateWithoutLastActivity`
     * for `yesterday`'s date over to `today`, then updates the active sessions of
     * `yesterday`'s date with a `lastActivityAt` at the end of that day.
     * Does nothing when the monitor is not running.
     *
     * @param yesterday - Date whose sessions are rolled over.
     * @param today - Date the copied sessions are assigned to.
     */
    private async _finishSessionsFromDate(yesterday: Date, today: Date): Promise<void> {
        if (!this.isRunning()) {
            return;
        }

        const { day, month, year } = getDateObj(yesterday);
        // Last millisecond of `yesterday` (local time).
        const beforeDateTime = new Date(year, month - 1, day, 23, 59, 59, 999);

        const currentDate = getDateObj(today);
        // Midnight at the start of `today` (local time).
        const nextDateTime = new Date(currentDate.year, currentDate.month - 1, currentDate.day);

        const cursor = Sessions.findSessionsNotClosedByDateWithoutLastActivity({ year, month, day });

        const batch = [];

        for await (const session of cursor) {
            // create a new session for the current day
            batch.push({
                ...session,
                ...currentDate,
                createdAt: nextDateTime,
            });

            // Flush in chunks of 500 so the pending batch never grows unbounded.
            if (batch.length === 500) {
                await Sessions.createBatch(batch);
                batch.length = 0;
            }
        }

        if (batch.length > 0) {
            await Sessions.createBatch(batch);
        }

        // close all sessions from current 'date'
        await Sessions.updateActiveSessionsByDate(
            { year, month, day },
            {
                lastActivityAt: beforeDateTime,
            },
        );

        // TODO missing an action to perform on dangling sessions (for example remove sessions not closed one month ago)
    }

    /**
     * Concatenates (without separator) the non-empty values of device name, device
     * type, OS name, session id and user id into the session's search term.
     */
    private _getSearchTerm(session: Omit<ISession, '_id' | '_updatedAt' | 'createdAt' | 'searchTerm'>): string {
        return [session.device?.name, session.device?.type, session.device?.os.name, session.sessionId, session.userId]
            .filter(Boolean)
            .join('');
    }

    /**
     * Assembles the session fields derived from a connection (ids, login token,
     * client address, host header, parsed user agent) merged with `params`.
     * Fields in `params` are spread last and therefore take precedence.
     *
     * @returns The session data, or `undefined` when `connection` is falsy.
     */
    private _getConnectionInfo(
        connection: ISocketConnectionLogged,
        params: Pick<ISession, 'userId' | 'mostImportantRole' | 'loginAt' | 'day' | 'month' | 'year' | 'roles'>,
    ): Omit<ISession, '_id' | '_updatedAt' | 'createdAt' | 'searchTerm'> | undefined {
        if (!connection) {
            return;
        }

        const ip = getClientAddress(connection);

        const host = connection.httpHeaders?.host ?? '';

        return {
            type: 'session',
            sessionId: connection.id,
            instanceId: connection.instanceId,
            ...(connection.loginToken && { loginToken: connection.loginToken }),
            ip,
            host,
            ...this._getUserAgentInfo(connection),
            ...params,
        };
    }

    /**
     * Parses the connection's `user-agent` header into device information.
     *
     * The mobile-app parser is tried first, then the desktop-app parser, and
     * finally the generic `UAParser`.
     *
     * @returns `{ device }`, or `undefined` when there is no `user-agent` header.
     */
    private _getUserAgentInfo(connection: ISocketConnectionLogged): { device: ISessionDevice } | undefined {
        if (!connection?.httpHeaders?.['user-agent']) {
            return;
        }

        const uaString = connection.httpHeaders['user-agent'];

        // TODO define a type for "result" below
        // | UAParser.IResult
        // | { device: { type: string; model?: string }; browser: undefined; os: undefined; app: { name: string; version: string } }
        // | {
        //         device: { type: string; model?: string };
        //         browser: undefined;
        //         os: string;
        //         app: { name: string; version: string };
        //   }

        const result = ((): any => {
            if (UAParserMobile.isMobileApp(uaString)) {
                return UAParserMobile.uaObject(uaString);
            }

            if (UAParserDesktop.isDesktopApp(uaString)) {
                return UAParserDesktop.uaObject(uaString);
            }

            const ua = new UAParser(uaString);
            return ua.getResult();
        })();

        // Defaults used when the parsed result does not provide a value.
        const info: ISessionDevice = {
            type: 'other',
            name: '',
            longVersion: '',
            os: {
                name: '',
                version: '',
            },
            version: '',
        };

        // Deletes every own enumerable property with a falsy value, in place, and returns the same object.
        const removeEmptyProps = (obj: any): any => {
            Object.keys(obj).forEach((p) => !obj[p] && delete obj[p]);
            return obj;
        };

        if (result.browser?.name) {
            info.type = 'browser';
            info.name = result.browser.name;
            info.longVersion = result.browser.version || '';
        }

        // Some parsers report `os` as a plain string; only the object form is stored.
        if (typeof result.os !== 'string' && result.os?.name) {
            info.os = removeEmptyProps(result.os);
        }

        if (result.device && (result.device.type || result.device.model)) {
            info.type = result.device.type || '';

            if (result.hasOwnProperty('app') && result.app?.name) {
                info.name = result.app.name;
                info.longVersion = result.app.version;
                if (result.app.bundle) {
                    info.longVersion += ` ${result.app.bundle}`;
                }
            }
        }

        // Short version: the first run of up to three dot-separated numbers in the long version.
        if (typeof info.longVersion === 'string') {
            info.version = info.longVersion.match(/(\d+\.){0,2}\d+/)?.[0] || '';
        }

        return {
            device: info,
        };
    }

    /**
     * Schedules the aggregation job ('0 2 * * *') and the finish-sessions job
     * ('5 1 * * *') on the scheduler.
     */
    private async _startCronjobs(): Promise<void> {
        logger.info('[aggregate] - Start Cron.');
        const dailyComputeProcessTime = '0 2 * * *';
        const dailyFinishSessionProcessTime = '5 1 * * *';
        await this.scheduler.add(this._dailyComputeJobName, dailyComputeProcessTime, async () => this._aggregate());
        await this.scheduler.add(this._dailyFinishSessionsJobName, dailyFinishSessionProcessTime, async () => {
            // Derive both dates from a single instant so they are always exactly one day apart.
            const today = new Date();
            const yesterday = new Date(today);
            yesterday.setDate(today.getDate() - 1);

            await this._finishSessionsFromDate(yesterday, today);
        });
    }

    /**
     * Aggregates the sessions of the period from three days ago until today into
     * daily records (one per user and day) and then marks the period's sessions
     * as computed. Does nothing when the monitor is not running.
     */
    private async _aggregate(): Promise<void> {
        if (!this.isRunning()) {
            return;
        }

        const today = new Date();

        // get sessions from 3 days ago to make sure even if a few cron jobs were skipped, we still have the data
        const threeDaysAgo = new Date(today.getFullYear(), today.getMonth(), today.getDate() - 3, 0, 0, 0, 0);

        const period = { start: getDateObj(threeDaysAgo), end: getDateObj(today) };

        logger.info({ msg: '[aggregate] - Aggregating data.', period });

        for await (const record of aggregates.dailySessions(Sessions.col, period)) {
            await Sessions.updateDailySessionById(`${record.userId}-${record.year}-${record.month}-${record.day}`, record);
        }

        await Sessions.updateAllSessionsByDateToComputed(period);
    }
}