lib/rethink.js
/* jshint undef: true, unused: true, latedef: nofunc*/
/*eslint no-use-before-define:0, no-extra-parens:0, consistent-this:0*/
'use strict';
var r;
var moment = require('moment');
var _ = require('lodash');
var util = require('util');
var debug = require('debug')('connectors:rethinkdb');
var Connector = require('loopback-connector').Connector;
var BPromise = require('bluebird');
function RethinkDB(s, dataSource) {
debug('ctor');
Connector.call(this, 'rethink', s);
this.dataSource = dataSource;
this.database = s.database;
}
util.inherits(RethinkDB, Connector);
exports.initialize = function initializeDataSource(dataSource, callback) {
debug('initialize');
var s = dataSource.settings;
if(dataSource.settings.rs) {
s.rs = dataSource.settings.rs;
if(dataSource.settings.url) {
var uris = dataSource.settings.url.split(',');
s.hosts = [];
s.ports = [];
uris.forEach(function(uri) {
var url = require('url').parse(uri);
s.hosts.push(url.hostname || 'localhost');
s.ports.push(parseInt(url.port || '28015', 10));
if(!s.database) {
s.database = url.pathname.replace(/^\//, '');
}
if(!s.username) {
s.username = url.auth && url.auth.split(':')[0];
}
if(!s.password) {
s.password = url.auth && url.auth.split(':')[1];
}
});
}
s.database = s.database || 'test';
} else {
if(dataSource.settings.url) {
var url = require('url').parse(dataSource.settings.url);
s.host = url.hostname;
s.port = url.port;
s.database = url.pathname.replace(/^\//, '');
s.username = url.auth && url.auth.split(':')[0];
s.password = url.auth && url.auth.split(':')[1];
}
s.host = s.host || 'localhost';
s.port = parseInt(s.port || '28015', 10);
s.database = s.database || 'test';
}
s.safe = s.safe || false;
dataSource.adapter = new RethinkDB(s, dataSource);
r = require('rethinkdbdash')({
host: s.host,
port: s.port,
db: s.database,
user: s.username,
password: s.password
});
dataSource.connector = dataSource.adapter;
// expose the rethinkdbdash driver
dataSource.connector.db = r;
process.nextTick(callback);
};
RethinkDB.prototype.connect = function(cb) {
cb(); // connection pooling handles it
};
RethinkDB.prototype.getTypes = function() {
return ['db', 'nosql', 'rethinkdb'];
};
RethinkDB.prototype.getDefaultIdType = function() {
return String;
};
RethinkDB.prototype.table = function(model) {
return this._models[model].model.tableName;
};
// creates tables ifnot exists
RethinkDB.prototype.autoupdate = function(models, done) {
debug('autoupdate');
var _this = this;
if((!done) && ('function' === typeof models)) {
done = models;
models = undefined;
}
// First argument is a model name
if('string' === typeof models) {
models = [models];
}
models = models || Object.keys(_this._models);
r.db(_this.database).tableList().run()
.catch(done)
.then(function(list) {
var promises = models.map(function(model) {
if(list.length === 0 || list.indexOf(model) < 0) {
return r.db(_this.database).tableCreate(model).run()
.then(function() {
createIndices(model);
});
// .catch(cb);
} else {
return createIndices(model);
}
});
BPromise.all(promises).nodeify(done);
});
function createIndices(model) {
var properties = _this._models[model].properties;
var settings = _this._models[model].settings;
var indexCollection = _.extend({}, properties, settings);
function checkAndCreate(list, indexName, indexOption, indexFunction) {
// Don't attempt to create an index on primary key 'id'
if(indexName !== 'id' && _hasIndex(_this, model, indexName) && list.indexOf(indexName) < 0) {
var query = r.db(_this.database).table(model);
if(indexFunction) {
query = query.indexCreate(indexName, indexFunction, indexOption);
} else {
query = query.indexCreate(indexName, indexOption);
}
return query.run();
} else {
return null;
}
}
if(!_.isEmpty(indexCollection)) {
return r.db(_this.database).table(model).indexList().run()
.catch(function(err) {
return BPromise.reject(err);
})
.then(function(list) {
var promises = Object.keys(indexCollection).map(function(indexName) {
var indexConf = indexCollection[indexName];
return checkAndCreate(list, indexName, indexConf.indexOption || {}, indexConf.indexFunction);
});
return BPromise.all(promises);
});
} else {
return null;
}
}
};
// drops tables and re-creates them
RethinkDB.prototype.automigrate = function(models, cb) {
debug('automigrate');
this.autoupdate(models, cb);
};
// checks ifdatabase needs to be actualized
RethinkDB.prototype.isActual = function(cb) {
debug('isActual');
var _this = this;
r.db(_this.database).tableList().run()
.catch(cb)
.then(function(list) {
if(_.isEmpty(list)) {
cb(null, _.isEmpty(_this._models));
}
var actual = true;
var promises = Object.keys(_this._models).map(function(model) {
if(!actual) {
cb(null, false);
return;
}
var properties = _this._models[model].properties;
var settings = _this._models[model].settings;
var indexCollection = _.extend({}, properties, settings);
if(list.indexOf(model) < 0) {
actual = false;
cb(null, false);
return;
} else {
r.db(_this.database).table(model).indexList().run()
.catch(function(err) {
cb(err);
})
.then(function(list) {
if(!actual) {
cb(new Error('isActual error'), false);
}
Object.keys(indexCollection).forEach(function(property) {
if(_hasIndex(_this, model, property) && list.indexOf(property) < 0) {
actual = false;
}
});
cb(null, actual);
});
}
});
BPromise.all(promises).nodeify(function(err) {
cb(err, actual);
});
});
};
RethinkDB.prototype.create = function(model, data, callback) {
debug('create');
if(data.id === null || data.id === undefined) {
delete data.id;
}
this.save(model, data, callback, true);
};
RethinkDB.prototype.updateOrCreate = function(model, data, callback) {
debug('updateOrCreate');
if(data.id === null || data.id === undefined) {
delete data.id;
}
this.save(model, data, callback, true, true);
};
RethinkDB.prototype.save = function(model, data, callback, strict, returnObject) {
debug('save');
var _this = this;
if(strict === undefined) {
strict = false;
}
Object.keys(data).forEach(function(key) {
if(data[key] === undefined) {
data[key] = null;
}
});
r.db(_this.database).table(model).insert(data, {
conflict: strict ? 'error' : 'update',
returnChanges: true
}).run()
.catch(callback)
.then(function(m) {
var err = m.first_error && new Error(m.first_error);
if(err) {
throw new Error(err);
} else {
var info = {};
var id = model.id;
if(m.inserted > 0) {
info.isNewInstance = true;
}
// if(m.changes) {
if(m.changes && m.changes.length > 0) {
id = m.changes[0].new_val.id;
}
// if(returnObject && m.changes) {
if(returnObject && m.changes && m.changes.length > 0) {
return [m.changes[0].new_val, info];
} else {
return [id, info];
}
}
})
.nodeify(callback, {
spread: true
});
};
RethinkDB.prototype.exists = function(model, id, callback) {
debug('exists');
var _this = this;
r.db(_this.database).table(model).get(id).run()
.catch(callback)
.then(function(data) {
return !!(data);
})
.nodeify(callback);
};
RethinkDB.prototype.find = function find(model, id, callback) {
debug('find');
var _this = this;
var _keys;
r.db(_this.database)
.table(model)
.get(id)
.run()
.catch(callback)
.then(function(data) {
_keys = _this._models[model].properties;
if(data) {
// Pass to expansion helper
_expandResult(data, _keys);
}
// Done
// callback(null, data);
return data;
})
.nodeify(callback);
};
RethinkDB.prototype.destroy = function destroy(model, id, callback) {
debug('destroy');
var _this = this;
r.db(_this.database).table(model).get(id).delete().run()
.catch(callback)
.nodeify(callback);
};
var createAllPromise = function(model, filter) {
var _this = this;
if(!filter) {
filter = {};
}
var promise = r.db(_this.database).table(model);
if(filter.order) {
var keys = filter.order;
if(typeof keys === 'string') {
keys = keys.split(',');
}
keys.forEach(function(key) {
var m = key.match(/\s+(A|DE)SC$/);
key = key.replace(/\s+(A|DE)SC$/, '').trim();
var hasIndex = _hasIndex(_this, model, key);
if(m && m[1] === 'DE') {
if(hasIndex) {
promise = promise.orderBy({
index: r.desc(key)
});
} else {
promise = promise.orderBy(r.desc(key));
}
} else {
if(hasIndex) {
promise = promise.orderBy({
index: r.asc(key)
});
} else {
promise = promise.orderBy(r.asc(key));
}
}
});
} else {
// default sort by id
promise = promise.orderBy({ index: r.asc('id') });
}
if(filter.where) {
promise = buildWhere(_this, model, filter.where, promise); //_processWhere(_this, model, filter.where, promise);
}
if(filter.skip) {
promise = promise.skip(filter.skip);
} else if(filter.offset) {
promise = promise.skip(filter.offset);
}
if(filter.limit) {
promise = promise.limit(filter.limit);
}
return promise;
};
RethinkDB.prototype.allFeed = function(model, feedId, filter, callback) {
debug('all');
var promise = createAllPromise.call(this, model, filter);
promise
.merge({
feedId: feedId
})
.changes()
.run({
cursor: true
}, callback);
};
RethinkDB.prototype.all = function all(model, filter, options, callback) {
debug('all');
var _this = this;
var _model;
var _keys;
var promise = createAllPromise.call(this, model, filter);
promise.run()
.catch(callback)
.then(function(data) {
_keys = _this._models[model].properties;
_model = _this._models[model].model;
data.forEach(function(element) {
_expandResult(element, _keys);
});
if(filter && filter.include && filter.include.length > 0) {
_model.includeAsync = BPromise.promisify(_model.include);
return _model.includeAsync(data, filter.include, options);
} else {
return data;
}
})
.nodeify(callback);
};
RethinkDB.prototype.destroyAll = function destroyAll(model, where, callback) {
debug('destroyAll');
var _this = this;
if(!callback && 'function' === typeof where) {
callback = where;
where = undefined;
}
var promise = r.db(_this.database).table(model);
if(where !== undefined) {
promise = buildWhere(_this, model, where, promise);
}
promise.delete().run()
.catch(callback)
.then(function(result) {
return {
count: result.deleted
};
})
.nodeify(callback);
};
RethinkDB.prototype.count = function count(model, callback, where) {
debug('count');
var _this = this;
var promise = r.db(_this.database).table(model);
if(where && typeof where === 'object') {
promise = buildWhere(_this, model, where, promise);
}
promise.count().run()
.catch(callback)
.then(function(count) {
return count;
})
.nodeify(callback);
};
RethinkDB.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
debug('updateAttributes');
var _this = this;
data.id = id;
Object.keys(data).forEach(function(key) {
if(data[key] === undefined) {
data[key] = null;
}
});
r.db(_this.database).table(model).update(data).run()
.catch(cb)
.then(function() {
return data;
})
.nodeify(cb);
};
RethinkDB.prototype.update = RethinkDB.prototype.updateAll = function update(model, where, data, callback) {
debug('update/updateAll');
var _this = this;
var promise = r.db(_this.database).table(model);
if(where !== undefined) {
promise = buildWhere(_this, model, where, promise);
}
promise.update(data, {
returnChanges: true
}).run()
.catch(callback)
.then(function(result) {
return {
count: result.replaced
};
})
.nodeify(callback);
};
RethinkDB.prototype.disconnect = function() {
debug('disconnect');
};
/*
Some values may require post-processing. Do that here.
*/
function _expandResult(result, keys) {
Object.keys(result).forEach(function(key) {
if(!keys.hasOwnProperty(key)) {
return;
}
if(keys[key].type &&
keys[key].type.name === 'Date' &&
!(result[key] instanceof Date)) {
// Expand date result data, backward compatible
result[key] = moment.unix(result[key]).toDate();
}
});
}
// need to rewrite the function as it does not take into account a different name for the index
function _hasIndex(_this, model, key) {
// Primary key always hasIndex
if(key === 'id') {
return true;
}
var modelDef = _this._models[model];
var retval = (_.isObject(modelDef.properties[key]) && modelDef.properties[key].index) || (_.isObject(modelDef.settings[key]) && modelDef.settings[key].index);
return retval;
}
var operators = {
between: function(key, value) {
return r.row(key).gt(value[0]).and(r.row(key).lt(value[1]));
},
gt: function(key, value) {
return r.row(key).gt(value);
},
lt: function(key, value) {
return r.row(key).lt(value);
},
gte: function(key, value) {
return r.row(key).ge(value);
},
lte: function(key, value) {
return r.row(key).le(value);
},
inq: function(key, value) {
var query = [];
value.forEach(function(v) {
query.push(r.row(key).eq(v));
});
var condition = _.reduce(query, function(sum, qq) {
return sum.or(qq);
});
return condition;
},
nin: function(key, value) {
var query = [];
value.forEach(function(v) {
query.push(r.row(key).ne(v));
});
var condition = _.reduce(query, function(sum, qq) {
return sum.and(qq);
});
return condition;
},
neq: function(key, value) {
return r.row(key).ne(value);
},
like: function(key, value) {
return r.row(key).match(value);
},
nlike: function(key, value) {
return r.row(key).match(value).not();
}
};
function buildWhere(self, model, where, promise) {
if(where === null || (typeof where !== 'object')) {
return promise;
}
var query = buildFilter(where);
if(query) {
return promise.filter(query);
} else {
return promise;
}
}
function buildFilter(where) {
var filter = [];
Object.keys(where).forEach(function(k) {
// determine ifk is field name or condition name
var conditions = ['and', 'or', 'between', 'gt', 'lt', 'gte', 'lte', 'inq', 'nin', 'near', 'neq', 'like', 'nlike'];
var condition = where[k];
if(k === 'and' || k === 'or') {
if(_.isArray(condition)) {
var query = _.map(condition, function(c) {
return buildFilter(c);
});
if(k === 'and') {
filter.push(_.reduce(query, function(s, f) {
return s.and(f);
}));
} else {
filter.push(_.reduce(query, function(s, f) {
return s.or(f);
}));
}
}
} else {
if(_.isObject(condition) && _.intersection(_.keys(condition), conditions).length > 0) {
// k is condition
_.keys(condition).forEach(function(operator) {
if(conditions.indexOf(operator) >= 0) {
filter.push(operators[operator](k, condition[operator]));
}
});
} else {
// k is field equality
filter.push(r.row(k).eq(condition));
}
}
});
return _.reduce(filter, function(s, f) {
return s.and(f);
});
}