gideonairex/disqueue-node

View on GitHub
lib/disqueue-client.js

Summary

Maintainability
A
0 mins
Test Coverage
'use strict';

/*eslint-disable no-constant-condition*/

var redis        = require('redis');
var Parser       = require( 'redis-parser' );
var Queue        = require( 'double-ended-queue' );
var _            = require( 'lodash' );
var EventEmitter = require( 'events' ).EventEmitter;
var util         = require( 'util' );

var CommandObj = require( './command' );
var commands   = require( './commands' );

/*
 * Default connection settings, used by createConnection when the caller
 * does not supply a port or a host.
 */
var defaultPort = 7711;
var defaultHost = '127.0.0.1';

/**
 * Pre-built protocol fragments that DisqueClient.prototype.write places
 * around every outgoing command: '*' precedes the argument count, '$'
 * precedes each argument's byte length, and CRLF terminates each field.
 * They are allocated once here so write() does not rebuild them per call.
 */
var bufStar = Buffer.from('*', 'ascii');
var bufDollar = Buffer.from('$', 'ascii');
var bufCrlf = Buffer.from('\r\n', 'ascii');

/*
 * Debug logger
 *
 * Prints `message` with console.log, but only while `exports.debugMode`
 * is truthy; otherwise it does nothing.
 */
function debug ( message ) {

    if ( !exports.debugMode ) {
        return;
    }

    console.log( message );

}

// Debug logging is switched on when the NODE_DEBUG environment variable
// contains the whole word "disque" (case-insensitive).
// NOTE(review): the flag is stored on `exports`, but `module.exports` is
// reassigned to createConnection at the end of this file, so consumers of the
// module presumably cannot reach or toggle it — confirm that is intended.
exports.debugMode = /\bdisque\b/i.test( process.env.NODE_DEBUG );

/**
 * Disque client built on top of a node_redis connection.
 *
 * @param {Object} netOptions - connection settings (`host`, `port`). The object
 *                              is kept as `this.connection` and gets a
 *                              `retry_strategy` function attached to it.
 * @param {Object} [options]  - client options; this file reads `retryMax`,
 *                              `retryDelay` and `auth` from it.
 */
function DisqueClient ( netOptions, options ) {

    debug( 'Construct disqueClient' );

    var client = this;

    client.options = options || {};
    client.initRetry();

    client.connection = netOptions;

    // Handed to redis.createClient as part of the connection options: answer
    // with the fixed delay while attempts remain, otherwise report the failure
    // through onError and return the same Error.
    client.connection.retry_strategy = function retry_strategy ( strategyOptions ) {
        var exhausted = strategyOptions.attempt > client.retryMax;

        if ( !exhausted ) {
            return client.retryDelay;
        }

        var failure = new Error( 'retryMax exceeded' );
        client.onError( failure );
        return failure;
    };

    // Commands waiting for a reply, oldest first.
    client.commandQueue = new Queue();
    client.address = [ netOptions.host, netOptions.port ].join( ':' );

    var underlying = redis.createClient( client.connection );
    client.redisClient = underlying;
    client.netStream = underlying.stream;
    client.attachStreams();

}

// DisqueClient reports its state ('connected', 'ready', 'error', 'drain')
// through the EventEmitter API inherited here.
util.inherits( DisqueClient, EventEmitter );

/**
 * Retry configurations
 *
 * Copies the reconnect settings from `this.options` onto the client:
 *   - retryMax   : highest reconnect attempt number still allowed (default 5)
 *   - retryDelay : delay returned to the retry strategy (default 200)
 *
 * An explicit 0 is honoured (retryMax 0 = give up on the first failure,
 * retryDelay 0 = no wait). Any other falsy or missing value falls back to
 * the default, as before.
 */
DisqueClient.prototype.initRetry = function () {
    var retryMax   = this.options.retryMax;
    var retryDelay = this.options.retryDelay;

    // A plain `||` would turn a deliberate 0 into the default.
    this.retryMax   = retryMax === 0 ? 0 : ( retryMax || 5 );
    this.retryDelay = retryDelay === 0 ? 0 : ( retryDelay || 200 );
};

/**
 * Attach streams
 *
 * Wires the current redisClient and netStream to this client's handlers:
 *   redisClient 'ready' -> onConnect()
 *   redisClient 'error' -> onError( error )
 *   redisClient 'end'   -> connectionGone( 'end' )
 *   netStream   'drain' -> drain()
 */
