schahriar/supertask

View on GitHub
lib/SuperTask.js

Summary

Maintainability
B
4 hrs
Test Coverage
"use strict";

/// Required Core Modules
const vm = require('vm');
const eventEmmiter = require('events').EventEmitter;
///
/// External Modules
const Deque = require('double-ended-queue');
///
/// Internal Modules
const TaskObject = require('./TaskObject');
///

class SuperTask extends eventEmmiter { 
    constructor(size) {
        // Call eventEmmiter constructor
        super();

        this._busy_ = false;
        this.map = new Map();
        this.queue = new Deque(parseInt(size, 10) || 10000);
        this.concurrency = 1000;

        eventEmmiter.call(this);
    }

    _next() {
        // Allow _next_ to be called
        this._busy_ = false;
        
        /* At this point the source is fully compiled
        /* to a function or a function is resupplied
        /* from cache to be executed after attaching
        /* the post tracker */
        
        // Do a batch
        for (let i = 0; i < Math.min(this.queue.length, this.concurrency); i++) {
            // Get a job
            let job = this.queue.shift();
            
            // Create a tracker
            let postTracker = function(error) {
                // Calculate High Resolution time it took to run the function
                job.task.lastFinished = process.hrtime(job.task.lastStarted);
                // Calculate Time Difference
                job.task.lastDiff = job.task.lastFinished[0] * 1e9 + job.task.lastFinished[1];
                // Calculate Average Execution Time
                if (job.task.averageExecutionTime === -1) {
                    job.task.averageExecutionTime = job.task.lastDiff;
                } else {
                    job.task.averageExecutionTime = ((job.task.averageExecutionTime * job.task.executionRounds) + job.task.lastDiff) / (job.task.executionRounds + 1);
                }
                // Bump execution rounds
                job.task.executionRounds++;

                job.callback.apply(null, arguments);
            };

            // Assign lastStarted
            job.task.lastStarted = process.hrtime();
            // Push Callback to args
            job.args.push(postTracker);
            // If task is shared call handler
            if (job.task.shared && job.task.handler) {
                // Assign job to argument 0 and call handler
                job.handler(job);
            } else {
                // Call local/remote Function with context & args
                if (job.task.sandboxed) {
                    try {
                        job.task.func.apply(job.context, job.args);
                    } catch (error) {
                        postTracker(error);
                    }
                } else {
                    job.task.func.apply(job.context, job.args);
                }
            }
        }
        
        // Keep executing until Queue is empty
        if (this.queue.length > 0) {
            // Do after I/O & Tasks are cleared
            setImmediate(() => { this._next(); });
        }
    }

    _push(job) {
        // Push Job to Queue
        this.queue.push(job);
        // Batch Queue items into one call
        if (!this._busy_) {
            this._busy_ = true;
            setImmediate(() => {
                this._next();
            });
        }
    }

    _addTask(name, func, handler, type) {
        // Make sure Map Key is not taken
        if (this.map.has(name)) {
            throw new Error('Enable to create new task. A Task with the given name already exists.');
        }
        
        // Add ST_DO function to the back with current context
        // & Create task
        let task = TaskObject.create(this, name, func, handler, type);
        // Add Task's model to Map
        this.map.set(name, task.model);

        return task;
    }

    _compile(task, context) {        
        // Check if script is not compiled
        if (typeof task.func === 'string') {
            // Compile script using VM
            task.func = new vm.Script(task.func);
        }else if (typeof task.func === 'function') {
            // If script is already compiled return it
            return task.func;
        }
        // Make sure we can call run on the compiled script
        if (typeof task.func.runInContext !== 'function') {
            throw new Error("Unknown Error Occurred. Function property of Task is invalid or failed to compile.");
        }
        // Define module.exports and exports in context
        if (!context) context = {};
        context.module = {};
        context.exports = context.module.exports = {};
        
        // Create VM Context from context object
        vm.createContext(context);
        // Run Compiled Script
        task.func.runInContext(context);

        if (task.isModule) {
            // Make sure module.exports is set to a function
            // after script is run. Similar to how require(...)
            // modules work.
            if (typeof context.module.exports === 'function') {
                // Cache and Call the function
                task.func = context.module.exports;
                // Set isCompiled property to prevent recompilation
                task.isCompiled = true;

                return task.func;
            } else {
                throw new Error("Compiled Script is not a valid foreign task or module. Failed to identify module.exports as a function.");
            }
        } else {
            return task.func;
        }
    }
}

module.exports = SuperTask;