kylekatarnls/momentum

View on GitHub
src/momentum-server.js

Summary

Maintainability
F
4 days
Test Coverage
A
100%
const net = require('net');
const express = require('express');
const bodyParser = require('body-parser');
const randomString = require('randomstring');
const fs = require('fs');
const uglify = require('uglify-js');
const mondogdbAdapter = require('./adapter/mongodb');
const MomentumEventEmitter = require('./event/emitter');
const adapters = {
    mongodb: mondogdbAdapter
};
const createAdapterInstance = (...args) => {
    const names = Object.keys(adapters);
    while (names.length) {
        const name = names.shift();
        const AdapterClass = adapters[name];
        if (AdapterClass.isCompatible(...args)) {
            return new AdapterClass(...args);
        }
    }

    return null;
};
const getIpFromRequest = request => {
    return request.headers['x-forwarded-for'] || request.connection.remoteAddress;
};
const eventTypes = {
    updateCollection: 'update-collection',
    updateItem: 'update-item',
    removeCollection: 'remove-collection',
    removeItem: 'remove-item',
    insert: 'insert'
};

class MomentumServer {

    /**
     * Init a momentum server with an adapter (arguments
     * are passed to the adapter constructor).
     *
     * @param args
     */
    constructor(...args) {
        this.isReady = false;
        this.readyPromises = [];
        this.filters = {};
        this.options = {
            groupingDelay: 200,
            maxTokensPerIp: 16,
            maxEventsListeners: 131072,
            timeOut: 120000,
            collectionPrefix: 'mm_'
        };
        this.adapter = createAdapterInstance(...args);
        this.initializeEventsEmitter();
    }

    /**
     * Add a database adapter.
     *
     * @param {string}           name
     * @param {AdapterInterface} adapter
     */
    static addAdapter(name, adapter) {
        adapters[name] = adapter;
    }

    /**
     *
     * @param app
     * @param args
     *
     * @returns {Promise.<MomentumServer>}
     */
    static connect(app, ...args) {
        const momentum = new MomentumServer(...args);

        return momentum.start(app).then(() => {
            return momentum;
        });
    }

    /**
     * Reset events emitter instance.
     */
    initializeEventsEmitter() {
        this.eventsEmitter = null;
    }

    /**
     * Get the current momentum event emitter.
     * Instantiate it if not yet exist.
     *
     * @return {MomentumEventEmitter}
     */
    getEventsEmitter() {
        if (!this.eventsEmitter) {
            this.eventsEmitter = new MomentumEventEmitter();
            this.eventsEmitter.setMaxListeners(this.options.maxEventsListeners);
        }

        return this.eventsEmitter;
    }

    /**
     * Add a callback filter available from the API.
     *
     * @param {string}   filter   filter name
     * @param {Function} callback callback function to be called on
     *                            filter name requested
     */
    addFilter(filter, callback) {
        this.filters[filter] = callback;
    }

    /**
     * Return the callback function for a given filter name.
     *
     * @param {string} filter
     *
     * @return {Function|null}
     */
    getFilter(filter) {
        return this.filters[filter] || null;
    }

    /**
     * Return the authorization strategy.
     *
     * @return {Function<Promise>}
     */
    getAuthorizationStrategy() {
        return this.authorizationStrategy || (() => new Promise(resolve => {
            resolve(true);
        }));
    }

    /**
     * Set/replace the authorization strategy.
     * It must be a function that return a promise that resolve true or false.
     *
     * @param {Function} authorizationStrategy
     *
     * @return this
     */
    setAuthorizationStrategy(authorizationStrategy) {
        this.authorizationStrategy = authorizationStrategy;

        return this;
    }

