moleculerjs/moleculer

View on GitHub
src/service.js

Summary

Maintainability
A
0 mins
Test Coverage
/*
 * moleculer
 * Copyright (c) 2020 MoleculerJS (https://github.com/moleculerjs/moleculer)
 * MIT Licensed
 */

"use strict";

const _ = require("lodash");
const { ServiceSchemaError, MoleculerError } = require("./errors");
const { isObject, isFunction, flatten, functionArguments, deprecate, uniq } = require("./utils");

/**
 * Wrap a handler Function to an object with a `handler` property.
 *
 * @param {Function|Object} o
 * @returns {Object}
 */
function wrapToHandler(o) {
    return isFunction(o) ? { handler: o } : o;
}

/**
 * Wrap any value to an array.
 * @param {any} o
 * @returns {Array}
 */
function wrapToArray(o) {
    return Array.isArray(o) ? o : [o];
}

function isNewSignature(args) {
    return args.length > 0 && ["ctx", "context"].indexOf(args[0].toLowerCase()) !== -1;
}

/**
 * Service class
 *
 * @class Service
 */
class Service {
    /**
     * Creates an instance of Service by schema.
     *
     * @param {ServiceBroker}     broker    broker of service
     * @param {Object}             schema    schema of service
     * @param {any=}             schemaMods    Modified schema
     *
     * @memberof Service
     */
    constructor(broker, schema, schemaMods) {
        if (!isObject(broker)) throw new ServiceSchemaError("Must set a ServiceBroker instance!");

        this.broker = broker;

        if (broker) this.Promise = broker.Promise;
        if (schemaMods) {
            deprecate(
                "schemaMods",
                "Using 'schemaMods' parameter in 'broker.createService' is deprecated. Use 'mixins' instead."
            );
            schema = this.mergeSchemas(schema, schemaMods);
        }
        if (schema) this.parseServiceSchema(schema);
    }

    /**
     * Parse Service schema & register as local service
     *
     * @param {Object} schema of Service
     */
    parseServiceSchema(schema) {
        if (!isObject(schema))
            throw new ServiceSchemaError(
                "The service schema can't be null. Maybe is it not a service schema?"
            );

        this.originalSchema = _.cloneDeep(schema);

        if (schema.mixins) {
            schema = this.applyMixins(schema);
        }

        if (isFunction(schema.merged)) {
            schema.merged.call(this, schema);
        } else if (Array.isArray(schema.merged)) {
            schema.merged.forEach(fn => fn.call(this, schema));
        }

        this.broker.callMiddlewareHookSync("serviceCreating", [this, schema]);

        if (!schema.name) {
            /* eslint-disable-next-line no-console */
            console.error(
                "Service name can't be empty! Maybe it is not a valid Service schema. Maybe is it not a service schema?",
                { schema }
            );
            throw new ServiceSchemaError(
                "Service name can't be empty! Maybe it is not a valid Service schema. Maybe is it not a service schema?",
                { schema }
            );
        }

        this.name = schema.name;
        this.version = schema.version;
        this.settings = schema.settings || {};
        this.metadata = schema.metadata || {};
        this.schema = schema;

        this.fullName = Service.getVersionedFullName(
            this.name,
            this.settings.$noVersionPrefix !== true ? this.version : undefined
        );

        this.logger = this.broker.getLogger(this.fullName, {
            svc: this.name,
            ver: this.version
        });

        this.actions = {}; // external access to actions
        this.events = {}; // external access to event handlers.

        // Service item for Registry
        const serviceSpecification = {
            name: this.name,
            version: this.version,
            fullName: this.fullName,
            settings: this._getPublicSettings(this.settings),
            metadata: this.metadata,
            actions: {},
            events: {}
        };

        // Register methods
        if (isObject(schema.methods)) {
            _.forIn(schema.methods, (method, name) => {
                /* istanbul ignore next */
                if (
                    [
                        "name",
                        "version",
                        "settings",
                        "metadata",
                        "dependencies",
                        "schema",
                        "broker",
                        "actions",
                        "logger",
                        "created",
                        "started",
                        "stopped",
                        "_start",
                        "_stop",
                        "_init",
                        "applyMixins"
                    ].indexOf(name) !== -1 ||
                    name.startsWith("mergeSchema")
                ) {
                    throw new ServiceSchemaError(
                        `Invalid method name '${name}' in '${this.name}' service!`
                    );
                }

                this._createMethod(method, name);
            });
        }

        // Register actions
        if (isObject(schema.actions)) {
            _.forIn(schema.actions, (action, name) => {
                if (action === false) return;

                let innerAction = this._createAction(action, name);

                serviceSpecification.actions[innerAction.name] = innerAction;

                const wrappedHandler = this.broker.middlewares.wrapHandler(
                    "localAction",
                    innerAction.handler,
                    innerAction
                );

                // Expose to be callable as `this.actions.find({ ...params })`
                const ep = this.broker.registry.createPrivateActionEndpoint(innerAction);
                this.actions[name] = (params, opts) => {
                    let ctx;
                    if (opts && opts.ctx) {
                        // Reused context (in case of retry)
                        ctx = opts.ctx;
                    } else {
                        ctx = this.broker.ContextFactory.create(
                            this.broker,
                            ep,
                            params,
                            opts || {}
                        );
                    }
                    return wrappedHandler(ctx);
                };
            });
        }

        // Event subscriptions
        if (isObject(schema.events)) {
            _.forIn(schema.events, (event, name) => {
                const innerEvent = this._createEvent(event, name);
                serviceSpecification.events[innerEvent.name] = innerEvent;

                // Expose to be callable as `this.events[''](params, opts);
                this.events[innerEvent.name] = (params, opts) => {
                    let ctx;
                    if (opts && opts.ctx) {
                        // Reused context (in case of retry)
                        ctx = opts.ctx;
                    } else {
                        const ep = {
                            id: this.broker.nodeID,
                            event: innerEvent
                        };
                        ctx = this.broker.ContextFactory.create(
                            this.broker,
                            ep,
                            params,
                            opts || {}
                        );
                    }
                    ctx.eventName = name;
                    ctx.eventType = "emit";
                    ctx.eventGroups = [innerEvent.group || this.name];

                    return innerEvent.handler(ctx);
                };
            });
        }

        this._serviceSpecification = serviceSpecification;

        // Initialize
        this._init();
    }

