imqueue/pg-pubsub

View on GitHub
src/PgIpLock.ts

Summary

Maintainability
C
7 hrs
Test Coverage
A
100%
/*!
 * Copyright (c) 2018, imqueue.com <support@imqueue.com>
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
 * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
 * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
 * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
 * OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
 * PERFORMANCE OF THIS SOFTWARE.
 */
import { Notification } from 'pg';
import { ident, literal } from 'pg-format';
import { clearInterval } from 'timers';
import {
    SCHEMA_NAME,
    SHUTDOWN_TIMEOUT,
} from './constants';
import { AnyLock } from './types';
import { PgIpLockOptions } from './types/PgIpLockOptions';
import Timeout = NodeJS.Timeout;

/**
 * Implements a manageable inter-process locking mechanism over an
 * existing PostgreSQL connection for a given `LISTEN` channel.
 *
 * It periodically retries to acquire the lock and implements graceful
 * shutdown on `SIGINT`, `SIGTERM` and `SIGABRT` OS signals, during which it
 * safely releases an acquired lock. Releasing the lock emits an event to
 * other similar instances running in other processes (or on other hosts),
 * so that one of them can capture the freed lock.
 *
 * When running inside Docker containers this works flawlessly with
 * auto-scaling services, as Docker destroys containers gracefully.
 *
 * Currently, the only known issue may occur if, for example, the database
 * or some software (or hardware) in the middle causes a silent disconnect.
 * For some period of time, despite the fact that there are other live
 * potential listeners, some messages may go into the void. This time period
 * can be tuned by passing the desired `acquireInterval` option. Take into
 * account, however, that a too short interval combined with a large number
 * of running services may cause a flood of lock acquire requests to the
 * database, so selecting the proper value should be a thoughtful trade-off
 * between overall system load and reliability level.
 *
 * Usually you do not need to instantiate this class directly - it will be
 * done by PgPubSub instances when they need it. Nevertheless, you may re-use
 * this piece of code in some other implementations, so it is exported as is.
 */
export class PgIpLock implements AnyLock {
    /**
     * DB lock schema name getter. The name is escaped as an SQL identifier,
     * so it may be embedded into queries wherever an identifier is expected.
     * Locks created with a `uniqueKey` use a separate schema, which has the
     * `_unique` suffix in its name.
     *
     * @return {string}
     */
    public get schemaName(): string {
        const suffix = this.uniqueKey ? '_unique' : '';

        return ident(SCHEMA_NAME + suffix);
    }

    /**
     * Calls destroy() on all created instances at a time
     *
     * @return {Promise<void>}
     */
    public static async destroy(): Promise<void> {
        await Promise.all(
            // iterate over a copy: each destroy() call mutates the registry
            PgIpLock.instances.slice().map(lock => lock.destroy()),
        );
    }

    /**
     * Returns true if at least one instance is currently registered (was
     * created or initialized and has not been destroyed yet),
     * false - otherwise
     *
     * @return {boolean}
     */
    public static hasInstances(): boolean {
        return PgIpLock.instances.length > 0;
    }

    /** Registry of all instances which have not been destroyed yet */
    private static instances: PgIpLock[] = [];
    /** Lock state as seen by this instance */
    private acquired = false;
    /** `notification` event listener, defined by onRelease() call */
    private notifyHandler?: (message: Notification) => void;
    /** Periodic lock acquire retry timer, started by init() call */
    private acquireTimer?: Timeout;

    /**
     * @constructor
     * @param {string} channel - source channel name to manage locking on
     * @param {PgIpLockOptions} options - lock instantiate options
     * @param {string} [uniqueKey] - unique key for specific message
     */
    public constructor(
        public readonly channel: string,
        public readonly options: PgIpLockOptions,
        public readonly uniqueKey?: string,
    ) {
        // lock channel name is the source channel name with exactly one
        // lock prefix: already existing prefixes are stripped first
        this.channel = `__${PgIpLock.name}__:${
            channel.replace(RX_LOCK_CHANNEL, '')
        }`;
        PgIpLock.instances.push(this);
    }

