RackHD/on-core

View on GitHub
lib/common/model.js

Summary

Maintainability
F
1 wk
Test Coverage
// Copyright 2015, EMC, Inc.

'use strict';

module.exports = ModelFactory;

ModelFactory.$provide = 'Model';
ModelFactory.$inject = [
    'pluralize',
    'Promise',
    '_',
    'Util',
    'Assert',
    'Errors',
    'Waterline',
    'Protocol.Waterline',
    'Services.Waterline'
];

function ModelFactory (
    pluralize,
    Promise,
    _,
    util,
    assert,
    Errors,
    Waterline,
    waterlineProtocol,
    waterline
) {
    var url = require('url');

    function byLastUpdated(lastUpdated) {
        return { updatedAt: { '>': lastUpdated } };
    }

    return Waterline.Collection.extend({
        attributes: {
            identifiers: {
                type: 'array',
                required: false
            },
            properties: {
                type: 'json',
                required: false,
                json: true
            }
        },

        deSerialize: function(obj){
            if(obj && obj._id && !obj.id) {
                obj.id = obj._id.toString();
            }
            return obj;
        },

        publishRecord: function (event, record, id) {
            if (!id && this.attributes && this.attributes.instanceId) {
                id = record && record.instanceId;
            }
            return waterlineProtocol.publishRecord(this, event, record || {}, id || '');
        },

        afterCreate: function (record, next) {
            this.publishRecord('created', record)
            .then(function () {
                next();
            }).catch(function (err) {
                next(err);
            });
        },
        afterUpdate: function (record, next) {
            this.publishRecord('updated', record)
            .then(function () {
                next();
            }).catch(function (err) {
                next(err);
            });
        },
        afterDestroy: function (records, next) {
            var self = this;

            Promise.all(_.map(records, function (record) {
                return self.publishRecord('destroyed', record);
            }))
            .then(function () {
                next();
            }).catch(function (err) {
                next(err);
            });
        },

        addWaterlineUpdatedAtToObject: function(obj, prop) {
            if (prop && obj[prop]) {
                if (!obj[prop].updatedAt) {
                    obj[prop].updatedAt = new Date();
                }
            } else if (!obj.updatedAt && !prop) {
                obj.updatedAt = new Date();
            }
        },

        addWaterlineCreatedAtToObject: function(obj, prop) {
            if (prop && obj[prop]) {
                if (!obj[prop].createdAt) {
                    obj[prop].createdAt = new Date();
                }
            } else if (!obj.createdAt) {
                obj.createdAt = new Date();
            }
        },

        runNativeMongo: function(func, args) {
            args = Array.isArray(args) ? args : [args];
            return Promise.fromNode(this.native.bind(this))
            .then(function(collection) {
                return collection[func].apply(collection, args);
            })
            .then(function(doc) {
                if (doc && _.has(doc, 'value')) {
                    return doc.value;
                } else {
                    // This is for calls like find that return a cursor object
                    // and not a result object
                    return doc;
                }
            });
        },

        createMongoIndexes: function() {
            return Promise.map(_.values(arguments), this.runNativeMongo.bind(this, 'createIndex'));
        },

        createUniqueMongoIndexes: function(indexes) {
            var self = this;
            return Promise.map(indexes, function(index) {
                return self.runNativeMongo('createIndex', [index, { unique: true }]);
            });
        },

        /**
         * Create indexes for different database
         *
         * the waterline has bug for index creation and it also doesn't support compound index,
         * this function is to manually create indexes regardless what waterline will do. This
         * function support both single-field and compound index
         *
         * @return {Promise} Resolved if create indexes successfully.
         */
        createIndexes: function () {
            var self = this;
            var indexes = self.$indexes || [];

            return Promise.try (function() {
                assert.arrayOfObject(indexes, '$indexes should be an array of object');

                //validate and assign default arguments before creating indexes
                //if necessary, convert the indexes format to fit for different database
                _.forEach(indexes, function(indexItem) {
                    assert.object(indexItem.keys, 'Database index keys should be object');
                    if (indexItem.options) {
                        assert.object(indexItem.options, 'Database index options should be object');
                    }
                    else {
                        indexItem.options = {};
                    }
                });

                return indexes;
            })
            .then(function(indexes) {
                if (_.isEmpty(indexes)) {
                    return;
                }
                switch(self.connection.toString()) {
                    case 'mongo':
                        return Promise.map(indexes, function(indexItem) {
                            return self.createMongoIndexes([indexItem.keys, indexItem.options]);
                        });
                    default:
                        break;
                }
            });
        },

        runQuery: function( querystr, params ) {
            return Promise.fromNode(this.query.bind(this, querystr, params));
        },

        postgresqlRunLockedQuery: function(querystr, params) {
            var self = this;
            var pg = Promise.promisifyAll(require('pg'));
            var connectionConfig = self.connections.postgresql.config;
            if(_.has(connectionConfig, 'url')) {
                var connectionUrl = url.parse(connectionConfig.url);
                connectionUrl.query = _.omit(connectionConfig, 'url');
                connectionConfig = url.format(connectionUrl);
            }
            return pg.connectAsync(connectionConfig)
            .spread(function(client, done) {
                var queryAsync = Promise.promisify(client.query);
                var queryList = [
                    ['BEGIN'],
                    ['LOCK TABLE ' + self.identity + ' IN EXCLUSIVE MODE'],
                    [querystr, params],
                    ['COMMIT']
                ];
                return Promise.reduce(queryList, function(rows, query) {
                    return queryAsync.apply(client, query)
                    .then(function(row) {
                        rows.push(row);
                        return rows;
                    });
                }, [])
                .then(function(rows) {
                    done();
                    return rows;
                });
            });
        },

        findMongo: function(query, options) {
            var self = this;
            return this.runNativeMongo('find', [query])
            .then(function(cursor) {
                if (options && options.limit) {
                    cursor = cursor.limit(options.limit);
                }
                if(options && options.sort) {
                    cursor = cursor.sort(options.sort);
                }

                return Promise.resolve(cursor.toArray());
            })
            .then(function(docs){
                return _.map(docs, self.deSerialize);
            });
        },

        findOneMongo: function(args) {
            var self = this;
            return this.runNativeMongo('findOne', args)
            .then(function(doc){
                return self.deSerialize(doc);
            });
        },

        findAndModifyMongo: function() {
            var self = this;

            // Take query and sort arguments off so we can modify the update arg
            var _arguments = Array.prototype.slice.call(arguments);
            var args = _arguments.slice(1);
            if (args.length) {
                args.shift();
            }
            var update = args.shift();

            // Don't validate if we're setting/unsetting individual properties
            if (update.$set || update.$setOnInsert) {
                if (update.$setOnInsert) {
                    this.addWaterlineCreatedAtToObject(update, '$setOnInsert');
                    update.$set = update.$set || {};
                }
                if (update.$set) {
                    this.addWaterlineUpdatedAtToObject(update, '$set');
                }
                return self.runNativeMongo('findAndModify', _arguments)
                .then(function(doc){
                        return self.deSerialize(doc);
                });
            }

            // Don't validate if we're setting/unsetting individual properties
            if (update.$unset) {
                if (!update.$set) {
                    update.$set = {};
                }
                this.addWaterlineUpdatedAtToObject(update, '$set');
                return self.runNativeMongo('findAndModify', _arguments)
                .then(function(doc){
                    return self.deSerialize(doc);
                });
            }

            if (update.$addToSet) {
                if (!update.$set) {
                    update.$set = {};
                }
                this.addWaterlineUpdatedAtToObject(update, '$set');
                return self.runNativeMongo('findAndModify', _arguments)
                .then(function(doc){
                    return self.deSerialize(doc);
                });
            }

            this.addWaterlineUpdatedAtToObject(update);
            this.addWaterlineCreatedAtToObject(update);

            return Promise.fromNode(self.validate.bind(this, update))
            .then(function() {
                return self.runNativeMongo('findAndModify', _arguments);
            })
            .then(function(doc){
                return self.deSerialize(doc);
            });
        },

        needOneMongo: function () {
            var identity = this.identity;
            var args = Array.prototype.slice.call(arguments);

            return this.findOneMongo(args)
            .then(function (record) {
                if (!record) {
                    throw new Errors.NotFoundError(
                        'Could not find %s with criteria %j.'.format(
                            pluralize.singular(identity),
                            args[0]
                        ), {
                            criteria: args[0],
                            collection: identity
                        });
                }

                return record;
            });
        },

        updateMongo: function(filter, update, options) {
            var self = this;
            if (update.$set || update.$unset) {
                this.addWaterlineUpdatedAtToObject(update, '$set');
                this.addWaterlineUpdatedAtToObject(update, '$unset');
                return self.runNativeMongo('update', [filter, update, options]);
            }

            if (update.$pullAll) {
                if (!update.$set) {
                    update.$set = {};
                }
                this.addWaterlineUpdatedAtToObject(update, '$set');
                return self.runNativeMongo('update', [filter, update, options]);
            }

            if (update.$addToSet) {
                if (!update.$set) {
                    update.$set = {};
                }
                this.addWaterlineUpdatedAtToObject(update, '$set');
                return self.runNativeMongo('update', [filter, update, options]);
            }

            this.addWaterlineUpdatedAtToObject(update);

            return Promise.fromNode(self.validate.bind(this, update))
            .then(function() {
                return self.runNativeMongo('update', [filter, update, options]);
            });
        },

        updateAndFindMongo: function(filter, update, options){
            var self = this;

            return self.updateMongo(filter, update, options)
            .then(function(){
                if (options && options.multi === true){
                    return self.findMongo(filter);
                }
                // If filter point to multiple doc and multi !== true
                // Update will be set on a random node.
                return self.findOneMongo(filter);
            });
        },

        removeMongo: function(query) {
            return this.runNativeMongo('remove', [query]);
        },

        /**
         * Look up by identifier - either the "id" or in the identifiers array.
         * Returns a waterline (bluebird) promise that will resolve the promise.
         *
         * @param {string} identifier
         * @returns {Deferred}
         */
        findByIdentifier: function (identifier) {
            var self = this;

            // Search for Lookup records associated with the identifier (id, macAddress, ipAddress).
            return waterline.lookups.findByTerm(identifier).then(function (documents) {
                // Identify a record with a node id.
                var lookup = _.find(documents, function (item) {
                    return item.node !== undefined;
                });
                // If we find a record with a node id then we can lookup the node with it,
                // otherwise we can try to lookup with the identifier directly.
                if (lookup) {
                    return self.findOne({ id: lookup.node });
                } else {
                    return self.findOne({ id: identifier })
                    .then(function(rec) {
                        if(!rec) {
                            return self.findOne({identifiers: identifier});
                        }
                        return rec;
                    });
                }
            });
        },
        /**
         * Look up by identifier - either the "id" or in the identifiers array.
         * Returns a waterline (bluebird) promise that will resolve the promise.
         * Rejects the promise if the document was not found.
         *
         * @param {string} identifier
         * @returns {Deferred}
         */
        needByIdentifier: function (identifier) {
            var self = this;

            return this.findByIdentifier(identifier).then(function (record) {
                if (!record) {
                    throw new Errors.NotFoundError(
                        'Could not find %s with identifier %s'.format(
                            pluralize.singular(self.identity),
                            identifier
                        ), {
                            identifier: identifier,
                            collection: self.identity
                        });
                }

                return record;
            });
        },
        /**
         * Look up and update values by identifier - either the "id" or in the identifiers array.
         * Returns a waterline (bluebird) promise with the record updated.
         * Rejects the promise if the document was not found.
         *
         * @param {string} identifier
         * @param {Object} values
         * @returns {Deferred}
         */
        updateByIdentifier: function (identifier, values) {
            var self = this;

            return self.needByIdentifier(identifier)
                .then(function (record) {
                    return self.update(
                        record.id,
                        values
                    ).then(function (records) {
                        return records[0];
                    });
                });
        },
        /**
         * Look up and remove objects by identifier - either the "id" or in the identifiers array.
         * Returns a waterline (bluebird) promise with the record deleted
         * Rejects the promise if the document was not found.
         * @param {string} identifier
         * @returns {Deferred}
         */
        destroyByIdentifier: function (identifier) {
            var self = this;

            return self.needByIdentifier(identifier)
                .then(function (record) {
                    return self.destroy(
                       record.id
                    ).then(function (records) {
                        return records[0];
                    });
                });
        },
        /**
         * Using an identifier, find and update, or create a new instance if none exists
         * using the values provided. Returns a waterline (bluebird) promise with the record
         * created or updated.
         *
         * @param {string} identifier
         * @param {Object} values
         * @returns {Deferred}
         */
        findOrCreateByIdentifier: function (identifier, values) {
            var self = this;

            return this.findByIdentifier(identifier).then(function (record) {
                if (record) {
                    return record;
                } else {
                    return self.create(values);
                }
            });
        },

        /**
         * Using an identifier, find document and delete one or more items 
         * from a list of this document, the values provided specifies the list and items. 
         * Returns a waterline (bluebird) promise with the document find and modify
         *
         * @param {string} identifier
         * @param {Object} values {field: [value]}
         * @returns {Promise}
         */
        removeListItemsByIdentifier: function (identifier, values) {
            var self = this;
            return this.needByIdentifier(identifier).then(function (record) {
                return self.removeListItemsById(record.id, values);
            });
        },

        /**
         * Using an identifier, find document and add one or more items 
         * to a list of this document, the values provided specifies the list and items. 
         * The signs is an object list, each sign related to one or more object items in [value].
         * The sign means primer key of object for determining if the object exist in list already.
         * Returns a waterline (bluebird) promise with the document find and modified
         *
         * @param {string} identifier
         * @param {Object} values {field: [value]}
         * @Param {Array} signs
         * @returns {Promise}
         */
        addListItemsIfNotExistByIdentifier: function (identifier, values, signs) {
            var self = this;
            return this.needByIdentifier(identifier).then(function (record) {
                return self.addListItemsIfNotExistById(record.id, values, signs);
            });
        },

        /**
         * Using an id, find document and update fields only if they don't exist.
         * Returns a waterline (bluebird) promise with the document find and modify.
         *
         * @param {string} id
         * @param {String} field
         * @param {*} values
         * @returns {Promise}
         */
        addFieldIfNotExistByIdentifier: function (identifier, field, values) {
            var self = this;
            return this.needByIdentifier(identifier).then(function (record) {
                return self.addFieldIfNotExistById(record.id, field, values);
            });
        },

        /**
         * Extend a criteria object with additional waterline query paramters to
         * find the single most recently updated object that matches the criteria.
         * Returns the waterline (bluebird) promise from the find method with the
         * extended criteria.
         *
         * @param {Object} criteria
         * @returns {Deferred}
         */
        findMostRecent: function (criteria) {
            return this.find(
                { where: criteria, limit: 1, sort: { createdAt: -1 } }
            ).then(function (records) {
                return records[0];
            });
        },
        /**
         * Extend a criteria object with additional waterline query paramters to
         * find all records that have been modified after the provided date.
         * Returns the waterline (bluebird) promise from the find method with the
         * extended criteria.
         *
         * @param {Date} lastUpdated
         * @param {Object} criteria
         * @returns {Deferred}
         */
        findSinceLastUpdate: function (lastUpdated, criteria) {
            assert.isDate(lastUpdated);

            return this.find(_.merge({}, criteria, byLastUpdated(lastUpdated)));
        },

        /**
         * Find a single document via criteria and resolve it, otherwise
         * reject with a NotFoundError.
         */
        needOne: function (criteria) {
            var identity = this.identity;

            return this.findOne(criteria).then(function (record) {
                if (!record) {
                    throw new Errors.NotFoundError(
                        'Could not find %s with criteria %j.'.format(
                            pluralize.singular(identity),
                            criteria
                        ), {
                            criteria: criteria,
                            collection: identity
                        });
                }

                return record;
            });
        },

        /**
         * Find a single document by ID and resolve it, otherwise reject
         * with a NotFoundError.
         */
        needOneById: function (id) {
            return this.needOne({ id: id });
        },

        /**
         * Update a single document by criteria and resolve it, otherwise
         * reject with a NotFoundError.
         */
        updateOne: function (criteria, document) {
            var self = this;

            return this.needOne(criteria).then(function (target) {
                return self.update(target.id, document).then(function (documents) {
                    return documents[0];
                });
            });
        },

        /**
         * Update a single document by ID and resolve it, otherwise
         * reject with a NotFoundError.
         */
        updateOneById: function (id, document) {
            return this.updateOne({ id: id }, document);
        },

        /**
         * Using an id, find document and delete one or more items 
         * from a list of this document, the values provided specifies the list and items. 
         * Returns a waterline (bluebird) promise with the document find and modify
         *
         * @param {string} id
         * @param {Object} values {field: [value]}
         * @returns {Promise}
         */
        removeListItemsById: function (id, values) {
            var self = this;
            var query = { _id: waterline.nodes.mongo.objectId(id) };
            var update = {
                $pullAll: values
            };
            var options = {new: true};
            return self.updateAndFindMongo(query, update, options);
        },


        /**
         * Using an Id, find document and add one or more items into a list of this document, 
         * the values provided specifies the list and items, suppose relation is a obj list,
         * targets is a list in obj item of relations, then
         *     values = {"relations.0.targets", ["some string", {obj}, 123]}
         * The sign means primer key of object for determining if the object exist in list already.
         * suppose before modifying, the document is:
         *   {
         *       ...,
         *       list: [
         *           {a:1}
         *       ],
         *       ...
         *   }
         *   If users would like to add "item1", {a:1, b:2} and {c:3} as items in the "list",
         *   , the function's parameter "value (field: [item])" can be set as
         *   value = {list: ['item1', {a:1, b:2}, {c:3}]}
         *   If you just want to modify the first item in the list, "list.0" can be used as field.
         *   To prevent from recreating the item {a:1, b:2} if {a:1} is already in the original 
         *   field, "signs" can be
               signs = [{a:1}]
         *   in this case, 
         *   the document will remain the same as the original one without adding anything.
         * Returns a waterline (bluebird) promise with the document find and modify
         *
         * @param {string} identifier
         * @param {Object} values {field: [value]}
         * @Param {Array} signs
         * @returns {Promise}
         */
        addListItemsIfNotExistById: function (id, values, signs) {
            var self = this;

            var transformedValues = _
            .transform(values, function(result, array, key) {
                var withSign = {};
                if (signs && signs.length !== 0){
                    withSign = _.transform(array, function(result, item){
                        if (typeof item !== "object"){
                            return true;
                        }
                        _.forEach(signs, function(sign){
                            if (_.some([item], sign)){
                                var obj = {};
                                obj[key] = {$each: [item]};
                                obj.sign = sign;
                                obj.field = key;
                                (result.vaulesWithSign || (result.vaulesWithSign = []))
                                .push(obj);
                                (result.objsWithSign || (result.objsWithSign = []))
                                .push(item);
                            }
                        });
                    }, {});
                }

                result.vaulesWithSign = 
                (result.vaulesWithSign || (result.vaulesWithSign = []))
                .concat(withSign.vaulesWithSign || []);

                (result.vaulesWithoutSign || (result.vaulesWithoutSign = {}))[key] = 
                    {$each: _.difference(array, withSign.objsWithSign)};
            }, {});

            if (transformedValues.vaulesWithSign){
                    transformedValues = transformedValues.vaulesWithSign
                        .concat(transformedValues.vaulesWithoutSign);
            }

            return Promise.map(transformedValues, function(transformedValue){
                var query = { _id: waterline.nodes.mongo.objectId(id) };
                var update = {
                    $addToSet: transformedValue
                };
                var options = {new: true};
                if(transformedValue.sign){
                    query[transformedValue.field] = {$not: {$elemMatch: transformedValue.sign}};
                    transformedValue = _.pick(transformedValue, transformedValue.field);
                    update = {
                        $addToSet: transformedValue
                    };
                    return self.findAndModifyMongo(query, {}, update, options);
                }
                return self.updateAndFindMongo(query, update, options);
            }).then(function(modifiedDocs){
                return _.last(modifiedDocs);
            });
        },

        /**
         * Using an id, find document and update fields only if they don't exist.
         * Returns a waterline (bluebird) promise with the document find and modify.
         *
         * @param {string} id
         * @param {String} field
         * @param {Object/String/Number etc.} values
         * @returns {Promise}
         */
        addFieldIfNotExistById: function (id, field, values) {
            var self = this;

            var query = { _id: waterline.nodes.mongo.objectId(id)};
            query[field] = {$exists : false};

            var update = { $set: {} };
            update.$set[field] = values;

            var options = {new: true};
            return self.findAndModifyMongo(query, {}, update, options);
        },

        /**
         * Destroy a single document by criteria and resolve it, otherwies
         * reject with a NotFoundError.
         */
        destroyOne: function (criteria) {
            var self = this;

            return this.needOne(criteria).then(function (target) {
                return self.destroy(target.id).then(function (documents) {
                    return documents[0];
                });
            });
        },

        /**
         * Destroy a single document by ID and resolve it, otherwise
         * reject with a NotFoundError.
         */
        destroyOneById: function (id) {
            return this.destroyOne({ id: id });
        }
    });
}