    /**
     * Return a promise that will resolve true if the action is authorized
     * by the authorization strategy, and resolve false else.
     *
     * @param {string} mode
     * @param {string} method
     * @param {Array}  args
     * @param {Object} request
     * @param {Object} response
     *
     * @throws Error if authorization strategy is not a function or return a wrong value.
     *
     * @return {Promise}
     */
    isAllowed(mode, method, args, request, response) {
        const strategy = this.getAuthorizationStrategy();

        if (typeof strategy !== 'function') {
            throw new Error('Authorization strategy must be a function');
        }

        const authorization = strategy(mode, method, args, request, response);

        if (typeof authorization === 'boolean') {
            return new Promise(resolve => {
                resolve(authorization);
            });
        }

        if (authorization instanceof Promise) {
            return authorization;
        }

        throw new Error('Authorization strategy must return true, false or a promise');
    }

    /**
     * Change the URL prefix that will come between the host URL
     * and the route.
     *
     * @param {string} urlPrefix
     *
     * @return this
     */
    setUrlPrefix(urlPrefix) {
        this.urlPrefix = urlPrefix;

        return this;
    }

    /**
     * Return the current URL prefix.
     *
     * @returns {string}
     */
    getUrlPrefix() {
        return this.urlPrefix || '/api/mm/';
    }

    /**
     * Link an express app to the momentum server to
     * serve the momentum API from this app.
     *
     * @param {Object} app
     */
    linkApplication(app) {
        this.linkedApp = app;
    }

    /**
     * Set the application listen port.
     *
     * @param {int} appPort
     *
     * @return this
     */
    setApplicationPort(appPort) {
        this.appPort = appPort;

        return this;
    }

    /**
     * Invalidate tokens that match the given filter.
     *
     * @param {Object} filter
     *
     * @return {Promise}
     */
    invalidateTokens(filter) {
        const tokens = this.options.collectionPrefix + 'tokens';

        return this.remove(tokens, filter);
    }

    /**
     * Return a promise that will resolve true if the given token
     * is valid, and resolve false else.
     *
     * @param {string} token
     *
     * @return {Promise.<boolean>}
     */
    isTokenValid(token) {
        const tokens = this.options.collectionPrefix + 'tokens';
        return this.count(tokens, {token}).then(count => {
            return count > 0;
        });
    }

    /**
     * Add a GET route with the url prefix.
     */
    addRoute(route, callback) {
        this.app.get(this.getUrlPrefix() + route, callback);
    }

    /**
     * Add a POST route with the url prefix and the JSON body-parser.
     */
    addJsonRoute(route, callback) {
        this.app.post(this.getUrlPrefix() + route, bodyParser.json(), callback);
    }

    /**
     * Add the /ready route to the momentum server app.
     */
    addReadyRoute() {
        this.addRoute('ready', (request, response) => {
            const readyCallback = () => {
                const ip = getIpFromRequest(request);
                const tokens = this.options.collectionPrefix + 'tokens';
                this.count(tokens, {ip}).then(count => {
                    if (count >= this.options.maxTokensPerIp) {
                        response.status(429).json({
                            error: 'Too many connections'
                        });

                        return;
                    }

                    const time = (new Date()).getTime();
                    const start = time - 2 * this.options.timeOut;
                    this.remove(tokens, {updatedAt: {$lt: start}}).then(() => {
                        const token = {
                            token: randomString.generate({
                                charset: 'alphanumeric',
                                length: 24
                            }),
                            updatedAt: time,
                            ip
                        };
                        this.insertOne(tokens, token).then(() => {
                            response.status(200).json({
                                status: 'success',
                                token: token.token
                            });
                        });
                    });
                });
            };
            if (this.isReady) {
                readyCallback();

                return;
            }

            const readyPromise = new Promise(resolve => {
                this.readyPromises.push(resolve);
            });
            readyPromise.then(readyCallback);
        });
    }

    /**
     * Add the /quit route to the momentum server app.
     */
    addQuitRoute() {
        this.addRoute('quit', (request, response) => {
            const token = request.query.token;

            this.isTokenValid(token).then(valid => {
                if (!valid) {
                    response.status(500).json({
                        error: 'Invalid token ' + token
                    });

                    return;
                }

                this.invalidateTokens({token}).then(() => {
                    response.status(200).json({status: 'success'});
                });
            });
        });
    }

