RackHD/on-tasks

View on GitHub
lib/jobs/base-job.js

Summary

Maintainability
F
2 wks
Test Coverage
// Copyright 2015, EMC, Inc.

'use strict';

var di = require('di');

module.exports = baseJobFactory;
di.annotate(baseJobFactory, new di.Provide('Job.Base'));
    di.annotate(baseJobFactory,
    new di.Inject(
        'Protocol.Events',
        'Protocol.Task',
        'Services.Encryption',
        'Services.Lookup',
        'Services.Messenger',
        'Constants',
        'Assert',
        'Util',
        'Promise',
        'Result',
        '_'
    )
);
function baseJobFactory(
    eventsProtocol,
    taskProtocol,
    encryption,
    lookup,
    messenger,
    Constants,
    assert,
    util,
    Promise,
    Result,
    _
) {
    function BaseJob(logger, options, context, taskId) {
        var self = this;
        assert.func(self._run);

        assert.object(logger);
        assert.uuid(taskId);
        assert.object(options);
        assert.object(context);

        self.subscriptions = [];
        self.subscriptionPromises = [];

        self.logger = logger;
        self.options = options;
        self.context = context;
        self.taskId = taskId;

        self._deferred = new Promise(function(resolve, reject) {
            self.resolve = resolve;
            self.reject = reject;
        });
    }

    BaseJob.prototype.isPending = function isPending() {
        return this._deferred.isPending();
    };

    /*
     * Add a cleanup() function if there are non-subscription
     * resources in the job that should be cleaned up on cancel().
     */
    BaseJob.prototype.cleanup = function() {
        var self = this;

        if (_.isFunction(self._cleanup)) {
            return Promise.resolve().then(function () {
                return self._cleanup();
            });
        } else {
            return Promise.resolve();
        }
    };

    BaseJob.prototype.runWrapper = function runWrapper() {
        var self = this;

        function before() {
            if (self.context.target) {
                return self._subscribeActiveTaskExists(function() {
                    return self.serialize();
                });
            } else {
                return Promise.resolve();
            }
        }

        return before().disposer(function() {
            return self.cleanup()
            .then(function() {
                // Handle resolution of created subscriptions in the case we cancel
                // while still initializing
                return Promise.all(self.subscriptionPromises);
            })
            .then(function() {
                return Promise.all(_.map(self.subscriptions, function(subscription) {
                    return subscription.dispose();
                }));
            });
        });
    };

    BaseJob.prototype.run = function run() {
        var self = this;

        return Promise.using(self.runWrapper(), function() {
            self.logger.debug("Running job.", {
                id: self.nodeId,
                module: self.logger.module,
                name: self.constructor.name,
                options: self.options,
                taskId: self.taskId,
                target: self.context.target || 'none'
            });

            self._run();
            return self._deferred;
        });
    };

    BaseJob.prototype.cancel = function(error) {
        var loggerObject = {
            taskId: this.taskId,
            name: this.constructor.name
        };
        if (error) {
            loggerObject.error = error;
        }
        this.logger.info("Stopping job.", loggerObject);
        return this._done(error);
    };

    BaseJob.prototype._done = function _done(error) {
        if (this._deferred.isPending()) {
            if (error) {
                this.reject(error);
            } else {
                this.resolve();
            }
           return this._deferred;
        }
    };

    // enables JSON.stringify(this)
    BaseJob.prototype.toJSON = function toJSON() {
        return this.serialize();
    };

    BaseJob.prototype.serialize = function serialize() {
        var redactKeys = ['subscriptions' ,'subscriptionPromises', 'logger', '_deferred'];
        return _.transform(this, function(result, v, k) {
            if (!_.contains(redactKeys, k)) {
                result[k] = v;
            }
        }, {});
    };

    BaseJob.prototype._subscribeActiveTaskExists = function _subscribeActiveTaskExists(callback) {
        var self = this;
        assert.func(callback);
        assert.string(self.context.target);
        var deferred = taskProtocol.subscribeActiveTaskExists(
                self.context.target, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeRequestProfile = function _subscribeRequestProfile(callback) {
        var self = this;
        assert.isMongoId(self.nodeId);
        var deferred = taskProtocol.subscribeRequestProfile(self.nodeId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeRequestProperties = function _subscribeRequestProperties(callback) {
        var self = this;
        assert.isMongoId(self.nodeId);
        var deferred = taskProtocol.subscribeRequestProperties(self.nodeId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeRespondCommands = function _subscribeRespondCommands(callback) {
        var self = this;
        assert.isMongoId(self.nodeId);
        var deferred = taskProtocol.subscribeRespondCommands(self.nodeId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeRequestCommands = function _subscribeRequestCommands(callback) {
        var self = this;
        assert.isMongoId(self.nodeId);
        var deferred = taskProtocol.subscribeRequestCommands(self.nodeId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeHttpResponse = function _subscribeHttpResponse(callback) {
        var self = this;
        assert.isMongoId(self.nodeId);
        var deferred = eventsProtocol.subscribeHttpResponse(self.nodeId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeHttpResponseUuid = function _subscribeHttpResponseUuid(callback, id) {
        var self = this;
        //assert.isMongoId(self.nodeId);
        var deferred = eventsProtocol.subscribeHttpResponseUuid(id, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeNodeNotification = function _subscribeNodeNotification(NodeId, callback) {
        var self = this;
        var deferred = eventsProtocol.subscribeNodeNotification(NodeId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeGraphFinished = function _subscribeGraphFinished(callback) {
        var self = this;
        assert.func(callback);
        assert.uuid(self.graphId);
        var deferred = eventsProtocol.subscribeGraphFinished(self.graphId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._publishSnmpCommandResult =
        function _publishSnmpCommandResult(uuid, data) {

        if (data && data.community) {
            data.community = encryption.encrypt(data.community);
        }

        return taskProtocol.publishSnmpCommandResult(uuid, data);
    };

    BaseJob.prototype._subscribeSnmpCommandResult =
        function _subscribeSnmpCommandResult(uuid, callback) {

        var self = this;

        var deferred = taskProtocol.subscribeSnmpCommandResult(uuid,
                self._runSnmpCommandCallback.bind(self, callback));

        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._runSnmpCommandCallback = function(callback, data) {
        var self = this;

        if (!data) {
            return;
        }
        if (data.community) {
            data.community = encryption.decrypt(data.community);
        }
        if (data.host && Constants.Regex.MacAddress.test(data.host)) {
            lookup.macAddressToIp(data.host)
            .catch(function(e) {
                self.logger.error("Failure looking up host IP for snmp poller", {
                    error: e
                });
            })
            .then(function(ipAddress) {
                if (!ipAddress) {
                    return;
                }
                data.host = ipAddress;
                callback.call(self, data);
            });
        } else {
            callback.call(self, data);
        }
    };

    BaseJob.prototype._subscribeRunSnmpCommand = function _subscribeRunSnmpCommand(uuid, callback) {
        var self = this;

        var deferred = taskProtocol.subscribeRunSnmpCommand(uuid, function(data) {
            if (data && data.community) {
                data.community = encryption.decrypt(data.community);
            }
            callback.call(self, data);
        });

        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._publishRunSnmpCommand = function _publishRunSnmpCommand(uuid, hostData) {
        return taskProtocol.publishRunSnmpCommand(uuid, hostData);
    };

    BaseJob.prototype._publishIpmiCommandResult =
        function _publishIpmiCommandResult(uuid, command, data) {

        if (data && data.password) {
            data.password = encryption.encrypt(data.password);
        }

        return taskProtocol.publishIpmiCommandResult(uuid, command, data);
    };

    BaseJob.prototype._subscribeIpmiCommandResult =
        function _subscribeIpmiCommandResult(uuid, command, callback) {

        var self = this;

        var deferred = taskProtocol.subscribeIpmiCommandResult(uuid, command, function(data) {
            if (data && data.password) {
                data.password = encryption.decrypt(data.password);
            }
            callback.call(self, data);
        });
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._publishMetricResult =
        function _publishMetricResult(uuid, metricName, data) {

        if (data && data.community) {
            data.community = encryption.encrypt(data.community);
        }
        if (data && data.password) {
            data.password = encryption.encrypt(data.password);
        }

        return taskProtocol.publishMetricResult(uuid, metricName, data);
    };

    BaseJob.prototype._subscribeMetricResult =
        function _subscribeMetricResult(uuid, metricName, callback) {

        var self = this;

        var deferred = taskProtocol.subscribeMetricResult(
            uuid,
            metricName,
            function(data, metricName) {
                if (data && data.community) {
                    data.community = encryption.encrypt(data.community);
                }
                if (data && data.password) {
                    data.password = encryption.encrypt(data.password);
                }
                callback.call(self, data, metricName);
            }
        );

        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeRunIpmiCommand =
        function _subscribeRunIpmiCommand(uuid, command, callback) {
        var self = this;

        var deferred = taskProtocol.subscribeRunIpmiCommand(
                uuid, command, self._runIpmiCommandCallback.bind(self, callback));

        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._runIpmiCommandCallback = function(callback, data) {
        var self = this;

        if (!data) {
            return;
        }
        if (data.password) {
            data.password = encryption.decrypt(data.password);
        }
        if (data.host && Constants.Regex.MacAddress.test(data.host)) {
            lookup.macAddressToIp(data.host)
            .catch(function(e) {
                self.logger.error("Failure looking up host IP for ipmi poller", {
                    error: e
                });
            })
            .then(function(ipAddress) {
                if (!ipAddress) {
                    return;
                }
                data.host = ipAddress;
                callback.call(self, data);
            });
        } else {
            callback.call(self, data);
        }
    };

    BaseJob.prototype._publishRunIpmiCommand =
        function _publishRunIpmiCommand(uuid, command, machine) {

        taskProtocol.publishRunIpmiCommand(uuid, command, machine);
    };

    BaseJob.prototype._publishPollerAlert = function _publishPollerAlert(data) {
        eventsProtocol.publishExternalEvent(data);
    };

    BaseJob.prototype._subscribeRequestPollerCache =
        function _subscribeRequestPollerCache(callback) {

        var self = this;

        var deferred = taskProtocol.subscribeRequestPollerCache(callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeGetBootProfile = function _subscribeGetBootProfile(callback) {
        var self = this;
        assert.isMongoId(self.nodeId);
        assert.func(callback);
        var deferred = taskProtocol.subscribeGetBootProfile(self.nodeId, callback.bind(self));
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeAnsibleCommand =
        function _subscribeAnsibleCommand(uuid, callback) {
        var self = this;

        var deferred = taskProtocol.subscribeAnsibleCommand(uuid,
                           self._ansibleCommandCallback.bind(self, callback));

        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._ansibleCommandCallback = function(callback, data) {
        var self = this;

        if (!data) {
            return;
        }

        callback.call(self, data);
    };

    BaseJob.prototype._publishAnsibleResult =
        function _publishAnsibleResult(uuid, data) {
        taskProtocol.publishAnsibleResult(uuid, data);
    };

    BaseJob.prototype._publishTrigger = function _publishTrigger(id, type, group) {
        return taskProtocol.publishTrigger(id, type, group);
    };

    BaseJob.prototype._subscribeTrigger = function _subscribeTrigger(id, type, group, callback) {
        var self = this;
        assert.func(callback);

        var deferred = taskProtocol.subscribeTrigger(id, type, group, callback.bind(self));

        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    BaseJob.prototype._subscribeFinishTrigger =
            function _subscribeFinishTrigger(id, group, callback) {
        var self = this;
        assert.func(callback);

        var deferred = taskProtocol.subscribeTrigger(id, 'finish', group, callback.bind(self));

        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    /**
     * @function _subscribeRedfishCommandResult
     * @description subscribe to amqp exchange to receive redfish command results
     */
    BaseJob.prototype._subscribeRedfishCommandResult =
        function subscribeRedfishCommandResult(uuid, command, callback) {
        var self = this;
        var deferred = messenger.subscribe(
            Constants.Protocol.Exchanges.Task.Name,
            ['redfish', 'command', command, 'result', uuid].join('.'),
            function(data) {
                callback(data.value);
            }
        );
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    /**
     * @function _publishRedfishCommandResult
     * @description publish the command result to an amqp exchange
     */
    BaseJob.prototype._publishRedfishCommandResult =
        function _publishRedfishCommandResult (uuid, command, result) {
        return messenger.publish(
            Constants.Protocol.Exchanges.Task.Name,
            ['redfish', 'command', command, 'result', uuid].join('.'),
            new Result({ value: result })
        );
    };

    /**
     * @function _subscribeRedfishCommand
     * @description subscribe and wait for a request from the redfish command exchange
     */
    BaseJob.prototype._subscribeRedfishCommand =
        function _subscribeRedfishCommand (uuid, callback) {
        var self = this;
        var deferred = messenger.subscribe(
            Constants.Protocol.Exchanges.Task.Name,
            'run.redfish.command' + '.' + uuid,
            function(data) {
                callback(data.value);
            }
        );
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    /**
     * @function _publishRunRedfishCommand
     * @description publish message on amqp to execute a redfish command
     */
    BaseJob.prototype._publishRunRedfishCommand =
        function _publishRunRedfishCommand (uuid, hostData) {
        return messenger.publish(
            Constants.Protocol.Exchanges.Task.Name,
            'run.redfish.command' + '.' + uuid,
            new Result({ value: hostData })
        );
    };

    /**
     * @function _subscribeWsmanCommand
     * @description subscribe and wait for a request from the wsman command exchange 
     */
    BaseJob.prototype._subscribeWsmanCommand =
        function _subscribeWsmanCommand (uuid, callback) {
        var self = this;
        var deferred = messenger.subscribe(
            Constants.Protocol.Exchanges.Task.Name,
            'run.wsman.command' + '.' + uuid,
            function(data) {
                callback(data.value);
            }
        );
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    /**
     * @function _publishRunWsmanCommand
     * @usbscribe to message bus for wsman poller command to execute 
     */
    BaseJob.prototype._publishRunWsmanCommand =
        function _publishRunWsmanCommand (uuid, hostData) {
        return messenger.publish(
            Constants.Protocol.Exchanges.Task.Name,
            'run.wsman.command' + '.' + uuid,
            new Result({ value: hostData })
        );
    };
    
    /**
     * @function _subscribeWsmanCommandResult
     * @description subscribe to amqp exchange to receive wsman command results and 
     * distribute with HTTP APIs and other modules
     */
    BaseJob.prototype._subscribeWsmanCommandResult =
        function subscribeWsmanCommandResult(uuid, command, callback) {
        var self = this;
        var deferred = messenger.subscribe(
            Constants.Protocol.Exchanges.Task.Name,
            ['wsman', 'command', command, 'result', uuid].join('.'),
            function(data) {
                callback(data.value);
            }
        );
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };
    
      /**
     * @function _publishWsmanCommandResult
     * @description publish message on amqp to execute a wsman command result
     */
    
    BaseJob.prototype._publishWsmanCommandResult = function _publishWsmanCommandResult (uuid, command, result) {
        return messenger.publish( Constants.Protocol.Exchanges.Task.Name,
            ['wsman', 'command', command, 'result', uuid].join('.'),
            new Result({ value: result })
        );
    };

    BaseJob.prototype._subscribeSelEvent = function _subscribeSelEvent(severity, pollerId, callback) {
        var self = this;
        var deferred = eventsProtocol.subscribeSelEvent(
            severity, pollerId, self.nodeId, callback.bind(self)
        );
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    /**
     * @function _subscribeRunUcsCommand
     * @description subscribe and wait for a request from the ucs command exchange.
     **/
    BaseJob.prototype._subscribeRunUcsCommand = function(uuid, callback) {
        var self = this;
        var deferred = messenger.subscribe(
            Constants.Protocol.Exchanges.Task.Name,
            'run.ucs.command.' + uuid,
            function(data){
                callback.call(self, data.value);
            });
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    /**
     * @function _subscribeUcsCommandResult
     * @description subscribe from amqp exchange to receive ucs command results from ucs job.
     **/
    BaseJob.prototype._subscribeUcsCommandResult = function(uuid, command, callback) {
        var self = this;
        var deferred = messenger.subscribe(
            Constants.Protocol.Exchanges.Task.Name,
            ['ucs', 'command', command, 'result', uuid].join('.'),
            function(data) {
                callback.call(self, data.value);
            }
        );
        self.subscriptionPromises.push(deferred);
        return deferred.then(function(subscription) {
            self.subscriptions.push(subscription);
        });
    };

    /**
     * @function _publishRunUcsCommand
     * @description publish message to ucs job to process.
     **/
    BaseJob.prototype._publishRunUcsCommand =
    function (uuid, hostData) {
        return messenger.publish(
            Constants.Protocol.Exchanges.Task.Name,
            'run.ucs.command.' + uuid,
            new Result({ value: hostData })
        );
    };

    /**
     * @function _publishUcsCommandResult
     * @description publish message which has been processed by ucs job to amqp.
     **/
    BaseJob.prototype._publishUcsCommandResult = function(uuid, command, result) {
        return messenger.publish(Constants.Protocol.Exchanges.Task.Name,
            ['ucs', 'command', command, 'result', uuid].join('.'),
            new Result({value: result})
        );
    };

    return BaseJob;
}