// Source: aaronmccall/verymodel-riak (view on GitHub), file lib/defaults.js
// Code-quality summary: Maintainability A (1 hr); Test Coverage (value not captured)
var _       = require('underscore');
var indexes = require('./indexes');
var request = require('./request_helpers.js');
var streams = require('./streams');

var defaults = {
    // ### definition
    definition: {
        // **id**: The "id" field is where we'll store the Riak key. By default it will just be an unvalidated, public field
        id: {},
        // **indexes**: We'll need a way to retrieve all of the fields that should be indexed,
        // so by default that will be all of the fields defined as indexes.
        // The field is flagged private and its value is computed by `indexes.derive`
        // (imported from ./indexes; its implementation is not visible in this file).
        indexes: {
            private: true,
            derive: indexes.derive
        },
        // **value**: By default, all non-private fields that aren't the id are expected to be the value payload
        value: {
            private: true,
            derive: function () {
                var model = this.__verymeta.model;
                if (!model.options.values) model.options.values = _.compact(_.map(model.definition, function (def, key) {
                    var isKeyField = key === model.options.keyField;
                    var isAllKey = key === model.options.allKey;
                    // By default we'll use all of the public fields except id
                    return (def.private || isKeyField || isAllKey) ? false : key;
                }));
                return _.pick(this, model.options.values);
            }
        }
    },
    // ### model methods
    methods: {
        // **getAllKey**: Allows us to share a bucket between different model types
        getAllKey: function () {
            if (this.options.allKey === '$bucket') {
                return {key: this.options.allKey, def: {default: this.getBucket()}};
            }
            var allKeyDef = this.options.allKey && this.definition[this.options.allKey],
                // default + required ensures that the allKey is always populated
                // private ensures it's not stored as part of the object's data
                // static ensures that the default value is not overwritten
                allKeyIsValid = allKeyDef && (allKeyDef.default &&
                                allKeyDef.required && allKeyDef.static);
            if (allKeyDef && allKeyIsValid) {
                return { key: this.options.allKey + '_bin', def: allKeyDef};
            }
        },
        // **getBucket**: Returns default bucket optionally appending an additional namespace
        getBucket: function (append) {
            var bucket = this.options.bucket;
            if (!bucket && this.options.riak) bucket = this.options.riak.bucket;
            if (bucket && !append) return bucket;
            if (bucket && append) return [bucket, append].join(this.options.namespaceSeparator||"::");
            throw new Error('Please set a Riak bucket via options.bucket');
        },

        // **getLogger**: Returns specified logger if defined or creates one and returns it
        getLogger: function _logger() {
            return this.options.logger || (this.options.logger = require('bucker').createNullLogger());
        },

        // **getRequest**: Builds Riak request objects. Signature varies according to type and developer preference. Supported request types are: del, get, index, mapreduce, and search.
        //   - Build request to get a single object by its key:
        //     - type and key: `model.getRequest('get', 'my-riak-key')` or
        //     - type and object: `model.getRequest('get', {key: 'my-riak-key'})`
        //   - Build request to get a list of keys for an index exact match:
        //     - type, index, key: `model.getRequest('index', 'my_index_bin', 'foo')` or
        //     - type and object: `model.getRequest('index', {index: 'my_index_bin', key: 'foo'})`
        //   - Build request to get a list of keys via an index range search:
        //     - type, index, min, max: `model.getRequest('index', 'my_index_bin', 'bar', 'foo')` or 
        //     - type and object: `model.getRequest('index', {index: 'my_index_bin', range_min: 'bar', range_max: 'foo'})`
        //   - Build request for mapreduce:
        //     - type, index, key, query array: `model.getRequest('mapreduce', 'my_index', 'my_key', […map/reduce phases…])` or
        //     - type and object: `model.getRequest('mapreduce', {inputs: …my inputs…, query: […map/reduce phases…]})` or
        //     - type, inputs array, query array: `model.getRequest('mapreduce', […my inputs…], […map/reduce phases…])`
        //   - Build request to search:
        //     - type, index, q: `model.getRequest('search', 'my_index', 'name:Bill')` or
        //     - type and object: `model.getRequest('search', {index: 'my_index', q: 'name:Bill'})`
        //   - Finally, any type of request can be created according to the following format:
        //     - `model.getRequest({ type: 'index', options: {index: 'my_index_bin', key: 'foo'}})`
        //       where type is any one of the types listed above and options is the options
        //       object for that request type
        getRequest: function (type) {
            if (_.isObject(type) && type.type && type.options) {
                return request[type.type](this, [type.options]);
            }
            return request[type](this, _.rest(arguments));
        },
        // **_indexQuery**: Index query wrapper that returns a stream
        _indexQuery: function () {
            var args = _.rest(arguments, 0);
            args.unshift('index');
            var request = this.getRequest.apply(this, args);
            // Return the readable stream
            return this.getClient().getIndex(request);
        },
        // **all**: Streams or calls back with all instances of this model that are stored in Riak
        // or an index-filtered set of them, depending on whether filtering args are passed.
        // If the first argument is a function, it will be called with the result.
        all: function () {
            var args = _.rest(arguments, 0),
                streaming = typeof args[0] !== 'function',
                cb = !streaming ? args[0] : null,
                requestArgs = _.rest(args, (streaming ? 0 : 1)),
                bucket;
            if (args.length > 1 && typeof args[1] === 'object') {
                bucket = args[1].bucket;
            }
            var logger = this.getLogger();
            logger.debug('query prepared: %j, streaming: %s', requestArgs, streaming);
            // All stream handling is done via a Transform stream that
            // receives our key stream and transmits instances
            var stream = this._indexQuery.apply(this, requestArgs),
                streamOpts = {model: this, bucket: bucket};

            return stream.pipe(new streams.KeyToValueStream(_.defaults({}, streamOpts)))
                         .pipe(new streams.InstanceStream(_.defaults({callback: cb}, streamOpts)));
        },
        // **find**: Simplifies index lookups and can be called with the values or options object signatures.
        // - values: find('index', 'key_or_min', ['max',] [function (err, instances) {}])
        // - options object (shown with range query—-substitute key for range_min/range_max for exact match):
        //   find({index: 'index', range_min: 'min', range_max: 'max'}, [function (err, instance)])
        find: function (index) {
            var args = _.rest(arguments),
                hasCb = typeof _.last(args) === 'function',
                cb = hasCb && args.pop();
            if (typeof index === 'string' && !index.match(/_(bin|int)$/)) {
                index = indexes.getName(index, this.definition[index]);
            }
            args.unshift(index);
            if (cb) {
                args.unshift(cb);
            }
            return this.all.apply(this, args);
        },

        // **indexesToData**: Reformats indexes from Riak so that they can be applied to model instances.
        // This is a direct reference to `indexes.rehydrate` from ./indexes.
        indexesToData: indexes.rehydrate,
        // **replyToData**: Reformats riak reply into the appropriate format to feed into an instance's loadData method
        replyToData: function (reply) {
            this.getLogger().debug('Function [replyToData]');
            if (!reply || !reply.content) {
                return {};
            }
            var content = (reply.content.length > 1) ? this.options.resolveSiblings(reply.content) : reply.content[0];
            // reformat our data for VeryModel
            var indexes = {};
            indexes = this.indexesToData(content.indexes);
            var data = _.extend(content.value, indexes);
            if (reply.key) {
                data[this.options.keyField] = reply.key;
            }
            if (reply.vclock) {
                data.vclock = reply.vclock;
            }
            return data;
        },
        _getQuery: function (id, bucket, cb) {
            var reqArgs = ['get', id];
            if (typeof bucket === 'function' && typeof cb === 'undefined') {
                cb = bucket;
                bucket = undefined;
            }
            if (bucket) reqArgs.push(bucket);
            this.getClient().get(this.getRequest.apply(this, reqArgs), cb);
        },
        _getInstance: function (id, reply, bucket) {
            // Resolve siblings, if necessary, or just grab our content
            var data = this.replyToData(reply);
            data[this.options.keyField] = id;
            if (bucket && this.definition.bucket) data.bucket = bucket;
            var instance = this.create(data);
            if (bucket && !instance.bucket) instance.bucket = bucket;
            return instance;
        },
        // **load**: Load an object's data from Riak and creates a model instance from it.
        load: function (id, bucket, cb) {
            var self = this;
            if (typeof bucket === 'function' && typeof cb === 'undefined') {
                cb = bucket;
                bucket = undefined;
            }
            this._getQuery(id, bucket, function (err, reply) {
                if (err||_.isEmpty(reply)) return cb(err||new Error('No matching key found.'));
                self._last = self._getInstance(id, reply, bucket);
                // Override default toJSON method to make more Hapi compatible
                if (typeof cb === 'function') {
                    cb(null, self._last);
                }
            });
        },
        // **remove**: Remove an instance from Riak
        delete: function (id, bucket, cb) {
            var self = this;
            if (typeof bucket === 'function' && typeof cb === 'undefined') {
                cb = bucket;
                bucket = undefined;
            }
            this.getLogger().debug('request to delete id(%s)', id);
            this.getClient().del(this.getRequest('del', id, bucket), function (err) {
                if (err) return cb(err);
                self.getLogger().debug('successfully deleted');
                cb();
            });
        }
    },
    // ### options
    options: {
        // - Default allKey is Riak's magic 'give me all the keys' index
        allKey: '$bucket',
        // - Default key field is id
        keyField: 'id',
        // - pagination is on by default to prevent overloading the server
        max_results: 100,
        paginate: true,
        // - Default sibling handler is "last one wins"
        resolveSiblings: function (siblings) {
            return _.max(siblings, function (sibling) {
                return parseFloat(sibling.last_mod + '.' + sibling.last_mod_usecs);
            });
        }
    },
    // ### instanceMethods
    instanceMethods: {
        // wrapper around this instance's 
        delete: function (callback) {
            return this.__verymeta.model.delete(this.id, this.getBucket(), callback);
        },
        // **prepare**: Prepare a Riak request object from this instance.
        prepare: function () {
            var content = {
                value: JSON.stringify(this.value),
                content_type: 'application/json'
            };
            var indexes = this.indexes;
            if (indexes.length) content.indexes = indexes;
            var payload = {
                content: content,
                bucket: this.getBucket(),
                return_body: true
            };
            if (this.id) {
                payload.key = this.id;
            }
            if (this.vclock) {
                payload.vclock = this.vclock;
            }
            return payload;
        },
        // **save**: Put this instance to Riak.
        // May be called with an options object, currently only for the purpose
        // of passing { validate: false } to bypass validation
        save: function (cb, opts) {
            if (!opts || opts.validate !== false) {
                var errors = this.doValidate();
                if (errors.length) return cb(errors);
            }
            var self = this;
            var logger = this.getLogger();
            var payload = this.prepare();

            this.getClient().put(payload, function (err, reply) {
                logger.debug('riak put %s', (err == null ? 'succeeded' : ('failed: ' + err)));
                if (!err) {
                    if (!self.id && reply.key)  self.id = reply.key;
                    if (reply.vclock) self.vclock = reply.vclock;
                }
                if (reply.content.length > 1 && typeof cb !== 'boolean') {
                    self.loadData(self.__verymeta.model.replyToData(reply));
                    // The boolean arg prevents a race condition when
                    // reply.content.length continues to be > 1
                    self.save(true);
                }
                if (typeof cb === 'function') {
                    cb(err, self);
                }
            });
        },
        // **getClient**: Proxy method to get the Riak client from model
        getClient: function () { return this.__verymeta.model.getClient(); },

        // **getBucket**: return instance-level bucket property or fall back to model's getBucket
        getBucket: function () {
            return (typeof this.bucket !== 'undefined') ? this.bucket : this.__verymeta.model.getBucket();
        },
        // **getLogger**: return this instance's logger (if defined) or fall back to model's getLogger
        getLogger: function () {
            return (typeof this.logger !== 'undefined') ? this.logger : this.__verymeta.model.getLogger();
        }
    }
};

// Backwards compatibility: keep `remove` available as an alias of the model-level `delete` method
defaults.methods.remove = defaults.methods.delete;

// **logify**: Replaces obj[name] with a wrapper that first logs 'Function [<name>]' at
// debug level through the receiver's getLogger() and then calls the wrapped function with
// the same receiver and arguments, returning its result.
// Properties that are not functions are left untouched.
var logify = function (obj, name) {
    var original = obj[name];
    if (typeof original !== 'function') return;
    obj[name] = function () {
        this.getLogger().debug('Function [%s]', name);
        return original.apply(this, arguments);
    };
};
// Attach call logging to the listed model methods and instance methods
['load', 'delete', 'remove', 'find', 'all'].forEach(function (methodName) {
    logify(defaults.methods, methodName);
});
['delete', 'prepare', 'save'].forEach(function (methodName) {
    logify(defaults.instanceMethods, methodName);
});

module.exports = defaults;