    /**
     * Add the /on route to the momentum server app.
     */
    addOnRoute() {
        this.addRoute('on', (request, response) => {
            request.setTimeout(0);
            let end;
            let timeout = setTimeout(() => {
                response.status(200).json({events: []});
                end();
            }, this.options.timeOut);
            let group = null;
            const token = request.query.token;
            const eventsCollection = this.options.collectionPrefix + 'events';
            const off = this.on('listen:' + token, (collection, id, filter, ...args) => {
                clearTimeout(timeout);
                this.insertOne(eventsCollection, {
                    token,
                    listen: {collection, id, filter},
                    args: JSON.stringify(args)
                }).then(() => {
                    if (!group) {
                        group = setTimeout(() => {
                            this.find(eventsCollection, {token}).then(events => {
                                off();
                                if (events && !response.headersSent) {
                                    response.status(200).json({
                                        events: events.map(event => {
                                            event.args = JSON.parse(event.args);
                                            event.args.push(this.getItemId(event));

                                            return event;
                                        })
                                    });
                                    end();
                                    const ids = events.map(event => this.getItemId(event));
                                    this.remove(eventsCollection, this.getFilterFromItemId({$in: ids}));
                                }
                            }).catch(off);
                        }, this.options.groupingDelay);
                    }
                });
            });
            end = () => {
                setTimeout(off, this.options.timeOut / 4);
            };
        });
    }

    /**
     * Add the /listen route to the momentum server app.
     */
    addListenRoute() {
        this.addJsonRoute('listen', (request, response) => {
            const token = request.body.token;
            const collection = request.body.collection;
            if (!collection) {
                response.status(400).json({
                    error: 'Missing collection name'
                });

                return;
            }

            this.isTokenValid(token).then(valid => {
                if (!valid) {
                    response.status(500).json({
                        error: 'Invalid token ' + token
                    });

                    return;
                }

                const filter = request.body.filter;
                let handler;
                if (filter) {
                    handler = this.getFilter(filter);
                    if (!handler) {
                        response.status(400).json({
                            error: 'Unknown filter ' + filter
                        });

                        return;
                    }
                }
                let off;
                const id = request.body.id;
                const listener = (...args) => {
                    if (!filter) {
                        this.emit('listen:' + token, collection, id, filter, ...args);

                        return;
                    }

                    handler(args).then(newArgs => {
                        this.emit('listen:' + token, collection, id, filter, ...newArgs);
                    });
                };
                const check = () => {
                    setTimeout(() => {
                        this.isTokenValid(token).then(valid => {
                            (valid ? check : off)();
                        });
                    }, this.options.timeOut);
                };
                check();
                if (id) {
                    off = this.onItemTouched(collection, id, listener);
                    response.status(200).json({status: 'success'});

                    return;
                }

                off = this.onCollectionTouched(collection, listener);
                response.status(200).json({status: 'success'});

                this.on('listen-stop:' + token + ',' + JSON.stringify([collection, id || '', filter || '']), off);
            });
        });
    }

    /**
     * Add the /listen/stop route to the momentum server app.
     */
    addListenStopRoute() {
        this.addJsonRoute('listen/stop', (request, response) => {
            const token = request.body.token;
            const collection = request.body.collection;
            if (!collection) {
                response.status(400).json({
                    error: 'Missing collection name'
                });

                return;
            }

            this.isTokenValid(token).then(valid => {
                if (!valid) {
                    response.status(500).json({
                        error: 'Invalid token ' + token
                    });

                    return;
                }

                const filter = request.body.filter;
                const id = request.body.id;
                this.emit('listen-stop:' + token + ',' + JSON.stringify([collection, id || '', filter || '']));

                response.status(200).json({status: 'success'});
            });
        });
    }