    /**
     * Return a service settings without protected properties.
     *
     * @param {Object?} settings
     */
    _getPublicSettings(settings) {
        if (settings && Array.isArray(settings.$secureSettings)) {
            return _.omit(settings, [].concat(settings.$secureSettings, ["$secureSettings"]));
        }

        return settings;
    }

    /**
     * Initialize service. It called `created` handler in schema
     *
     * @private
     * @memberof Service
     */
    _init() {
        this.logger.debug(`Service '${this.fullName}' is creating...`);
        if (isFunction(this.schema.created)) {
            this.schema.created.call(this);
        } else if (Array.isArray(this.schema.created)) {
            this.schema.created.forEach(fn => fn.call(this));
        }

        this.broker.addLocalService(this);

        this.broker.callMiddlewareHookSync("serviceCreated", [this]);

        this.logger.debug(`Service '${this.fullName}' created.`);
    }

    /**
     * Start service
     *
     * @returns {Promise}
     * @private
     * @memberof Service
     */
    _start() {
        this.logger.debug(`Service '${this.fullName}' is starting...`);
        return this.Promise.resolve()
            .then(() => {
                return this.broker.callMiddlewareHook("serviceStarting", [this]);
            })
            .then(() => {
                // Wait for dependent services
                if (this.schema.dependencies)
                    return this.waitForServices(
                        this.schema.dependencies,
                        this.settings.$dependencyTimeout || this.broker.options.dependencyTimeout,
                        this.settings.$dependencyInterval || this.broker.options.dependencyInterval
                    );
            })
            .then(() => {
                if (isFunction(this.schema.started))
                    return this.Promise.method(this.schema.started).call(this);

                if (Array.isArray(this.schema.started)) {
                    return this.schema.started
                        .map(fn => this.Promise.method(fn.bind(this)))
                        .reduce((p, fn) => p.then(() => fn()), this.Promise.resolve());
                }
            })
            .then(() => {
                // Register service
                return this.broker.registerLocalService(this._serviceSpecification);
            })
            .then(() => {
                return this.broker.callMiddlewareHook("serviceStarted", [this]);
            })
            .then(() => this.logger.info(`Service '${this.fullName}' started.`));
    }

