e-ucm/rage-analytics-backend

View on GitHub
lib/services/storm.js

Summary

Maintainability
B
5 hrs
Test Coverage
'use strict';

var Q = require('q');
var shell = require('shelljs');
var Fs = require('fs');
var Handlebars = require('handlebars');

var storm = function (stormConfig, zookeeperUrl, kafkaUrl) {

    var genYMLAndLaunch = function (src, ymlPath, stormJar, config) {
        var deferred = Q.defer();

        var configTemplate = Handlebars.compile(src);
        Fs.writeFile(ymlPath, configTemplate(config), function (err) {
            if (err) {
                var retErr = {
                    message: 'Failed to write' + ymlPath + ' file, ' + err,
                    code: 0
                };
                return deferred.reject(retErr);
            }

            // Execute Topology
            var command = stormConfig.path + '/storm jar ' + stormJar +
                ' -c \'nimbus.seeds=["nimbus"]\' org.apache.storm.flux.Flux --remote ' + ymlPath;
            console.log('Shell executing command', command);
            shell.exec(command,
                {async: true},
                function (code, stdout, stderr) {
                    if (code !== 0) {
                        return deferred.reject({
                            message: 'Failed to end topology, code: ' + code +
                            ', stderr: ' + stderr + ', stdout: ' + stdout,
                            code: code
                        });
                    }
                    if (stderr) {
                        return deferred.reject({
                            message: 'Failed to end topology but exit code was 0, stderr: ' +
                            stderr + ', stdout: ' + stdout
                        });
                    }
                    deferred.resolve(stdout);
                });
        });

        return deferred.promise;
    };

    return {
        startTopology: function (topologyName, analysisFolder, kafkaTopicName) {
            var config = {
                topologyName: topologyName,
                kafkaTopicName: kafkaTopicName,
                zookeeperURL: zookeeperUrl,
                kafkaURL: kafkaUrl,
                elasticsearchUrl: stormConfig.elasticsearchHost
            };
            var fsOptions = {
                encoding: 'utf-8'
            };

            var tryLoadYAML = function(ymlPath) {
                var defyml = Q.defer();

                Fs.readFile(ymlPath, fsOptions, function (err, src) {
                    if (err) {
                        // If the default flux.yml file couldn't be read, return an error;
                        return defyml.reject(new Error('Failed to read ' + ymlPath));
                    }

                    defyml.resolve(src);
                });

                return defyml.promise;
            };
            // Check if the files in the uploaded /analysis folder exist
            // If they exist, use them (expected: realtime.jar and flux.yaml)

            function startAnalysis(folder) {
                var ymlpath = folder + 'flux.yml';

                // If the uploaded flux.yml file could not be read, try to start the default topology
                return tryLoadYAML(ymlpath)
                    .then(function(src) {
                        return genYMLAndLaunch(src, folder + 'fluxinstance.yml', folder + 'realtime.jar', config);
                    });
            }


            return startAnalysis(analysisFolder);
        },
        endTopology: function (sessionId) {
            var deferred = Q.defer();
            shell.exec(stormConfig.path + '/storm kill ' + sessionId.toString(),
                {async: true},
                function (code, stdout, stderr) {
                    if (code !== 0) {
                        return deferred.reject(new Error('Failed to end topology, code: ' + code +
                            ', stderr: ' + stderr + ', stdout: ' + stdout));
                    }
                    if (stderr) {
                        return deferred.reject(new Error('Failed to end topology but exit code was 0, stderr: ' +
                            stderr + ', stdout: ' + stdout));
                    }
                    deferred.resolve();
                });
            return deferred.promise;
        }
    };
};

module.exports = storm;