Tireo/pg-patch

View on GitHub
lib/process.js

Summary

Maintainability
C
1 day
Test Coverage
'use strict';

const q = require('q');
const path = require('path');

const fileScanner = require('./file-scanner');
const dbManager = require('./db-manager');
const fsp = require('./fs-promise');

const common = require('./common');
const actions = common.action;
const transactionMode = common.transactionMode;
const dryRun = common.dryRunMode;

const PgPatchConsoleReporter = require('./reporters/console-reporter');
const PgPatchBasicReporter = require('./reporters/basic-reporter');

let PgPatchProcess = function (config) {
    config = config || {};

    this.targetVersion = common.determineValue(config.targetVersion, null);
    this.sourceVersion = common.determineValue(config.sourceVersion, null);
    this.customPatchData = common.determineValue(config.customPatchData, null);

    this.dryRun = common.determineValue(dryRun[config.dryRun], null);

    if (this.dryRun) { //force single transaction mode
        this.transactionMode = transactionMode.SINGLE;
    } else {
        this.transactionMode = common.determineValue(transactionMode[config.transactionMode], transactionMode.PER_VERSION_STEP);
    }

    this.reporters = [];
    this.createInitialReporters(config);

    (this.scanner = new fileScanner(config)).process = this;
    (this.db = new dbManager(config)).process = this;
};