    /**
     * Stop service
     *
     * @returns {Promise}
     * @private
     * @memberof Service
     */
    _stop() {
        this.logger.debug(`Service '${this.fullName}' is stopping...`);
        return this.Promise.resolve()
            .then(() => {
                return this.broker.callMiddlewareHook("serviceStopping", [this], { reverse: true });
            })
            .then(() => {
                if (isFunction(this.schema.stopped))
                    return this.Promise.method(this.schema.stopped).call(this);

                if (Array.isArray(this.schema.stopped)) {
                    const arr = Array.from(this.schema.stopped).reverse();
                    return arr
                        .map(fn => this.Promise.method(fn.bind(this)))
                        .reduce((p, fn) => p.then(() => fn()), this.Promise.resolve());
                }

                return this.Promise.resolve();
            })
            .then(() => {
                return this.broker.callMiddlewareHook("serviceStopped", [this], { reverse: true });
            })
            .then(() => this.logger.info(`Service '${this.fullName}' stopped.`));
    }

    /**
     * Create an external action handler for broker (internal command!)
     *
     * @param {Object|Function} actionDef
     * @param {String} name
     * @returns {Object}
     *
     * @private
     * @memberof Service
     */
    _createAction(actionDef, name) {
        let action;
        if (isFunction(actionDef)) {
            // Wrap to an object
            action = {
                handler: actionDef
            };
        } else if (isObject(actionDef)) {
            action = _.cloneDeep(actionDef);
        } else {
            throw new ServiceSchemaError(
                `Invalid action definition in '${name}' action in '${this.fullName}' service!`
            );
        }

        let handler = action.handler;
        if (!isFunction(handler)) {
            throw new ServiceSchemaError(
                `Missing action handler on '${name}' action in '${this.fullName}' service!`
            );
        }

        action.rawName = action.name || name;
        if (this.settings.$noServiceNamePrefix !== true)
            action.name = this.fullName + "." + action.rawName;
        else action.name = action.rawName;

        if (action.cache === undefined && this.settings.$cache !== undefined) {
            action.cache = this.settings.$cache;
        }

        action.service = this;
        action.handler = this.Promise.method(handler.bind(this));

        return action;
    }

    /**
     * Create an internal service method.
     *
     * @param {Object|Function} methodDef
     * @param {String} name
     * @returns {Object}
     */
    _createMethod(methodDef, name) {
        let method;
        if (isFunction(methodDef)) {
            // Wrap to an object
            method = {
                handler: methodDef
            };
        } else if (isObject(methodDef)) {
            method = methodDef;
        } else {
            throw new ServiceSchemaError(
                `Invalid method definition in '${name}' method in '${this.fullName}' service!`
            );
        }

        if (!isFunction(method.handler)) {
            throw new ServiceSchemaError(
                `Missing method handler on '${name}' method in '${this.fullName}' service!`
            );
        }

        method.name = name;
        method.service = this;
        method.handler = method.handler.bind(this);

        this[name] = this.broker.middlewares.wrapHandler("localMethod", method.handler, method);

        return method;
    }

    /**
     * Create an event subscription for broker
     *
     * @param {Object|Function} eventDef
     * @param {String} name
     * @returns {Object}
     *
     * @private
     * @memberof Service
     */
    _createEvent(eventDef, name) {
        let event;
        if (isFunction(eventDef) || Array.isArray(eventDef)) {
            event = {
                handler: eventDef
            };
        } else if (isObject(eventDef)) {
            event = _.cloneDeep(eventDef);
        } else {
            throw new ServiceSchemaError(
                `Invalid event definition in '${name}' event in '${this.fullName}' service!`
            );
        }

        if (!isFunction(event.handler) && !Array.isArray(event.handler)) {
            throw new ServiceSchemaError(
                `Missing event handler on '${name}' event in '${this.fullName}' service!`
            );
        }

        // Detect new or legacy parameter list of event handler
        // Legacy: handler(payload, sender, eventName)
        // New: handler(ctx)
        let handler;
        if (isFunction(event.handler)) {
            const args = functionArguments(event.handler);
            handler = this.Promise.method(event.handler);
            handler.__newSignature = event.context === true || isNewSignature(args);
        } else if (Array.isArray(event.handler)) {
            handler = event.handler.map(h => {
                const args = functionArguments(h);
                h = this.Promise.method(h);
                h.__newSignature = event.context === true || isNewSignature(args);
                return h;
            });
        }

        if (!event.name) event.name = name;

        event.service = this;
        const self = this;
        if (isFunction(handler)) {
            // Call single handler
            event.handler = function (ctx) {
                return handler.apply(
                    self,
                    handler.__newSignature ? [ctx] : [ctx.params, ctx.nodeID, ctx.eventName, ctx]
                );
            };
        } else if (Array.isArray(handler)) {
            // Call multiple handler
            event.handler = function (ctx) {
                return self.Promise.all(
                    handler.map(fn =>
                        fn.apply(
                            self,
                            fn.__newSignature ? [ctx] : [ctx.params, ctx.nodeID, ctx.eventName, ctx]
                        )
                    )
                );
            };
        }

        return event;
    }