    /**
     * Proxy request from HTTP API to database.
     *
     * @param {string}   mode
     * @param {Object}   request
     * @param {Object}   response
     * @param {Array}    allowedMethods
     * @param {Function} transform
     */
    proxyDataBaseRequest(mode, request, response, allowedMethods, transform) {
        const method = request.body.method;
        const args = request.body.args;
        const token = request.body.token;

        this.isTokenValid(token).then(valid => {
            if (!valid) {
                response.status(500).json({
                    error: 'Invalid token ' + token
                });

                return;
            }

            const end = (status, data) => {
                const json = Object.assign({
                    args,
                    method
                }, data);

                if (method === 'insertOne') {
                    args.push(this.getItemId(args[1]));
                }

                response.status(status).json(json);
            };

            if (allowedMethods.indexOf(method) === -1) {
                end(400, {
                    error: method + ' method unknown'
                });

                return;
            }

            if (typeof request.body.args !== 'object' || request.body.args.length < 1) {
                end(403, {
                    error: 'Arguments cannot be empty'
                });

                return;
            }

            this.isAllowed(mode, method, args, request, response).then(isAllowed => {
                if (!isAllowed) {
                    end(403, {
                        error: method + ' not allowed with ' + JSON.stringify(args)
                    });

                    return;
                }

                return this[method](...args).then(result => transform(result));
            })
            .catch(error => ({
                error: error + ''
            }))
            .then(result => {
                if (result) {
                    // JSON stringify and parse remove all database dynamic properties
                    end(result.error ? 500 : 200, JSON.parse(JSON.stringify(result)));
                }
            });
        });
    }

    /**
     * Add the /emit route to the momentum server app.
     */
    addEmitRoute() {
        this.addJsonRoute('emit', (request, response) => {
            this.proxyDataBaseRequest('emit', request, response, [
                'insertOne',
                'insertMany',
                'updateOne',
                'updateMany',
                'remove'
            ], result => {
                const data = Object.assign({
                    result: result.result
                }, result);

                ['message', 'ops', 'connection'].forEach(key => {
                    delete data[key];
                });

                return data;
            });
        });
    }

    /**
     * Add the /emit route to the momentum server app.
     */
    addDataRoute() {
        this.addJsonRoute('data', (request, response) => {
            this.proxyDataBaseRequest('data', request, response, [
                'findOne',
                'find',
                'count'
            ], result => ({result}));
        });
    }

    /**
     * Start the momentum server (start the adapter and
     * start the API to listen the needed routes).
     *
     * @param {int|Object} app optional app to link or port to listen
     */
    start(app = null) {
        let appPort = null;
        if (!isNaN(app)) {
            appPort = app;
            app = null;
        }
        this.stop();
        this.setApplicationPort(appPort);
        this.linkApplication(app);
        this.server = null;
        this.app = this.linkedApp || this.startServer();
        this.addReadyRoute();
        this.addQuitRoute();
        this.addOnRoute();
        this.addDataRoute();
        this.addListenRoute();
        this.addListenStopRoute();
        this.addEmitRoute();
        this.initializeEventsEmitter();
        const start = new Promise(resolve => {
            let startAdapter, broadcastLibrary;
            const end = () => {
                if (startAdapter && broadcastLibrary) {
                    resolve();
                }
            };
            this.startAdapter().then(() => {
                startAdapter = true;
                end();
            });
            this.broadcastLibrary().then(() => {
                broadcastLibrary = true;
                end();
            });
        });
        start.then(() => {
            this.isReady = true;
            this.readyPromises.forEach(promise => {
                promise();
            });
            this.readyPromises = [];
        });

        return start;
    }