    /**
     * Initializes inter-process locks storage in database and starts
     * listening of lock release events, as well as initializes lock
     * acquire retry timer. Listening and the retry timer are set up only
     * for channel locks (those created without `uniqueKey`).
     *
     * @return {Promise<void>}
     */
    public async init(): Promise<void> {
        if (!await this.schemaExists()) {
            try {
                await this.createSchema();
                await Promise.all([this.createLock(), this.createDeadlockCheck()]);
            } catch (e) {
                // NOTE(review): errors are deliberately ignored here,
                // presumably to tolerate another process creating the very
                // same database objects concurrently - confirm
                /*ignore*/
            }
        }

        if (this.notifyHandler && !this.uniqueKey) {
            // onRelease() has attached this handler already, unless
            // destroy() detached it since then. Detach it first, so the
            // handler is never registered (and thus invoked) twice.
            this.options.pgClient.off('notification', this.notifyHandler);
            this.options.pgClient.on('notification', this.notifyHandler);
        }

        // re-register this instance if it has been destroyed before
        if (PgIpLock.instances.indexOf(this) === -1) {
            PgIpLock.instances.push(this);
        }

        if (!this.uniqueKey) {
            await this.listen();

            // noinspection TypeScriptValidateTypes
            !this.acquireTimer && (this.acquireTimer = setInterval(
                () => !this.acquired && this.acquire(),
                this.options.acquireInterval,
            ));
        }
    }

    /**
     * This would provide release handler which will be called once the
     * lock is released and the source channel name (without the lock
     * prefix) would be passed to a given handler
     *
     * @param {(channel: string) => void} handler
     * @throws {TypeError} - if release handler has been set up already
     */
    public onRelease(handler: (channel: string) => void): void {
        if (!!this.notifyHandler) {
            throw new TypeError(
                'Release handler for IPC lock has been already set up!',
            );
        }

        this.notifyHandler = (message): void => {
            // istanbul ignore else
            // we should skip messages from pub/sub channels and listen
            // only to those which are ours
            if (message.channel === this.channel) {
                handler(this.channel.replace(RX_LOCK_CHANNEL, ''));
            }
        };

        this.options.pgClient.on('notification', this.notifyHandler);
    }

    /**
     * Acquires a lock on the current channel. Returns true on success,
     * false - otherwise. Never throws: any error which is not the expected
     * "locked" error is passed to the logger given in options.
     *
     * @return {Promise<boolean>}
     */
    public async acquire(): Promise<boolean> {
        try {
            this.uniqueKey
                ? await this.acquireUniqueLock()
                : await this.acquireChannelLock()
            ;
            this.acquired = true;
        } catch (err) {
            // will throw, because insert duplicates existing lock
            this.acquired = false;

            // P0001 with LOCKED detail is raised by deadlock_check() db
            // routine when the lock is held by another live application,
            // which is an expected case and should not be reported
            // istanbul ignore next
            if (!(err.code === 'P0001' && err.detail === 'LOCKED')) {
                this.options.logger.error(err);
            }
        }

        return this.acquired;
    }

    /**
     * DB lock schema name escaped as an SQL string literal. It should be
     * used for comparisons against catalog columns: identifier-escaped
     * `schemaName` may be wrapped into double quotes, which must not get
     * inside of a string literal.
     *
     * @return {string}
     */
    private get schemaNameLiteral(): string {
        const suffix = this.uniqueKey ? '_unique' : '';

        return literal(SCHEMA_NAME + suffix);
    }

    /**
     * Returns true if lock schema exists, false - otherwise
     *
     * @return {Promise<boolean>}
     */
    private async schemaExists(): Promise<boolean> {
        const { rows } = await this.options.pgClient.query(`
            SELECT schema_name
            FROM information_schema.schemata
            WHERE schema_name = ${this.schemaNameLiteral}
        `);

        return (rows.length > 0);
    }

    /**
     * Acquires a lock with ID. If the lock record with this ID exists
     * already, the deadlock_check() db routine decides whether it can be
     * taken over.
     *
     * @return {Promise<void>}
     */
    private async acquireUniqueLock(): Promise<void> {
        // noinspection SqlResolve
        await this.options.pgClient.query(`
            INSERT INTO ${this.schemaName}.lock (id, channel, app)
            VALUES (
                ${literal(this.uniqueKey)},
                ${literal(this.channel)},
                ${literal(this.options.pgClient.appName)}
            ) ON CONFLICT (id) DO
            UPDATE SET app = ${this.schemaName}.deadlock_check(
                ${this.schemaName}.lock.app,
                ${literal(this.options.pgClient.appName)}
            )
        `);
    }

    /**
     * Acquires a lock by unique channel. If the lock record for this
     * channel exists already, the deadlock_check() db routine decides
     * whether it can be taken over.
     *
     * @return {Promise<void>}
     */
    private async acquireChannelLock(): Promise<void> {
        // noinspection SqlResolve
        await this.options.pgClient.query(`
            INSERT INTO ${this.schemaName}.lock (channel, app)
            VALUES (
                ${literal(this.channel)},
                ${literal(this.options.pgClient.appName)}
            ) ON CONFLICT (channel) DO
                UPDATE SET app = ${this.schemaName}.deadlock_check(
                ${this.schemaName}.lock.app,
                ${literal(this.options.pgClient.appName)}
            )
        `);
    }

