RocketChat/Rocket.Chat

View on GitHub
apps/meteor/server/email/IMAPInterceptor.ts

Summary

Maintainability
A
1 hr
Test Coverage
import { EventEmitter } from 'events';

import { EmailInbox } from '@rocket.chat/models';
import type { ImapMessage, ImapMessageBodyInfo } from 'imap';
import IMAP from 'imap';
import type { ParsedMail } from 'mailparser';
import { simpleParser } from 'mailparser';

import { notifyOnEmailInboxChanged } from '../../app/lib/server/lib/notifyListener';
import { logger } from '../features/EmailInbox/logger';

type IMAPOptions = {
    deleteAfterRead: boolean;
    filter: any[];
    rejectBeforeTS?: Date;
    markSeen: boolean;
    maxRetries: number;
};

export declare interface IMAPInterceptor {
    on(event: 'email', listener: (email: ParsedMail) => void): this;
}

export class IMAPInterceptor extends EventEmitter {
    private imap: IMAP;

    private config: IMAP.Config;

    private backoffDurationMS = 3000;

    private backoff: NodeJS.Timeout;

    private retries = 0;

    private inboxId: string;

    constructor(
        imapConfig: IMAP.Config,
        private options: IMAPOptions = {
            deleteAfterRead: false,
            filter: ['UNSEEN'],
            markSeen: true,
            maxRetries: 10,
        },
        id: string,
    ) {
        super();

        this.config = imapConfig;

        this.imap = new IMAP({
            connTimeout: 10000,
            keepalive: true,
            ...(imapConfig.tls && { tlsOptions: { servername: imapConfig.host } }),
            ...imapConfig,
        });
        this.retries = 0;
        this.inboxId = id;
        void this.start();
    }

    openInbox(): Promise<IMAP.Box> {
        return new Promise((resolve, reject) => {
            const cb = (err: Error, mailbox: IMAP.Box) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(mailbox);
                }
            };
            this.imap.openBox('INBOX', false, cb);
        });
    }

    async start(): Promise<void> {
        // On successfully connected.
        this.imap.on('ready', async () => {
            if (this.isActive()) {
                logger.info(`IMAP connected to ${this.config.user}`);
                clearTimeout(this.backoff);
                this.retries = 0;
                this.backoffDurationMS = 3000;
                await this.openInbox();
                this.imap.on('mail', () => this.getEmails().catch((err: Error) => logger.debug('Error on getEmails: ', err.message)));
            } else {
                logger.error("Can't connect to IMAP server");
            }
        });

        this.imap.on('error', async (err: Error) => {
            logger.error({ msg: 'IMAP error', err });
            this.retries++;
            await this.reconnect();
        });

        this.imap.on('close', async () => {
            await this.reconnect();
        });
        this.retries += 1;
        return this.imap.connect();
    }

    isActive(): boolean {
        return !!(this.imap?.state && this.imap.state !== 'disconnected');
    }

    stop(callback = new Function()): void {
        logger.debug('IMAP stop called');
        this.imap.removeAllListeners();
        this.imap.once('end', () => {
            logger.debug('IMAP stopped');
            callback?.();
        });
        this.imap.end();
    }

    async reconnect(): Promise<void> {
        if (!this.isActive() && !this.canRetry()) {
            logger.info(`Max retries reached for ${this.config.user}`);
            this.stop();
            return this.selfDisable();
        }
        if (this.backoff) {
            clearTimeout(this.backoff);
            this.backoffDurationMS = 3000;
        }
        const loop = async (): Promise<void> => {
            logger.debug(`Reconnecting to ${this.config.user}: ${this.retries}`);
            if (this.canRetry()) {
                this.backoffDurationMS *= 2;
                this.backoff = setTimeout(loop, this.backoffDurationMS);
            } else {
                logger.info(`IMAP reconnection failed on inbox ${this.config.user}`);
                clearTimeout(this.backoff);
                this.stop();
                await this.selfDisable();
                return;
            }
            this.stop();
            await this.start();
        };
        this.backoff = setTimeout(loop, this.backoffDurationMS);
    }

    imapSearch(): Promise<number[]> {
        return new Promise((resolve, reject) => {
            const cb = (err: Error, results: number[]) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(results);
                }
            };
            this.imap.search(this.options.filter, cb);
        });
    }

    parseEmails(stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): Promise<ParsedMail> {
        return new Promise((resolve, reject) => {
            const cb = (err: Error, mail: ParsedMail) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(mail);
                }
            };
            simpleParser(stream, cb);
        });
    }

    imapFetch(emailIds: number[]): Promise<number[]> {
        return new Promise((resolve, reject) => {
            const out: number[] = [];
            const messagecb = (msg: ImapMessage, seqno: number) => {
                out.push(seqno);
                const bodycb = (stream: NodeJS.ReadableStream, _info: ImapMessageBodyInfo): void => {
                    simpleParser(stream, (_err, email) => {
                        if (this.options.rejectBeforeTS && email.date && email.date < this.options.rejectBeforeTS) {
                            logger.error({ msg: `Rejecting email on inbox ${this.config.user}`, subject: email.subject });
                            return;
                        }
                        this.emit('email', email);
                        if (this.options.deleteAfterRead) {
                            this.imap.seq.addFlags(email, 'Deleted', (err) => {
                                if (err) {
                                    logger.warn(`Mark deleted error: ${err}`);
                                }
                            });
                        }
                    });
                };
                msg.once('body', bodycb);
            };
            const errorcb = (err: Error): void => {
                logger.warn(`Fetch error: ${err}`);
                reject(err);
            };
            const endcb = (): void => {
                resolve(out);
            };
            const fetch = this.imap.fetch(emailIds, {
                bodies: ['HEADER', 'TEXT', ''],
                struct: true,
                markSeen: this.options.markSeen,
            });

            fetch.on('message', messagecb);
            fetch.on('error', errorcb);
            fetch.on('end', endcb);
        });
    }

    // Fetch all UNSEEN messages and pass them for further processing
    async getEmails(): Promise<void> {
        const emailIds = await this.imapSearch();
        await this.imapFetch(emailIds);
    }

    canRetry(): boolean {
        return this.retries < this.options.maxRetries || this.options.maxRetries === -1;
    }

    async selfDisable(): Promise<void> {
        logger.info(`Disabling inbox ${this.inboxId}`);

        // Again, if there's 2 inboxes with the same email, this will prevent looping over the already disabled one
        // Active filter is just in case :)
        const { value } = await EmailInbox.setDisabledById(this.inboxId);

        if (value) {
            void notifyOnEmailInboxChanged(value, 'updated');
        }

        logger.info(`IMAP inbox ${this.inboxId} automatically disabled`);
    }
}