RackHD/on-taskgraph

View on GitHub
lib/services/workflow-api-service.js

Summary

Maintainability
D
3 days
Test Coverage
// Copyright © 2016-2017 Dell Inc. or its subsidiaries.  All Rights Reserved.

'use strict';

var di = require('di');

module.exports = workflowApiServiceFactory;
di.annotate(workflowApiServiceFactory, new di.Provide('Http.Services.Api.Workflows'));
di.annotate(workflowApiServiceFactory,
    new di.Inject(
        'Protocol.TaskGraphRunner',
        'TaskGraph.Store',
        'Services.Waterline',
        'TaskGraph.TaskGraph',
        'Protocol.Events',
        'Logger',
        'Errors',
        'Promise',
        'Constants',
        '_',
        'Services.Environment',
        'Services.Lookup',
        'Services.GraphProgress'
    )
);

function workflowApiServiceFactory(
    taskGraphProtocol,
    taskGraphStore,
    waterline,
    TaskGraph,
    eventsProtocol,
    Logger,
    Errors,
    Promise,
    Constants,
    _,
    env,
    lookupService,
    graphProgressService
) {
    var logger = Logger.initialize(workflowApiServiceFactory);

    function WorkflowApiService() {
    }

    WorkflowApiService.prototype.createAndRunGraph = function(configuration, nodeId) {
        var self = this;
        return Promise.try(function() {
            if (!configuration.name || !_.isString(configuration.name)) {
                throw new Errors.BadRequestError('Graph name is missing or in wrong format');
            }
        })
        .then(function() {
            if (nodeId) {
                return waterline.nodes.needByIdentifier(nodeId)
                .then(function(node) {
                    if(node.sku) {
                        return [node, env.get("config." + configuration.name, configuration.name,
                            [node.sku,  Constants.Scope.Global])];
                    }
                    return [node, configuration.name];
                }).spread(function(node, name) {
                    return [
                        self.findGraphDefinitionByName(name),
                        taskGraphStore.findActiveGraphForTarget(node.id),
                        node
                    ];
                });
            } else {
                return [self.findGraphDefinitionByName(configuration.name), null, null];
            }
        })
        .spread(function(definition, activeGraph, node) {
            if (activeGraph) {
                throw new Error("Unable to run multiple task graphs against a single target.");
            }
            var context = configuration.context || {};
            return Promise.resolve().then(function() {
                if(node) {
                    context = _.defaults(context, { target: node.id });
                    return lookupService.nodeIdToProxy(node.id)
                    .catch(function(error) {
                        // allow the proxy lookup to fail since not all nodes
                        // wanting to run a workflow may have an entry
                        logger.error('nodeIdToProxy Lookup', {error:error});
                    });
                } else {
                    return undefined;
                }
            }).then(function(proxy) {
                if(proxy) {
                    context.proxy = proxy;
                }
                return self.createActiveGraph(
                        definition, configuration.options, context, configuration.domain, true);
            });
        })
        .tap(function(graph) {
            return eventsProtocol.publishGraphStarted(graph.instanceId, {
                graphId: graph.instanceId,
                graphName: graph.name,
                status: graph._status
            }, graph.node)
            .catch(function(error) {
                logger.error('Error publishing graph started event', {
                    graphId: graph.instanceId,
                    _status: graph._status,
                    error: error
                });
            });
        })
        .tap(function(graph) {
            return graphProgressService.publishGraphStarted(graph, {swallowError: true});
        })
        .then(function(graph) {
            self.runTaskGraph(graph.instanceId, configuration.domain);
            return graph;
        });
    };

    WorkflowApiService.prototype.findGraphDefinitionByName = function(graphName) {
        return taskGraphStore.getGraphDefinitions(graphName)
        .then(function(graph) {
            if (_.isEmpty(graph)) {
                throw new Errors.NotFoundError('Graph definition not found for ' + graphName);
            } else {
                return graph[0];
            }
        });
    };

    WorkflowApiService.prototype.createActiveGraph = function(
            definition, options, context, domain) {
        return this.createGraph(definition, options, context, domain)
        .then(function(graph) {
            graph._status = Constants.Task.States.Running;
            return graph.persist();
        });
    };

    WorkflowApiService.prototype.createGraph = function(definition, options, context, domain) {
        domain = domain || Constants.DefaultTaskDomain;
        return Promise.resolve()
        .then(function() {
            return TaskGraph.create(domain, {
                definition: definition,
                options: options || {},
                context: context
            });
        })
        .catch(function(error) {
            logger.error('createGraph fails', {
                definition: definition,
                options: options,
                error: error
            });
            if (!error.status) {
                var badRequestError = new Errors.BadRequestError(error.message);
                badRequestError.stack = error.stack;
                throw badRequestError;
            }
            throw error;
        });
    };

    WorkflowApiService.prototype.runTaskGraph = function(graphId, domain) {
        return taskGraphProtocol.runTaskGraph(graphId, domain)
        .catch(function(error) {
            logger.error('Error publishing event to run task graph', {
                error: error,
                graphId: graphId,
                domain: domain
            });
        });
    };

    WorkflowApiService.prototype.cancelTaskGraph = function(graphId) {
        return waterline.graphobjects.needOne({ instanceId: graphId })
        .then(function(workflow) {
            if (!workflow.active()) {
                throw new Errors.TaskCancellationError(
                    graphId + ' is not an active workflow'
                );
            }

            return taskGraphProtocol.cancelTaskGraph(graphId);
        });
    };

    WorkflowApiService.prototype.deleteTaskGraph = function(graphId) {
        // Taskgraph deletion sequence:
        // 1) Get the graph object by ID
        // 2) Check if the returned workflow is running.
        // 3) If it is running, throw an error. Otherwise go on to step 4.
        // 4) Delete the graph object from the task graph store.
        return waterline.graphobjects.needOne({ instanceId: graphId })
        .then(function(workflow) {
            if (workflow.active()) {
                throw new Errors.ForbiddenError(
                    'Forbidden to delete an active workflow ' + graphId);
            }
            return taskGraphStore.deleteGraph(graphId);
        })
        .then(_.first);
    };

    WorkflowApiService.prototype.defineTaskGraph = function(definition) {
        // Do validation before persisting a definition
        return TaskGraph.validateDefinition(Constants.DefaultTaskDomain, { definition: definition })
        .then(function() {
            return taskGraphStore.persistGraphDefinition(definition);
        })
        .then(function(definition) {
            return definition.injectableName;
        })
        .catch(function(error) {
            logger.error('defineTaskGraph fails', {
                definition: definition,
                error: error
            });
            if (!error.status) {
                var badRequestError = new Errors.BadRequestError(error.message);
                badRequestError.stack = error.stack;
                throw badRequestError;
            }
            throw error;
        });
    };

    WorkflowApiService.prototype.defineTask = function(definition) {
        return taskGraphStore.persistTaskDefinition(definition);
    };

    WorkflowApiService.prototype.getWorkflowsTasksByName = function(injectableName) {
        return taskGraphStore.getTaskDefinitions(injectableName);
    };

    WorkflowApiService.prototype.deleteWorkflowsTasksByName = function(injectableName) {
        return taskGraphStore.getTaskDefinitions(injectableName)
            .then(function (task){
                if(_.isEmpty(task)){
                    throw new Errors.NotFoundError(
                        'Task definition not found for ' + injectableName
                    );
                }else{
                    return taskGraphStore.deleteTaskByName(injectableName);
                }
            });
    };

    WorkflowApiService.prototype.putWorkflowsTasksByName = function(definition, injectableName) {
        return taskGraphStore.getTaskDefinitions(injectableName)
            .then(function (task){
                if(_.isEmpty(task)){
                    throw new Errors.NotFoundError(
                        'Task definition not found for ' + injectableName
                    );
                }else{
                    return taskGraphStore.persistTaskDefinition(definition);
                }
            });
    };

    WorkflowApiService.prototype.getGraphDefinitions = function(injectableName) {
        return taskGraphStore.getGraphDefinitions(injectableName);
    };

    WorkflowApiService.prototype.getTaskDefinitions = function(injectableName) {
        return taskGraphStore.getTaskDefinitions(injectableName);
    };

    WorkflowApiService.prototype.findActiveGraphForTarget = function(target) {
        return waterline.graphobjects.findOne({
            node: target,
            _status: Constants.Task.ActiveStates
        });
    };

    WorkflowApiService.prototype.getWorkflowsByNodeId = function(id, query) {
        var nodeId = ({node: id});
        var mergedQuery = _.merge({}, nodeId, query);
        return waterline.graphobjects.find(mergedQuery);
    };

    WorkflowApiService.prototype.getAllWorkflows = function(query, options) {
        options = options || {};

        return Promise.try(function() {
            query = waterline.graphobjects.find(query);

            if (options.skip) { query.skip(options.skip); }
            if (options.limit) { query.limit(options.limit); }

            return query;
        });
    };

    WorkflowApiService.prototype.getWorkflowByInstanceId = function(instanceId) {
        return waterline.graphobjects.needOne({ instanceId: instanceId });
    };

    WorkflowApiService.prototype.destroyGraphDefinition = function(injectableName) {
        return taskGraphStore.destroyGraphDefinition(injectableName);
    };

    return new WorkflowApiService();
}