DisqueClient.prototype.attachStreams = function () {

    var client = this;

    var clientHandlers = {
        'ready' : function () {
            client.onConnect();
        },
        'error' : function ( error ) {
            client.onError( error );
        },
        'end' : function () {
            client.connectionGone( 'end' );
        }
    };

    Object.keys( clientHandlers ).forEach( function ( eventName ) {
        client.redisClient.on( eventName, clientHandlers[ eventName ] );
    } );

    client.netStream.on( 'drain', function () {
        client.drain();
    } );

};

/**
 * 'connect'
 *
 * Runs each time the underlying redis client reports 'ready'. It installs
 * a fresh reply parser, marks the client connected, emits 'connected', and
 * then either authenticates (when `options.auth` is set) or goes straight
 * to onReady().
 *
 * An authentication failure is reported through onError() (an 'error'
 * event) and the client does not become ready.
 */
DisqueClient.prototype.onConnect = function () {

    debug( 'Connected' );

    var self = this;

    // node_redis replaces `redisClient.stream` with a new socket when it
    // reconnects (and drops listeners from the old one). Point netStream at
    // the live socket so write() and the 'drain' forwarding keep working.
    var liveStream = self.redisClient.stream;

    if ( liveStream && liveStream !== self.netStream ) {
        self.netStream = liveStream;
        liveStream.on( 'drain', function () {
            self.drain();
        } );
    }

    self.initParser();

    self.connected = true;
    self.emit('connected');

    if ( !self.options.auth ) {
        self.onReady();
        return;
    }

    debug( 'Authenticating' );

    self.auth( {
        'password' : self.options.auth.password
    }, function ( error ) {

        if ( error ) {
            // Throwing here would escape from the reply parser as an
            // uncaught exception; report it like every other failure.
            self.onError( error );
            return;
        }

        self.onReady();

    } );

};

/**
 * 'ready'
 *
 * Flags the client as ready and then announces it with a 'ready' event.
 */
DisqueClient.prototype.onReady = function () {

    var client = this;

    client.ready = true;
    client.emit( 'ready' );

};

/**
 * Initialize redis-parser
 *
 * Builds a new Parser whose replies go to sendReply() and whose errors go
 * to sendError(), stores it as `this.parser`, and installs it as the
 * underlying redis client's `reply_parser`.
 */
DisqueClient.prototype.initParser = function () {

    debug( 'Initialize parser' );

    var client = this;

    // NOTE(review): `returnBuffers` is not assigned anywhere in this file, so
    // this presumably always evaluates to false — confirm where it is meant
    // to come from.
    var parserOptions = {
        'return_buffers' : client.returnBuffers || false,
        'returnReply' : function ( reply ) {
            client.sendReply( reply );
        },
        'returnError' : function ( error ) {
            client.sendError( error );
        }
    };

    client.parser = new Parser( parserOptions );
    client.redisClient.reply_parser = client.parser;
};

/**
 * redis-parser error and reply handlers
 *
 */

/**
 * Parser error handler: hands `error` to the callback of the oldest pending
 * command.
 *
 * If no command is pending, or the pending command has no callback, the
 * error is routed to onError() so it surfaces as an 'error' event rather
 * than a TypeError.
 */
DisqueClient.prototype.sendError = function ( error ) {

    var commandCreated = this.commandQueue.shift();

    if ( !commandCreated || typeof commandCreated.callback !== 'function' ) {
        this.onError( error );
        return;
    }

    commandCreated.callback( error );

};

/**
 * Parser reply handler: hands `reply` to the callback of the oldest pending
 * command as `callback( null, reply )`.
 *
 * A reply that arrives while nothing is pending is dropped (and logged in
 * debug mode). A pending command without a callback is consumed silently.
 */
DisqueClient.prototype.sendReply = function ( reply ) {

    var commandCreated = this.commandQueue.shift();

    if ( !commandCreated ) {
        debug( 'Reply received with no pending command; dropping it' );
        return;
    }

    if ( typeof commandCreated.callback === 'function' ) {
        commandCreated.callback( null, reply );
    }

};

/**
 * send command
 *
 * Asks the command object to collate itself. On success the command is
 * queued (so its callback can receive the reply) and the collated form is
 * written out; on failure the command's own callback gets the error and
 * nothing is queued or written.
 */
DisqueClient.prototype.sendCommand = function ( commandCreated ) {

    var client = this;

    function onCollated ( error, collatedCommand ) {

        if ( !error ) {
            // Queue before writing so the reply finds its command.
            client.commandQueue.push( commandCreated );
            client.write( collatedCommand );
            return;
        }

        return commandCreated.callback( error );

    }

    commandCreated.collate( onCollated );

};

