microfleet/core

View on GitHub
packages/core/src/index.ts

Summary

Maintainability
D
1 day
Test Coverage
import type * as ns from '@microfleet/core-types'
import type { PartialDeep } from 'type-fest'

/**
 * Microservice Abstract Class
 * @module Microfleet
 */
import { isAbsolute, resolve } from 'path'
import { strict as assert } from 'assert'
import { EventEmitter } from 'eventemitter3'
import * as defaultOpts from './defaults'
import { HttpStatusError } from '@microfleet/validation'
import {
  getHealthStatus,
  PluginHealthCheck,
  HealthStatus,
  PluginHealthStatus,
} from './utils/pluginHealthStatus'
import {
  defaultsDeep,
  getVersion,
  PluginsPriority,
  ConnectorsTypes,
  ConnectorsPriority,
  CONNECTORS_PROPERTY,
  DESTRUCTORS_PROPERTY,
  HEALTH_CHECKS_PROPERTY
} from '@microfleet/utils'
import { debug as _debug } from 'debug'

export { PluginHealthStatus, HealthStatus }

const toArray = <T>(x: T | T[]): T[] => Array.isArray(x) ? x : [x]
const debug = _debug('@microfleet:core')
const kRegisterCalled = Symbol('@microfleet::registerCalled')

export {
  PLUGIN_STATUS_OK,
  PLUGIN_STATUS_FAIL,
  PluginTypes,
  PluginsPriority,
  ConnectorsTypes,
  ConnectorsPriority
} from '@microfleet/utils'

/**
 * Helper method to enable router extensions.
 * @param name - Pass extension name to require.
 * @returns Extension to router plugin.
 */
export const routerExtension = (name: string): unknown => {
  // eslint-disable-next-line @typescript-eslint/no-var-requires
  return require(require.resolve(`./plugins/router/extensions/${name}`)).default
}

function resolveModule<T>(cur: T | null, path: string): T | null {
  if (cur != null) {
    return cur
  }

  try {
    return require(require.resolve(path))
  } catch (e: any) {
    if (e.code !== 'MODULE_NOT_FOUND') {
      // eslint-disable-next-line no-console
      console.warn(e)
    }

    return null
  }
}

export type CoreOptions = ns.ConfigurationRequired & PartialDeep<ns.ConfigurationOptional>

/**
 * @class Microfleet
 */
export class Microfleet extends EventEmitter {
  public static readonly version: string = getVersion()

  public config: ns.CoreOptions
  public readonly version: string
  public readonly migrators: { [name: string]: ns.AnyFn }
  public readonly plugins: string[]
  public readonly [CONNECTORS_PROPERTY]: ns.StartStopTree
  public readonly [DESTRUCTORS_PROPERTY]: ns.StartStopTree
  public readonly [HEALTH_CHECKS_PROPERTY]: PluginHealthCheck[]
  private connectorToPlugin: Map<ns.PluginConnector, string>
  private [kRegisterCalled]: boolean

  /**
   * Allow Extensions
   */
  [property: string]: any;

  /**
   * @param [opts={}] - Overrides for configuration.
   * @returns Instance of microservice.
   */
  constructor(opts: CoreOptions) {
    super()

    // init configuration
    this.config = defaultsDeep(opts, defaultOpts) as ns.CoreOptions
    this.exit = this.exit.bind(this)
    this.version = Microfleet.version

    // init migrations
    this.migrators = Object.create(null)
    this.connectorToPlugin = new Map()

    // init health status checkers
    this[HEALTH_CHECKS_PROPERTY] = []

    // init plugins
    this.plugins = []
    this[CONNECTORS_PROPERTY] = Object.create(null)
    this[DESTRUCTORS_PROPERTY] = Object.create(null)
    this[kRegisterCalled] = false

    for (const pluginType of PluginsPriority) {
      this[CONNECTORS_PROPERTY][pluginType] = []
      this[DESTRUCTORS_PROPERTY][pluginType] = []
    }

    // setup error listener
    this.on('error', this.onError)
  }

  /**
   * Initializes all plugins
   */
  public async register(): Promise<void> {
    if (this[kRegisterCalled]) {
      throw new Error('register() has already been called')
    }

    await this.initPlugins(this.config)

    // setup hooks
    for (const [eventName, hooks] of Object.entries(this.config.hooks)) {
      for (const hook of toArray(hooks)) {
        this.on(eventName, hook)
      }
    }

    if (this.config.sigterm) {
      this.on('ready', () => {
        process.once('SIGTERM', this.exit)
        process.once('SIGINT', this.exit)
      })
    }

    this[kRegisterCalled] = true
  }

