jmdobry/waterline-rethinkdb

View on GitHub
lib/connection.js

Summary

Maintainability
B
7 hrs
Test Coverage
'use strict';

var _ = require('underscore'),
    gPool = require('generic-pool'),
    async = require('async'),
    r = require('rethinkdb');

/**
 * Default configuration.
 * @type {{
 * port: number,
 * host: string,
 * db: string,
 * authKey: string,
 * max: number,
 * min: number,
 * log: boolean|function,
 * idleTimeoutMillis: number,
 * refreshIdle: boolean,
 * reapIntervalMillis: number
 * }}
 */
var defaults = {
    // Default RethinkDB client configuration
    port: 28015,
    host: 'localhost',
    db: 'test',
    authKey: '',

    // Default node-pool configuration
    // I peaked at 2.3k inserts/second in loadTest.js with ~500 connections
    max: 20,
    min: 1,
    log: false,
    idleTimeoutMillis: 30000,
    refreshIdle: true,
    reapIntervalMillis: 1000
};

/**
 * Create and configure a new node-pool instance.
 * @param {object} options Configuration options for the new instance.
 * @returns {object} New instance of node-pool.
 */
function createPool(options) {

    //noinspection JSValidateTypes
    return gPool.Pool({
        name: options.name,
        max: options.max,
        min: options.min,
        idleTimeoutMillis: options.idleTimeoutMillis,
        log: options.log,
        refreshIdle: options.refreshIdle,
        reapIntervalMillis: options.reapIntervalMillis,
        priorityRange: 10,
        create: function (cb) {
            r.connect({
                host: options.host,
                port: options.port,
                db: options.db,
                authKey: options.authKey
            }, cb);
        },
        destroy: function (conn) {
            if (conn) {
                conn.close();
            }
        }
    });
}

/**
 * Constructs a new Connection instance.
 *
 * A Connection instance manages a pool of connections and provides a low-level abstraction for executing
 * pre-defined queries.
 *
 * @param {object} options Configuration options for the new instance.
 * @constructor
 */
function Connection(options) {

    ///////////////////////
    // Private variables //
    ///////////////////////
    var _config,
        _pool;

    ///////////////////////
    // Private functions //
    ///////////////////////
    /**
     * Simple getter for accessing the configuration options of this Connection instance.
     * @param {string} key The key of the configuration option to return.
     * @returns {*} Value of key.
     * @private
     */
    function _get(key) {
        return _config[key];
    }

    /**
     * Configure this Connection instance with the given options.
     * @param {object} options Configuration options for this instance.
     * @param {boolean} [strict] If true, reset configuration to the defaults before applying the new options.
     * @private
     */
    function _configure(options, strict) {
        var errorPrefix = 'Connection.configure(options, strict): options';

        if (!_.isObject(options)) {
            throw new Error(errorPrefix + ': must be an object!');
        } else if (options.name && !_.isString(options.name)) {
            throw new Error(errorPrefix + '.name: must be a string!');
        } else if (options.max && !_.isNumber(options.max)) {
            throw new Error(errorPrefix + '.max: must be a number!');
        } else if (options.min && !_.isNumber(options.min)) {
            throw new Error(errorPrefix + '.min: must be a number!');
        } else if (options.idleTimeoutMillis && !_.isNumber(options.idleTimeoutMillis)) {
            throw new Error(errorPrefix + '.idleTimeoutMillis: must be a number!');
        } else if (options.log && !_.isBoolean(options.log) && !_.isFunction(options.log)) {
            throw new Error(errorPrefix + '.log: must be a boolean or a function!');
        } else if (options.refreshIdle && !_.isNumber(options.refreshIdle)) {
            throw new Error(errorPrefix + '.refreshIdle: must be a number!');
        } else if (options.reapIntervalMillis && !_.isNumber(options.reapIntervalMillis)) {
            throw new Error(errorPrefix + '.reapIntervalMillis: must be a number!');
        } else if (options.priorityRange && !_.isNumber(options.priorityRange)) {
            throw new Error(errorPrefix + '.priorityRange: must be a number!');
        }

        if (_pool) {
            var oldPool = _pool;
            oldPool.drain();
        }

        if (strict) {
            _config = _.extend({}, defaults, options);
        } else {
            _config = _.extend({}, options);
        }

        _pool = createPool(_config);
    }

    ///////////////////////
    // Wrapper functions //
    ///////////////////////
    function _destroy(cb, priority) {
        _pool.destroy(cb, priority);
    }

    function _acquire(cb, priority) {
        _pool.acquire(cb, priority);
    }

    function _borrow(cb, priority) {
        _pool.borrow(cb, priority);
    }

    function _release(obj) {
        _pool.release(obj);
    }

    function _returnToPool(obj) {
        _pool.returnToPool(obj);
    }

    function _drain(cb) {
        _pool.drain(cb);
    }

    function _destroyAllNow(cb) {
        _pool.destroyAllNow(cb);
    }

    function _pooled(decorated, priority) {
        _pool.pooled(decorated, priority);
    }

    function _getPoolSize() {
        _pool.getPoolSize();
    }

    function _getName() {
        _pool.getName();
    }

    function _availableObjectsCount() {
        _pool.availableObjectsCount();
    }

    function _waitingClientsCount() {
        _pool.waitingClientsCount();
    }

    ////////////////////
    // Public methods //
    ////////////////////
    this.get = _get;
    this.configure = _configure;

    // Wrapper methods
    this.destroy = _destroy;
    this.acquire = _acquire;
    this.borrow = _borrow;
    this.release = _release;
    this.returnToPool = _returnToPool;
    this.drain = _drain;
    this.destroyAllNow = _destroyAllNow;
    this.pooled = _pooled;
    this.getPoolSize = _getPoolSize;
    this.getName = _getName;
    this.availableObjectsCount = _availableObjectsCount;
    this.waitingClientsCount = _waitingClientsCount;

    ///////////
    // Setup //
    ///////////
    _configure(options, true);
}

/**
 * Execute a pre-defined ReQL query.
 * @param {object} query Pre-defined ReQL query.
 * @param {function} cb Callback function.
 */
Connection.prototype.run = function run(query, cb) {
    var conn,
        _this = this;

    async.waterfall([
        function (next) {
            _this.acquire(next);
        },
        function (connection, next) {
            conn = connection;
            query.run(conn, next);
        }
    ], function (err, result) {
        if (conn) {
            _this.release(conn);
        }
        if (err) {
            cb(err);
        } else {
            cb(null, result);
        }
    });
};

module.exports = Connection;