PgPatchProcess.prototype = {
    createInitialReporters: function (config) {
        /* istanbul ignore else */
        if (config.logLevel !== 'NONE') {
            this.addReporter(new PgPatchConsoleReporter(config));
        }

        /* istanbul ignore else */
        if (config.notify) {
            config.notify.forEach(notifyConfig => {
                this.addReporter(new PgPatchBasicReporter(notifyConfig));
            });
        }

        /* istanbul ignore else */
        if (config.reporters) {
            config.reporters.forEach(reporter => {
                this.addReporter(reporter);
            });
        }
    },
    addReporter: function (reporter) {
        this.reporters.push(reporter);
    },
    msg: function (msgType, msg, data) { //send msg to reporters
        this.reporters.forEach(reporter => {
            reporter.onMsg(msgType, msg, data);
        });
    },
    run: function () {
        let processCache = {};
        let scanner = this.scanner;
        let dbManager = this.db;

        return this.db.connect()
            .then(() => { //validate config
                return scanner.validatePatchFileTemplate();
            })
            .then(() => {  //DB check & setup
                return dbManager.initialDBSetup();
            })
            .then(() => { //scan for patch files
                this.msg("PATCH_FILE_SCAN:START");

                return scanner.scanDirectoryForPatchFiles().then(patchData => {
                    this.msg("PATCH_FILE_SCAN:END", patchData);

                    /* istanbul ignore else */
                    if (this.customPatchData) {
                        this.customPatchData.forEach(data => {
                            data.type = 'CUSTOM';
                            patchData.addData(data);
                        });
                    }

                    processCache.patchData = patchData;
                });
            }).then(() => { //create route data based on patchData
                processCache.patchData.createRouteData();
                processCache.maxPatchVersionFound = processCache.patchData.getMaxPatchVersion();
            })
            .then(() => {
                return dbManager.getCurrentPatchVersion().then(currentVersion => {
                    processCache.currentVersion = currentVersion;
                });
            })
            .then(() => { //determine sourceVersion
                if (this.sourceVersion === null || this.sourceVersion === 'current') {
                    processCache.sourceVersion = processCache.currentVersion;
                } else {
                    processCache.sourceVersion = this.sourceVersion;
                }

                /* istanbul ignore else */
                if (this.sourceVersion < 0) {
                    return Promise.reject(`Invalid sourceVersion: ${processCache.sourceVersion}`);
                }

                /* istanbul ignore else */
                if (this.sourceVersion > processCache.maxPatchVersionFound) {
                    return Promise.reject(`Source patch version not found: ${processCache.sourceVersion}; (Max found was: ${processCache.maxPatchVersionFound})`);
                }
            })
            .then(() => { //determine targetVersion
                if (this.targetVersion === null) { //assume patch to newest
                    processCache.targetVersion = processCache.maxPatchVersionFound;
                } else {
                    processCache.targetVersion = this.targetVersion;
                    if (processCache.targetVersion === 'next') {
                        processCache.targetVersion = processCache.currentVersion + 1;
                    } else /* istanbul ignore else */ if (processCache.targetVersion === 'previous') {
                        processCache.targetVersion = processCache.currentVersion - 1;
                    }
                }

                /* istanbul ignore else */
                if (processCache.targetVersion === null) { //error
                    return Promise.reject(`Target patch version could not be determined`);
                }

                /* istanbul ignore else */
                if (this.targetVersion < 0) {
                    return Promise.reject(`Invalid targetVersion: ${processCache.sourceVersion}`);
                }

                /* istanbul ignore else */
                if (this.targetVersion > processCache.maxPatchVersionFound) {
                    return Promise.reject(`Target patch version not found: ${processCache.targetVersion}; (Max found was: ${processCache.maxPatchVersionFound})`);
                }
            })
            .then(() => { // patch process start
                this.msg("PROCESS:SUMMARY", processCache); //TODO: do not pass processCache directly

                /* istanbul ignore else */
                if (processCache.sourceVersion === processCache.targetVersion) {
                    this.msg("PROCESS:NOTHING_TO_DO");
                    return;
                }

                /* istanbul ignore else */
                if (!processCache.patchData.patchRouteExists(processCache.sourceVersion, processCache.targetVersion)) {
                    return Promise.reject(`patch route could not be found`);
                }

                return this.loadSQLData(
                    processCache.sourceVersion, processCache.targetVersion, processCache.patchData.routeData
                ).then((transactionsData) => { //create SQL packages to run
                    this.msg("PROCESS:SQL_DATA_READY");
                    return this.startPatch(transactionsData);
                });
            })
            .catch(err => {
                processCache.error = err;
                this.msg("ERROR", err);
            })
            .then(() => {
                dbManager.closeIfNeeded();

                let msgData = {
                    dryRun: this.dryRun
                };

                if (processCache.error) {
                    this.msg("PROCESS_END:ERROR", msgData);
                    return Promise.reject(processCache.error);
                } else {
                    this.msg("PROCESS_END:SUCCESS", msgData);
                    return Promise.resolve();
                }
            });
    },
    startPatch: function (transactionsData) {
        if (this.transactionMode === transactionMode.PER_VERSION_STEP) {
            return this.patchInSeparateTransactions(transactionsData);
        } else if (this.transactionMode === transactionMode.SINGLE) {
            return this.patchInSingleTransaction(transactionsData);
        } else {
            return Promise.reject(`unknown transaction control mode: ${this.transactionControl}`);
        }
    },
    loadSQLData: function (sourceVersion, targetVersion, routeData) {
        let action = common.determineAction(sourceVersion, targetVersion);
        let versionSeq = common.generateVersionPatchSequence(sourceVersion, targetVersion);

        let versionSqlGetPromises = [];

        versionSeq.forEach(version => {
            let versionStepSqlDataPromises = [];
            let currentRouteDataParts = routeData[version][action];

            currentRouteDataParts.sort((a, b) => {
                return (a.description || '').localeCompare(b.description || '');
            });

            currentRouteDataParts.forEach((data) => {
                let newPromise;
                if (data.type === 'FILE') {
                    //TODO: refactor into loaders support
                    if (/\.js$/.test(data.fullPath)) {
                        newPromise = q(require(path.resolve(data.fullPath))());
                    } else {
                        newPromise = fsp.readFile(data.fullPath).then(readBuffer => {
                            return readBuffer.toString();
                        });
                    }
                } else /* istanbul ignore else */ if (data.type === 'CUSTOM') {
                    newPromise = q(data.sql);
                }

                versionStepSqlDataPromises.push(newPromise);
            });

            let sourceVersion, targetVersion;
            if (action === common.action.UPDATE) {
                sourceVersion = version - 1;
                targetVersion = version;
            } else /* istanbul ignore else */ if (action === common.action.ROLLBACK) {
                sourceVersion = version;
                targetVersion = version - 1;
            }

            versionSqlGetPromises.push(q.all(versionStepSqlDataPromises).then((versionStepSqlData) => {
                /* istanbul ignore else */
                if (this.transactionMode === transactionMode.PER_VERSION_STEP) {
                    versionStepSqlData.unshift('BEGIN;');
                    versionStepSqlData.push('COMMIT;');
                }
                return {
                    sourceVersion: sourceVersion,
                    targetVersion: targetVersion,
                    action: action,
                    sql: versionStepSqlData.join(';\n')
                };
            }));
        });

        return q.all(versionSqlGetPromises);
    },
    updatePatchHistory: function (sourceVersion, targetVersion) {
        let dbManager = this.db;
        return dbManager.updatePatchHistory(sourceVersion, targetVersion).then(() => {
            this.msg("PROCESS:DB_VERSION_UPDATED", targetVersion);
        });
    },
    patchInSeparateTransactions: function (transactionsData) {
        let patchChainPromise = q();
        let dbManager = this.db;

        transactionsData.forEach(transactionData => {
            let action = transactionData.action;
            let sourceVersion = transactionData.sourceVersion;
            let targetVersion = transactionData.targetVersion;

            patchChainPromise = patchChainPromise.then(() => {
                this.msg("PROCESS:TRANSACTION_START", {
                    action: action,
                    sourceVersion: sourceVersion,
                    targetVersion: targetVersion,
                    dryRun: this.dryRun
                });
                return dbManager.patchQuery(transactionData.sql);
            }).then(() => {
                return this.updatePatchHistory(sourceVersion, targetVersion);
            });
        });
        return patchChainPromise;
    },
    patchInSingleTransaction: function (transactionsData) {
        let sqlToRun = ['BEGIN;'];
        let action = transactionsData[0].action; //all other will be the same
        let sourceVersion = transactionsData[0].sourceVersion;
        let targetVersion = transactionsData[transactionsData.length - 1].targetVersion;
        let dbManager = this.db;

        transactionsData.forEach(transactionData => {
            sqlToRun.push(transactionData.sql);
        });

        if (this.dryRun === dryRun.TEST_SQL) {
            sqlToRun.push('ROLLBACK;');
        } else {
            sqlToRun.push('COMMIT;');
        }

        this.msg("PROCESS:TRANSACTION_START", {
            action: action,
            targetVersion: targetVersion,
            dryRun: this.dryRun,
            transactionMode: this.transactionMode
        });

        return dbManager.patchQuery(sqlToRun.join(';\n')).then(() => {
            /* istanbul ignore else */
            if (this.dryRun !== dryRun.TEST_SQL) {
                return this.updatePatchHistory(sourceVersion, targetVersion);
            }
        });
    }
};

module.exports = PgPatchProcess;