  /**
   * Asyncronously calls event listeners
   * and waits for them to complete.
   * This is a bit odd compared to normal event listeners,
   * but works well for dynamically running async actions and waiting
   * for them to complete.
   *
   * @param event - Hook name to be called during execution.
   * @param args - Arbitrary args to pass to the hooks.
   * @returns Result of invoked hook.
   */
  public async hook(event: string, ...args: unknown[]): Promise<any[]> {
    const listeners = this.listeners(event)
    const work = []

    for (const listener of listeners.values()) {
      work.push(listener.apply(this, args))
    }

    return Promise.all(work)
  }

  /**
   * Adds migrators.
   * @param name - Migrator name.
   * @param fn - Migrator function to be invoked.
   * @param args - Arbitrary args to be passed to fn later on.
   */
  public addMigrator(name: string, fn: ns.AnyFn, ...args: any[]): void {
    this.migrators[name] = (...migratorArgs: any[]): any => fn.call(this, ...args, ...migratorArgs)
  }

  /**
   * Performs migration for a given database or throws if migrator is not present.
   * @param  name - Name of the migration to invoke.
   * @param  args - Extra args to pass to the migrator.
   * @returns Result of the migration.
   */
  public migrate(name: string, ...args: unknown[]): any {
    const migrate = this.migrators[name]
    assert(typeof migrate === 'function', `migrator ${name} not defined`)
    return migrate(...args)
  }

  /**
   * Generic connector for all of the plugins.
   * @returns Walks over registered connectors and emits ready event upon completion.
   */
  public async connect(): Promise<unknown[]> {
    if (!this[kRegisterCalled]) {
      await this.register()
    }

    return this.processAndEmit(this.getConnectors(), 'ready', ConnectorsPriority)
  }

  /**
   * Generic cleanup function.
   * @returns Walks over registered destructors and emits close event upon completion.
   */
  public async close(): Promise<any> {
    const r = await this.processAndEmit(this.getDestructors(), 'close', [...ConnectorsPriority].reverse())
    if (this.config.sigterm) {
      process.removeListener('SIGTERM', this.exit)
      process.removeListener('SIGINT', this.exit)
    }
    return r
  }

  // ****************************** Plugin section: public ************************************

  /**
   * Public function to init plugins.
   *
   * @param mod - Plugin module instance.
   * @param mod.name - Plugin name.
   * @param mod.attach - Plugin attach function.
   * @param [conf] - Configuration in case it's not present in the core configuration object.
   */
  // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
  public async initPlugin<T extends Record<string, unknown>>(mod: ns.Plugin<T>, conf?: any): Promise<void> {
    const pluginName = mod.name
    debug('initializing', pluginName)

    let expose: ns.PluginInterface

    try {
      const configuration = conf || this.config[mod.name] || Object.create(null)

      // Temporary workaround while we have bundled schemas
      if (pluginName === 'validator') {
        if (!configuration.schemas) {
          configuration.schemas = []
        }
        configuration.schemas.push(resolve(__dirname, '../schemas'))
      }

      expose = await mod.attach.call(this, configuration, __filename)
    } catch (e: any) {
      if (e.constructor === HttpStatusError) {
        e.message = `[@microfleet/core] Could not attach ${mod.name}:\n${e.message}`
      }

      throw e
    }

    this.plugins.push(pluginName)

    if (typeof expose !== 'object' || expose == null) {
      return
    }

    const { connect, status, close } = expose
    const type = ConnectorsTypes[mod.type]

    assert(type, 'Plugin type must be equal to one of connectors type')

    if (typeof connect === 'function') {
      this.addConnector(type, connect, pluginName)
    }

    if (typeof close === 'function') {
      this.addDestructor(type, close, pluginName)
    }

    if (typeof status === 'function') {
      this.addHealthCheck(new PluginHealthCheck(mod.name, status))
    }
  }

  /**
   * Returns registered connectors.
   * @returns Connectors.
   */
  public getConnectors(): ns.StartStopTree {
    return this[CONNECTORS_PROPERTY]
  }

  /**
   * Returns registered destructors.
   * @returns Destructors.
   */
  public getDestructors(): ns.StartStopTree {
    return this[DESTRUCTORS_PROPERTY]
  }

  /**
   * Returns registered health checks.
   * @returns Health checks.
   */
  public getHealthChecks(): PluginHealthCheck[] {
    return this[HEALTH_CHECKS_PROPERTY]
  }

  /**
   * Initializes connectors on the instance of Microfleet.
   * @param type - Connector type.
   * @param handler - Plugin connector.
   * @param plugin - name of the plugin, optional.
   */
  public addConnector(type: ns.ConnectorsTypes, handler: ns.PluginConnector, plugin?: string): void {
    this.addHandler(CONNECTORS_PROPERTY, type, handler, plugin)
  }

  /**
   * Initializes destructor on the instance of Microfleet.
   * @param type - Destructor type.
   * @param handler - Plugin destructor.
   * @param plugin - name of the plugin, optional.
   */
  public addDestructor(type: ns.ConnectorsTypes, handler: ns.PluginConnector, plugin?: string): void {
    this.addHandler(DESTRUCTORS_PROPERTY, type, handler, plugin)
  }