    /**
     * Releases acquired lock on this channel. After lock is released, another
     * running process or host would be able to acquire the lock.
     *
     * Unique-key lock record is always removed, while channel lock record
     * is removed only if the lock has been acquired by this instance.
     *
     * @return {Promise<void>}
     */
    public async release(): Promise<void> {
        if (this.uniqueKey) {
            // noinspection SqlResolve
            await this.options.pgClient.query(`
                DELETE FROM ${this.schemaName}.lock
                WHERE id=${literal(this.uniqueKey)}
            `);
        } else {
            if (!this.acquired) {
                return ; // nothing to release, this lock has not been acquired
            }

            // noinspection SqlResolve
            await this.options.pgClient.query(`
                DELETE FROM ${this.schemaName}.lock
                WHERE channel=${literal(this.channel)}
            `);
        }

        this.acquired = false;
    }

    /**
     * Returns current lock state, true if acquired, false - otherwise.
     *
     * @return {boolean}
     */
    public isAcquired(): boolean {
        return this.acquired;
    }

    /**
     * Destroys this lock properly: detaches release handler, stops lock
     * acquire retry timer, stops listening the lock channel, releases the
     * lock and removes this instance from the instances registry.
     *
     * Never throws - errors are passed to the logger (if there is any).
     * It is safe to call it more than once.
     *
     * @return {Promise<void>}
     */
    public async destroy(): Promise<void> {
        try {
            if (this.notifyHandler) {
                this.options.pgClient.off('notification', this.notifyHandler);
            }

            if (this.acquireTimer) {
                // noinspection TypeScriptValidateTypes
                clearInterval(this.acquireTimer);
                delete this.acquireTimer;
            }

            await Promise.all([this.unlisten(), this.release()]);

            const index = PgIpLock.instances.indexOf(this);

            // this instance may be not registered anymore (for example,
            // on repeated destroy() call). Without this guard splice(-1, 1)
            // would drop the last registered instance instead.
            if (index !== -1) {
                PgIpLock.instances.splice(index, 1);
            }
        } catch (err) {
            // do not crash - just log
            this.options.logger && this.options.logger.error &&
            this.options.logger.error(err);
        }
    }

    /**
     * Starts listening lock release channel
     *
     * @return {Promise<void>}
     */
    private async listen(): Promise<void> {
        await this.options.pgClient.query(`LISTEN ${ident(this.channel)}`);
    }

    /**
     * Stops listening lock release channel
     *
     * @return {Promise<void>}
     */
    private async unlisten(): Promise<void> {
        await this.options.pgClient.query(`UNLISTEN ${ident(this.channel)}`);
    }

    /**
     * Creates lock db schema
     *
     * @return {Promise<void>}
     */
    private async createSchema(): Promise<void> {
        await this.options.pgClient.query(`
            CREATE SCHEMA IF NOT EXISTS ${this.schemaName}
        `);
    }

    /**
     * Creates lock table of the kind matching this lock: the one keyed by
     * ID if `uniqueKey` is given, or the one keyed by channel name with
     * delete trigger, which notifies on record removal - otherwise
     *
     * @return {Promise<void>}
     */
    private async createLock(): Promise<void> {
        // istanbul ignore if
        if (this.uniqueKey) {
            await this.createUniqueLock();

            return ;
        }

        await this.createChannelLock();
    }

    /**
     * Creates unique locks by IDs in the database
     *
     * @return {Promise<void>}
     */
    private async createUniqueLock(): Promise<void> {
        // drop the lock table if it exists without "id" column, so it is
        // re-created below with the proper structure
        await this.options.pgClient.query(`
            DO $$
                BEGIN
                    IF NOT EXISTS (
                        SELECT *
                        FROM information_schema.columns
                        WHERE table_schema = ${ this.schemaNameLiteral }
                            AND table_name = 'lock'
                            AND column_name = 'id'
                    ) THEN
                        DROP TABLE IF EXISTS ${ this.schemaName }.lock;
                    END IF;
                END
            $$
        `);
        await this.options.pgClient.query(`
            CREATE TABLE IF NOT EXISTS ${ this.schemaName }."lock" (
                "id" CHARACTER VARYING NOT NULL PRIMARY KEY,
                "channel" CHARACTER VARYING NOT NULL,
                "app" CHARACTER VARYING NOT NULL
            )
        `);
        // unique locks table has no release notification trigger
        await this.options.pgClient.query(`
            DROP TRIGGER IF EXISTS notify_release_lock_trigger
                ON ${this.schemaName}.lock
        `);
    }