    /**
     * Call a local event handler. Useful for unit tests.
     *
     * @param {String} eventName
     * @param {any?} params
     * @param {Object?} opts
     */
    emitLocalEventHandler(eventName, params, opts) {
        if (!this.events[eventName])
            return Promise.reject(
                new MoleculerError(
                    `No '${eventName}' registered local event handler`,
                    500,
                    "NOT_FOUND_EVENT",
                    { eventName }
                )
            );

        return this.events[eventName](params, opts);
    }

    /**
     * Getter of current Context.
     * @returns {Context?}
     *
     * @memberof Service
     *
    get currentContext() {
        return this.broker.getCurrentContext();
    }*/

    /**
     * Setter of current Context
     *
     * @memberof Service
     *
    set currentContext(ctx) {
        this.broker.setCurrentContext(ctx);
    }*/

    /**
     * Wait for other services
     *
     * @param {String|Array<String>} serviceNames
     * @param {Number} timeout Timeout in milliseconds
     * @param {Number} interval Check interval in milliseconds
     * @returns {Promise}
     * @memberof Service
     */
    waitForServices(serviceNames, timeout, interval) {
        return this.broker.waitForServices(serviceNames, timeout, interval, this.logger);
    }

    /**
     * Apply `mixins` list in schema. Merge the schema with mixins schemas. Returns with the mixed schema
     *
     * @param {Schema} schema
     * @returns {Schema}
     *
     * @memberof Service
     */
    applyMixins(schema) {
        if (schema.mixins) {
            const mixins = Array.isArray(schema.mixins) ? schema.mixins : [schema.mixins];
            if (mixins.length > 0) {
                const mixedSchema = Array.from(mixins)
                    .reverse()
                    .reduce((s, mixin) => {
                        if (mixin.mixins) mixin = this.applyMixins(mixin);

                        return s ? this.mergeSchemas(s, mixin) : mixin;
                    }, null);

                return this.mergeSchemas(mixedSchema, schema);
            }
        }

        /* istanbul ignore next */
        return schema;
    }

    /**
     * Merge two Service schema
     *
     * @param {Object} mixinSchema        Mixin schema
     * @param {Object} svcSchema         Service schema
     * @returns {Object} Mixed schema
     *
     * @memberof Service
     */
    mergeSchemas(mixinSchema, svcSchema) {
        const res = _.cloneDeep(mixinSchema);
        if (!svcSchema) return res;
        const mods = _.cloneDeep(svcSchema);
        if (!mixinSchema) return mods;

        Object.keys(mods).forEach(key => {
            if ((key === "name" || key === "version") && mods[key] !== undefined) {
                // Simple overwrite
                res[key] = mods[key];
            } else if (key === "settings") {
                // Merge with defaultsDeep
                res[key] = this.mergeSchemaSettings(mods[key], res[key]);
            } else if (key === "metadata") {
                // Merge with defaultsDeep
                res[key] = this.mergeSchemaMetadata(mods[key], res[key]);
            } else if (key === "hooks") {
                // Merge & concat
                res[key] = this.mergeSchemaHooks(mods[key], res[key] || {});
            } else if (key === "actions") {
                // Merge with defaultsDeep
                res[key] = this.mergeSchemaActions(mods[key], res[key] || {});
            } else if (key === "methods") {
                // Overwrite
                res[key] = this.mergeSchemaMethods(mods[key], res[key]);
            } else if (key === "events") {
                // Merge & concat by groups
                res[key] = this.mergeSchemaEvents(mods[key], res[key] || {});
            } else if (["merged", "created", "started", "stopped"].indexOf(key) !== -1) {
                // Concat lifecycle event handlers
                res[key] = this.mergeSchemaLifecycleHandlers(mods[key], res[key]);
            } else if (key === "mixins") {
                // Concat mixins
                res[key] = this.mergeSchemaUniqArray(mods[key], res[key]);
            } else if (key === "dependencies") {
                // Concat mixins
                res[key] = this.mergeSchemaUniqArray(mods[key], res[key]);
            } else {
                const customFnName = "mergeSchema" + key.replace(/./, key[0].toUpperCase()); // capitalize first letter
                // TODO: add middleware hook
                if (isFunction(this[customFnName])) {
                    res[key] = this[customFnName](mods[key], res[key]);
                } else {
                    res[key] = this.mergeSchemaUnknown(mods[key], res[key]);
                }
            }
        });

        return res;
    }