    /**
     * Minify and broadcast the library on /momentum.js route.
     */
    broadcastLibrary() {
        return new Promise(resolve => {
            fs.readFile(__dirname + '/../lib/momentum.js', 'utf8', (err, data) => {
                const result = uglify.minify({
                    'momentum.js': data
                }, {
                    sourceMap: {
                        filename: 'momentum.js',
                        url: './momentum.js.map'
                    }
                });
                this.addRoute('momentum.js', (request, response) => {
                    response.set('Content-Type', 'application/javascript; charset=utf-8');
                    response.send(data);
                });
                this.addRoute('momentum.min.js', (request, response) => {
                    response.set('Content-Type', 'application/javascript; charset=utf-8');
                    response.send(result.code);
                });
                this.addRoute('momentum.js.map', (request, response) => {
                    response.set('Content-Type', 'application/json; charset=utf-8');
                    response.send(result.map);
                });
                resolve();
            });
        });
    }

    /**
     * Start the database adapter.
     */
    startAdapter() {
        return this.adapter.start();
    }

    /**
     * Stop the database adapter.
     */
    stopAdapter() {
        return new Promise(resolve => {
            if (this.adapter) {
                const stopPromise = this.adapter.stop();
                if (stopPromise instanceof Promise) {
                    stopPromise.then(resolve).catch(resolve);

                    return;
                }
            }

            resolve();
        });
    }

    /**
     * Start a stand-alone express server.
     */
    startServer() {
        const expressApp = express();
        this.server = expressApp.listen(this.appPort);

        return expressApp;
    }

    /**
     * Stop the express server (if stand-alone).
     */
    stopServer() {
        return new Promise(resolve => {
            if (this.server) {
                this.server.close(() => {
                    resolve();
                });

                return;
            }

            resolve();
        });
    }

    /**
     * Stop the adapter and the momentum server.
     */
    stop() {
        this.isReady = false;

        return new Promise(resolve => {
            let adapterStopped = !this.adapter;
            let serverStopped = !this.server;
            const next = () => {
                if (adapterStopped && serverStopped) {
                    resolve();
                }
            };
            this.stopAdapter().then(() => {
                adapterStopped = true;
                next();
            });
            this.stopServer().then(() => {
                serverStopped = true;
                next();
            });
        });
    }

    /**
     * Listen events and return a function to stop listening with
     * calling it.
     *
     * @param {Array} events
     * @param {Array} args
     *
     * @returns {Function}
     */
    on(events, ...args) {
        if (!events) {
            throw new Error('event must be a string or an array');
        }

        if (!events.forEach) {
            events = [events];
        }

        events.forEach(event => {
            this.getEventsEmitter().on(event, ...args);
        });

        return () => {
            events.forEach(event => {
                this.getEventsEmitter().removeListener(event, ...args);
            });
        };
    }

    /**
     * Listen a momentum event from the eventTypes list.
     *
     * @param {string} eventKey
     * @param {string} eventParam
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onEvent(eventKey, eventParam, args) {
        return this.on(eventTypes[eventKey] + ':' + eventParam, ...args);
    }

    /**
     * Listen events that touch a given collection
     * (update, remove, insert).
     *
     * @param {string} collection
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onCollectionTouched(collection, ...args) {
        const offCollectionUpdate = this.onCollectionUpdate(collection, ...args);
        const offCollectionRemove = this.onCollectionRemove(collection, ...args);
        const offInsert = this.onInsert(collection, ...args);

        return () => {
            offCollectionUpdate();
            offCollectionRemove();
            offInsert();
        };
    }

    /**
     * Listen events that touch a given collection
     * item (update, remove).
     *
     * @param {string} collection
     * @param {string} item
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onItemTouched(collection, item, ...args) {
        const offItemRemove = this.onItemRemove(collection, item, ...args);
        const offItemUpdate = this.onItemUpdate(collection, item, ...args);

        return () => {
            offItemRemove();
            offItemUpdate();
        };
    }

    /**
     * Listen collection update event.
     *
     * @param {string} collection
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onCollectionUpdate(collection, ...args) {
        return this.onEvent('updateCollection', collection, args);
    }

    /**
     * Listen collection item update event.
     *
     * @param {string} collection
     * @param {string} item
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onItemUpdate(collection, item, ...args) {
        return this.onEvent('updateItem', collection + ':' + item, args);
    }

    /**
     * Listen collection remove event.
     *
     * @param {string} collection
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onCollectionRemove(collection, ...args) {
        return this.onEvent('removeCollection', collection, args);
    }

    /**
     * Listen collection item remove event.
     *
     * @param {string} collection
     * @param {string} item
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onItemRemove(collection, item, ...args) {
        return this.onEvent('removeItem', collection + ':' + item, args);
    }

    /**
     * Listen collection insert event.
     *
     * @param {string} collection
     * @param {Array}  args
     *
     * @returns {Function}
     */
    onInsert(collection, ...args) {
        return this.onEvent('insert', collection, args);
    }