    /**
     * Creates locks by channel names in the database
     *
     * @return {Promise<void>}
     */
    private async createChannelLock(): Promise<void> {
        // drop the lock table if it exists with "id" column, so it is
        // re-created below with the proper structure
        await this.options.pgClient.query(`
            DO $$
                BEGIN
                    IF EXISTS (
                        SELECT *
                        FROM information_schema.columns
                        WHERE table_schema = ${ this.schemaNameLiteral }
                            AND table_name = 'lock'
                            AND column_name = 'id'
                    ) THEN
                        DROP TABLE IF EXISTS ${ this.schemaName }.lock;
                    END IF;
                END
            $$
        `);
        await this.options.pgClient.query(`
            CREATE TABLE IF NOT EXISTS ${ this.schemaName }."lock" (
                "channel" CHARACTER VARYING NOT NULL PRIMARY KEY,
                "app" CHARACTER VARYING NOT NULL
            )
        `);
        // trigger routine: notifies the channel of a removed lock record
        // noinspection SqlResolve
        await this.options.pgClient.query(`
            CREATE OR REPLACE FUNCTION ${this.schemaName}.notify_lock()
            RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
            BEGIN PERFORM PG_NOTIFY(OLD.channel, '1'); RETURN OLD; END; $$
        `);

        await this.options.pgClient.query(`
            DROP TRIGGER IF EXISTS notify_release_lock_trigger 
                ON ${this.schemaName}.lock
        `);

        try {
            await this.options.pgClient.query(`
                CREATE CONSTRAINT TRIGGER notify_release_lock_trigger
                    AFTER DELETE ON ${this.schemaName}.lock
                    DEFERRABLE INITIALLY DEFERRED
                    FOR EACH ROW EXECUTE PROCEDURE ${
                    this.schemaName}.notify_lock()
            `);
        } catch (e) {
            // NOTE(review): trigger creation failure is deliberately
            // ignored, presumably because another process may have created
            // it in between - confirm
            /*ignore*/
        }
    }

    /**
     * Creates deadlocks check routine used on lock acquisition. The routine
     * raises an exception with `LOCKED` detail if there are live
     * connections of the application which holds the lock, otherwise it
     * returns a new application name to hand the lock over to.
     *
     * @return {Promise<void>}
     */
    private async createDeadlockCheck(): Promise<void> {
        await this.options.pgClient.query(`
            CREATE OR REPLACE FUNCTION ${this.schemaName}.deadlock_check(
                old_app TEXT,
                new_app TEXT
            )
            RETURNS TEXT LANGUAGE PLPGSQL AS $$
            DECLARE num_apps INTEGER;
            BEGIN
                SELECT count(query) INTO num_apps
                FROM pg_stat_activity
                WHERE application_name = old_app;
                IF num_apps > 0 THEN
                    RAISE EXCEPTION 'Duplicate channel for app %', new_app
                    USING DETAIL = 'LOCKED';
                END IF;
                RETURN new_app;
            END; 
            $$
        `);
    }
}

/**
 * Matches one or more consecutive lock channel prefixes, each of the form
 * `__<PgIpLock class name>__:`, at the very beginning of a channel name.
 */
export const RX_LOCK_CHANNEL = new RegExp(`^(__${PgIpLock.name}__:)+`);

let timer: any;
/**
 * Termination signal handler. Schedules the process exit to happen after
 * SHUTDOWN_TIMEOUT (re-scheduling it if it has been scheduled already by
 * a previous signal) and destroys all instantiated locks in the meantime.
 * The process exits with the code returned by locks destruction if it
 * completes before the timer fires, or with 0 otherwise.
 */
async function terminate(): Promise<void> {
    let exitCode = 0;

    if (timer) {
        clearTimeout(timer);
    }

    timer = setTimeout(() => process.exit(exitCode), SHUTDOWN_TIMEOUT);
    exitCode = await destroyLock();
}

/**
 * Destroys all registered lock instances.
 *
 * Resolves with process exit code: 0 if there was nothing to destroy or all
 * locks have been destroyed, 1 if destruction failed. A failure is reported
 * to the logger of the first registered lock, or to console if there are no
 * locks registered anymore.
 */
async function destroyLock(): Promise<number> {
    // istanbul ignore if
    if (!PgIpLock.hasInstances()) {
        return 0;
    }

    try {
        await PgIpLock.destroy();
    } catch (error) {
        // istanbul ignore next
        (PgIpLock.hasInstances()
            ? (PgIpLock as any).instances[0].options.logger
            : console
        )?.error(error);

        return 1;
    }

    return 0;
}

// run graceful shutdown on each of the handled termination signals
for (const signal of ['SIGTERM', 'SIGINT', 'SIGABRT'] as const) {
    process.on(signal, terminate);
}