/**
 * write
 *
 * Serialises a collated command and writes it to netStream in one call.
 * The frame is '*<argc>\r\n' followed, for every argument, by
 * '$<byte length>\r\n<bytes>\r\n'. Arguments that are not Buffers are
 * converted with String() first.
 *
 * @param {Array} collatedCommand - command name and arguments, in order
 */
DisqueClient.prototype.write = function (collatedCommand) {
    var countBytes = Buffer.from( String( collatedCommand.length ), 'ascii' );
    var chunks = [ bufStar, countBytes, bufCrlf ];

    // '*' + CRLF = 3 bytes of framing around the count.
    var totalBytes = 3 + countBytes.length;

    for ( var index = 0; index < collatedCommand.length; index += 1 ) {
        var payload = collatedCommand[ index ];

        if ( !Buffer.isBuffer( payload ) ) {
            payload = Buffer.from( String( payload ) );
        }

        var lengthBytes = Buffer.from( String( payload.length ), 'ascii' );

        chunks.push( bufDollar, lengthBytes, bufCrlf, payload, bufCrlf );

        // '$' + two CRLFs = 5 bytes of framing per argument.
        totalBytes += 5 + lengthBytes.length + payload.length;
    }

    this.netStream.write( Buffer.concat( chunks, totalBytes ) );
};

/**
 * 'error'
 *
 * Logs the error in debug mode, then re-emits it as this client's own
 * 'error' event.
 */
DisqueClient.prototype.onError = function ( error ) {

    var failure = error;

    debug( failure );
    this.emit( 'error', failure );

};

/**
 * create commands
 *
 * For every entry in `commands`, adds a method to DisqueClient.prototype
 * named after the entry's `command` field. Each generated method wraps its
 * ( options, callback ) in a CommandObj and passes it to sendCommand().
 */
_.forEach( commands, function ( commandMeta ) {

    var methodName = commandMeta.command;

    DisqueClient.prototype[ methodName ] = function ( options, callback ) {
        this.sendCommand( new CommandObj( commandMeta, options, callback ) );
    };

} );

/**
 * 'retry'
 *
 * Replaces the given client's redis connection with a newly created one
 * (using the stored connection options), points netStream at the new
 * client's stream, and re-attaches the event handlers.
 */
var retry = function ( self ) {

    var freshClient = redis.createClient( self.connection );

    self.redisClient = freshClient;
    self.netStream = freshClient.stream;
    self.attachStreams();

};

/**
 * 'close' or 'end'
 *
 * Marks the client as neither connected nor ready and fails every command
 * still waiting for a reply.
 *
 * Replies are matched to commands purely by queue order, so commands left
 * over from a dead connection would never be answered and would steal the
 * replies of commands sent after a reconnect. Each pending callback is
 * therefore called with an Error and the queue is left empty.
 *
 * @param {String} reason - why the connection went away (used in the debug
 *                          line and in the error message)
 */
DisqueClient.prototype.connectionGone = function ( reason ) {

    debug( [ 'Connection gone reason:', reason ].join( ' ' ) );

    this.connected = false;
    this.ready     = false;

    var pending = this.commandQueue.shift();

    while ( pending !== undefined ) {

        if ( typeof pending.callback === 'function' ) {
            pending.callback( new Error( 'Connection gone before a reply was received: ' + reason ) );
        }

        pending = this.commandQueue.shift();
    }

};

/**
 * 'drain'
 *
 * Forwards the stream's 'drain' notification as this client's own 'drain'
 * event.
 */
DisqueClient.prototype.drain = function () {
    var eventName = 'drain';

    this.emit( eventName );
};

/*
 * Create TCP connection
 *
 * Factory for DisqueClient. `options.port` and `options.host` fall back to
 * the module defaults when missing or falsy; the (defaulted) options object
 * is also passed through to the client unchanged.
 *
 * @param {Object} [options] - connection and client options
 * @returns {DisqueClient} the newly constructed client
 */
function createConnection ( options ) {

    debug( 'Create connection' );

    var settings = options || {};

    return new DisqueClient( {
        'port' : settings.port || defaultPort,
        'host' : settings.host || defaultHost
    }, settings );

}

// The module's public API is the createConnection factory; the DisqueClient
// constructor itself is not exported.
module.exports = createConnection;