    /**
     * Emit an event.
     *
     * @param {Array} args
     *
     * @returns this
     */
    emit(...args) {
        return this.getEventsEmitter().emit(...args);
    }

    /**
     * Emit a momentum event from eventTypes list.
     *
     * @param {string} eventKey
     * @param {Object} eventParam
     * @param {Array}  args
     *
     * @returns {*}
     */
    emitEvent(eventKey, eventParam, ...args) {
        const event = eventTypes[eventKey];

        return this.emit(event + ':' + eventParam, event, ...args);
    }

    /**
     * Emit an error event.
     *
     * @param {string} eventKey
     * @param {Object} eventParam
     * @param {Array}  args
     *
     * @returns {*}
     */
    emitError(eventKey, eventParam, ...args) {
        const event = eventTypes[eventKey] + '-error';

        return this.emit(event + ':' + eventParam, event, ...args);
    }

    /**
     * Emit event/error for each item id.
     *
     * @param {Array}  ids
     * @param {string} method
     * @param {string} itemEvent
     * @param {string} collection
     * @param {Object} event
     */
    emitForEachItem(ids, method, itemEvent, collection, event) {
        if (!method) {
            throw new Error('no method');
        }
        ids.forEach(id => {
            this[method](itemEvent, collection + ':' + id, Object.assign({
                id
            }, event));
        });
    }

    /**
     * Remove items.
     *
     * @param {string} collection
     * @param {Object} filter
     * @param {Object} options
     *
     * @returns {Promise}
     */
    remove(collection, filter, options) {
        return new Promise((resolve, reject) => {
            this.find(collection, filter).then(objects => {
                const ids = objects.map(obj => this.getItemId(obj));
                const promise = this.callAdapter('remove', collection, filter, options);
                const callback = method => result => {
                    const event = {
                        name: 'remove',
                        collection,
                        filter,
                        options,
                        result
                    };
                    this[method]('removeCollection', collection, Object.assign({
                        ids
                    }, event));
                    this.emitForEachItem(ids, method, 'removeItem', collection, event);
                };
                promise
                    .then(callback('emitEvent'))
                    .catch(callback('emitError'));

                resolve(promise);
            }).catch(reject);
        });
    }

    /**
     * Call adapter method if ready, else return a dead promise.
     *
     * @param {string} method
     * @param {Array}  args
     *
     * @returns {Promise}
     */
    callAdapter(method, ...args) {
        if (this.isReady) {
            return this.adapter[method](...args);
        }

        return new Promise(() => {});
    }

    /**
     * Call a write method on the adapter and emit
     * corresponding events.
     *
     * @param {string} method
     * @param {Array}  args
     * @param {Object} info
     * @param {Array}  events
     *
     * @returns {Promise}
     */
    callWithEvents(method, args, info, events) {
        const promise = this.callAdapter(method, ...args);
        const callback = emitFunction => result => {
            events.forEach(event => {
                emitFunction.call(this, ...event, info, method, result);
            });
        };
        promise
            .then(callback(this.emitEvent))
            .catch(callback(this.emitError));

        return promise;
    }