    /**
     * Merge `settings` property in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaSettings(src, target) {
        if ((target && target.$secureSettings) || (src && src.$secureSettings)) {
            const srcSS = src && src.$secureSettings ? src.$secureSettings : [];
            const targetSS = target && target.$secureSettings ? target.$secureSettings : [];
            if (!target) target = {};

            target.$secureSettings = uniq([...srcSS, ...targetSS]);
        }

        return _.defaultsDeep(src, target);
    }

    /**
     * Merge `metadata` property in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaMetadata(src, target) {
        return _.defaultsDeep(src, target);
    }

    /**
     * Merge `mixins` property in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaUniqArray(src, target) {
        return _.uniqWith(_.compact(flatten([src, target])), _.isEqual);
    }

    /**
     * Merge `dependencies` property in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaDependencies(src, target) {
        return this.mergeSchemaUniqArray(src, target);
    }

    /**
     * Merge `hooks` property in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaHooks(src, target) {
        Object.keys(src).forEach(k => {
            if (target[k] == null) target[k] = {};

            Object.keys(src[k]).forEach(k2 => {
                const modHook = wrapToArray(src[k][k2]);
                const resHook = wrapToArray(target[k][k2]);

                target[k][k2] = _.compact(
                    flatten(k === "before" ? [resHook, modHook] : [modHook, resHook])
                );
            });
        });

        return target;
    }

    /**
     * Merge `actions` property in schema
     *
     * @param {Object} src Source schema property (real schema)
     * @param {Object} target Target schema property (mixin schema)
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaActions(src, target) {
        Object.keys(src).forEach(k => {
            if (src[k] === false && target[k]) {
                delete target[k];
                return;
            }

            const srcAction = wrapToHandler(src[k]);
            const targetAction = wrapToHandler(target[k]);

            if (srcAction && srcAction.hooks && targetAction && targetAction.hooks) {
                Object.keys(srcAction.hooks).forEach(k => {
                    const modHook = wrapToArray(srcAction.hooks[k]);
                    const resHook = wrapToArray(targetAction.hooks[k]);

                    srcAction.hooks[k] = _.compact(
                        flatten(k === "before" ? [resHook, modHook] : [modHook, resHook])
                    );
                });
            }

            target[k] = _.defaultsDeep(srcAction, targetAction);
        });

        return target;
    }

    /**
     * Merge `methods` property in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaMethods(src, target) {
        return Object.assign(target || {}, src || {});
    }

    /**
     * Merge `events` property in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaEvents(src, target) {
        Object.keys(src).forEach(k => {
            const modEvent = wrapToHandler(src[k]);
            const resEvent = wrapToHandler(target[k]);

            let handler = _.compact(
                flatten([resEvent ? resEvent.handler : null, modEvent ? modEvent.handler : null])
            );
            if (handler.length === 1) handler = handler[0];

            target[k] = _.defaultsDeep(modEvent, resEvent);
            target[k].handler = handler;
        });

        return target;
    }

    /**
     * Merge `started`, `stopped`, `created` event handler properties in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaLifecycleHandlers(src, target) {
        return _.compact(flatten([target, src]));
    }

    /**
     * Merge unknown properties in schema
     *
     * @param {Object} src Source schema property
     * @param {Object} target Target schema property
     *
     * @returns {Object} Merged schema
     */
    mergeSchemaUnknown(src, target) {
        if (src !== undefined) return src;

        return target;
    }

    /**
     * Return a versioned full service name.
     * @param {String} name
     * @param {String|Number?} version
     */
    static getVersionedFullName(name, version) {
        if (version != null)
            return (typeof version == "number" ? "v" + version : version) + "." + name;

        return name;
    }
}

module.exports = Service;