wikimedia/mediawiki-extensions-UploadWizard

View on GitHub
resources/uw.ConcurrentQueue.js

Summary

Maintainability
A
1 hr
Test Coverage
( function ( uw ) {

    /**
     * A queue that will execute the asynchronous function `action` for each item in the queue in
     * order, taking care not to allow more than `count` instances to be executing at the same time.
     *
     * Items can be added or removed (#addItem, #removeItem) while the queue is already being
     * executed.
     *
     * @class
     * @mixes OO.EventEmitter
     * @param {Object} options
     * @param {Function} options.action Action to execute for each item, must return a Promise
     * @param {number} options.count Number of functions to execute concurrently
     */
    uw.ConcurrentQueue = function UWConcurrentQueue( options ) {
        OO.EventEmitter.call( this );

        this.count = options.count;
        this.action = options.action;

        this.queued = [];
        this.running = [];
        this.done = [];
        this.runningPromises = [];

        this.completed = false;
        this.executing = false;
    };
    OO.initClass( uw.ConcurrentQueue );
    OO.mixinClass( uw.ConcurrentQueue, OO.EventEmitter );

    /**
     * A 'progress' event is emitted when one of the functions' promises is resolved or rejected.
     *
     * @event uw.ConcurrentQueue.progress
     */

    /**
     * A 'complete' event is emitted when all of the functions' promises have been resolved or rejected.
     *
     * @event uw.ConcurrentQueue.complete
     */

    /**
     * A 'change' event is emitted when an item is added to or removed from the queue.
     *
     * @event uw.ConcurrentQueue.change
     */

    /**
     * Add an item to the queue.
     *
     * @param {Object} item
     * @return {boolean} true
     */
    uw.ConcurrentQueue.prototype.addItem = function ( item ) {
        this.queued.push( item );
        this.emit( 'change' );
        if ( this.executing ) {
            this.executeNext();
        }
        return true;
    };

    /**
     * Remove an item from the queue.
     *
     * While it's possible to remove an item that is being executed, it doesn't stop the execution.
     *
     * @param {Object} item
     * @return {boolean} Whether the item was removed
     */
    uw.ConcurrentQueue.prototype.removeItem = function ( item ) {
        var index, found;

        found = false;

        index = this.queued.indexOf( item );
        if ( index !== -1 ) {
            this.queued.splice( index, 1 );
            found = true;
        }

        index = this.done.indexOf( item );
        if ( index !== -1 ) {
            this.done.splice( index, 1 );
            found = true;
        }

        index = this.running.indexOf( item );
        if ( index !== -1 ) {
            // Try aborting the promise if possible
            if ( this.runningPromises[ index ].abort ) {
                this.runningPromises[ index ].abort();
            }
            this.running.splice( index, 1 );
            this.runningPromises.splice( index, 1 );
            found = true;
        }

        if ( found ) {
            this.emit( 'change' );
            this.checkIfComplete();
        }

        // Ensure we're still using as many threads as requested
        this.executeNext();

        return found;
    };

    /**
     * @private
     * @param {Object} item
     */
    uw.ConcurrentQueue.prototype.promiseComplete = function ( item ) {
        var index;
        index = this.running.indexOf( item );
        // Check that this item wasn't removed while it was being executed
        if ( index !== -1 ) {
            this.running.splice( index, 1 );
            this.runningPromises.splice( index, 1 );
            this.done.push( item );
            this.emit( 'progress' );
        }

        this.checkIfComplete();

        this.executeNext();
    };

    /**
     * @private
     */
    uw.ConcurrentQueue.prototype.executeNext = function () {
        var item, promise;
        if ( this.running.length >= this.count || !this.executing ) {
            return;
        }
        item = this.queued.shift();
        if ( !item ) {
            return;
        }

        this.running.push( item );
        promise = this.action.call( null, item );
        this.runningPromises.push( promise );
        promise.always( this.promiseComplete.bind( this, item ) );
    };

    /**
     * Start executing the queue. If the queue is already executing, do nothing.
     *
     * When the queue finishes executing, a 'complete' event will be emitted.
     */
    uw.ConcurrentQueue.prototype.startExecuting = function () {
        var i;
        if ( this.executing ) {
            return;
        }
        this.completed = false;
        this.executing = true;
        for ( i = 0; i < this.count; i++ ) {
            this.executeNext();
        }
        // In case the queue was empty
        this.checkIfComplete();
    };

    /**
     * Abort executing the queue. Remove all queued items and abort running ones.
     */
    uw.ConcurrentQueue.prototype.abortExecuting = function () {
        while ( this.queued.length > 0 ) {
            this.removeItem( this.queued[ 0 ] );
        }
        while ( this.running.length > 0 ) {
            this.removeItem( this.running[ 0 ] );
        }
    };

    /**
     * @private
     */
    uw.ConcurrentQueue.prototype.checkIfComplete = function () {
        if ( this.running.length === 0 && this.queued.length === 0 ) {
            if ( !this.completed ) {
                this.completed = true;
                this.executing = false;
                this.emit( 'complete' );
            }
        }
    };

}( mw.uploadWizard ) );