    /**
     * Insert one or many item(s).
     *
     * @param {string}       collection
     * @param {Object|Array} document
     * @param {Object}       options
     * @param {string}       type
     *
     * @returns {Promise}
     */
    insert(collection, document, options, type) {
        if (!type) {
            type = document instanceof Array ? 'insertMany' : 'insertOne';
        }

        return this.callWithEvents(
            type, [
                collection,
                document,
                options
            ], {
                name: 'insert',
                collection,
                [type === 'insertMany' ? 'items' : 'item']: document,
                options
            }, [
                ['insert', collection]
            ]
        );
    }

    /**
     * Insert one item.
     *
     * @param {string} collection
     * @param {Object} document
     * @param {Object} options
     *
     * @returns {Promise}
     */
    insertOne(collection, document, options) {
        return this.insert(collection, document, options, 'insertOne');
    }

    /**
     * Insert one item.
     *
     * @param {string} collection
     * @param {Array}  documents
     * @param {Object} options
     *
     * @returns {Promise}
     */
    insertMany(collection, documents, options) {
        return this.insert(collection, documents, options, 'insertMany');
    }

    /**
     * Update one item.
     *
     * @param {string} collection
     * @param {Object} filter
     * @param {Object} update
     * @param {Object} options
     *
     * @returns {Promise}
     */
    updateOne(collection, filter, update, options) {
        return this.findOne(collection, filter).then(item => {
            if (!item) {
                throw new Error(JSON.stringify([collection, filter]) + ' not found');
            }

            const id = this.getItemId(item);
            return this.callWithEvents(
                'updateOne', [
                    collection,
                    filter,
                    update,
                    options
                ], {
                    name: 'update',
                    collection,
                    item,
                    id,
                    filter,
                    update,
                    options
                }, [
                    ['updateCollection', collection],
                    ['updateItem', collection + ':' + id]
                ]
            );
        });
    }

    /**
     * Update many items.
     *
     * @param {string} collection
     * @param {Object} filter
     * @param {Object} update
     * @param {Object} options
     *
     * @returns {Promise}
     */
    updateMany(collection, filter, update, options) {
        return new Promise((resolve, reject) => {
            this.find(collection, filter).then(objects => {
                const ids = objects.map(obj => this.getItemId(obj));
                const promise = this.callAdapter('updateMany', collection, filter, update, options);
                const callback = method => result => {
                    const event = {
                        name: 'update',
                        collection,
                        update,
                        filter,
                        options,
                        result
                    };
                    this[method]('updateCollection', collection, Object.assign({
                        ids
                    }, event));
                    this.emitForEachItem(ids, method, 'updateItem', collection, event);
                };
                promise
                    .then(callback('emitEvent'))
                    .catch(callback('emitError'));

                resolve(promise);
            }).catch(reject);
        });
    }

    /**
     * Return id for a given item.
     *
     * @param {Object} item
     *
     * @returns {string|int}
     */
    getItemId(item) {
        return this.adapter.getItemId(item);
    }

    /**
     * Return filter object for a given id.
     *
     * @param {string|int} itemId
     *
     * @returns {Object}
     */
    getFilterFromItemId(itemId) {
        return this.adapter.getFilterFromItemId(itemId);
    }

    /**
     * Count items.
     *
     * @param args
     *
     * @returns {Promise}
     */
    count(...args) {
        return this.callAdapter('count', ...args);
    }

    /**
     * Find items.
     *
     * @param args
     *
     * @returns {Promise}
     */
    find(...args) {
        return this.callAdapter('find', ...args);
    }

    /**
     * Find one item.
     *
     * @param args
     *
     * @returns {Promise}
     */
    findOne(...args) {
        return this.callAdapter('findOne', ...args);
    }
}

module.exports = MomentumServer;