  /**
   * Initializes plugin health check.
   * @param {Function} handler - Health check function.
   */
  public addHealthCheck(handler: PluginHealthCheck): void {
    this[HEALTH_CHECKS_PROPERTY].push(handler)
  }

  /**
   * Asks for health status of registered plugins if it's possible, logs it and returns summary.
   */
  public getHealthStatus(): Promise<HealthStatus> {
    return getHealthStatus.call(this, this.getHealthChecks(), this.config.healthChecks)
  }

  public hasPlugin(name: string): boolean {
    return this.plugins.includes(name)
  }

  /**
   * Overrides SIG* events and exits cleanly.
   * @returns Resolves when exit sequence has completed.
   */
  private async exit(): Promise<void> | never {
    this.log.info('received close signal... closing connections...')

    let timeout: NodeJS.Timeout | null = null
    try {
      await Promise.race([
        this.close(),
        new Promise((_, reject) => {
          timeout = setTimeout(reject, 30000, new Error('failed to close after 30 seconds'))
          timeout.unref()
        })
      ])
    } catch (err) {
      this.log.error({ err }, 'Unable to shutdown')
      process.exit(128)
    }

    if (timeout) clearTimeout(timeout)
    this.log.info('close finished')
  }

  /**
   * Helper for calling funcs and emitting event after.
   *
   * @param collection - Object with namespaces for arbitrary handlers.
   * @param event - Type of handlers that must be called.
   * @param [priority=Microfleet.ConnectorsPriority] - Order to process collection.
   * @returns Result of the invocation.
   */
  private async processAndEmit(collection: ns.StartStopTree, event: string, priority = ConnectorsPriority): Promise<any[]> {
    const responses = []
    for (const connectorType of priority) {
      const connectors: ns.PluginConnector[] | void = collection[connectorType]
      if (!connectors) {
        continue
      }

      for (const handler of connectors) {
        const pluginName = this.connectorToPlugin.get(handler)
        if (this.log) {
          this.log.info({ pluginName, connectorType, event }, 'started')
        }

        responses.push(await handler.call(this))

        if (this.log) {
          this.log.info({ pluginName, connectorType, event }, 'completed')
        }
      }
    }

    this.emit(event)

    return responses
  }

  // ***************************** Plugin section: private **************************************
  private addHandler(property: ns.HandlerProperties, type: ns.ConnectorsTypes, handler: ns.PluginConnector, plugin?: string): void {
    if (this[property][type] === undefined) {
      this[property][type] = []
    }

    if (property === DESTRUCTORS_PROPERTY) {
      // reverse
      this[property][type].unshift(handler)
    } else {
      this[property][type].push(handler)
    }

    if (plugin) {
      this.connectorToPlugin.set(handler, plugin)
    }
  }

  /**
   * Initializes service plugi`ns.
   * @param {Object} config - Service plugins configuration.
   * @private
   */
  private async initPlugins(config: ns.CoreOptions): Promise<void> {
    // require all modules
    const plugins: ns.Plugin[] = []
    for (const plugin of config.plugins) {
      const paths = isAbsolute(plugin)
        ? [plugin]
        : [`./plugins/${plugin}`, `@microfleet/plugin-${plugin}`]

      // back-compatibility, should be removed when we redo initialization of plugins
      if (plugin === 'redisCluster') {
        paths.unshift('@microfleet/plugin-redis-cluster')
      } else if (plugin === 'redisSentinel') {
        paths.unshift('@microfleet/plugin-redis-sentinel')
      }

      const pluginModule: ns.Plugin | null = paths.reduce(resolveModule, null)

      if (pluginModule === null) {
        throw new Error(`failed to init ${plugin}`)
      }

      plugins.push(pluginModule)
    }

    // sort and ensure that they are attached based
    // on their priority
    plugins.sort(this.pluginComparator)

    // call the .attach function
    for (const plugin of plugins) {
      await this.initPlugin(plugin)
    }

    this.emit('init')
  }

  private pluginComparator(a: ns.Plugin, b: ns.Plugin): number {
    const ap = PluginsPriority.indexOf(a.type)
    const bp = PluginsPriority.indexOf(b.type)

    // same plugin type, check priority
    if (ap === bp) {
      if (a.priority < b.priority) return -1
      if (a.priority > b.priority) return 1
      return 0
    }

    // different plugin types, sort based on it
    if (ap < bp) return -1
    return 1
  }

  /**
   * Notifies about errors when no other listeners are present
   * by throwing them.
   * @param err - Error that was emitted by the service members.
   */
  private onError = (err: Error): void | never => {
    if (this.listeners('error').length > 1) {
      return
    }

    throw err
  }
}

export interface Microfleet extends ns.Microfleet {}