senecajs/seneca

View on GitHub
seneca.ts

Summary

Maintainability
D
2 days
Test Coverage
/* Copyright © 2010-2023 Richard Rodger and other contributors, MIT License. */
'use strict'



// Node API modules.
const Events = require('events')
const Util = require('util')


// External modules.
const GateExecutor = require('gate-executor')
const Jsonic = require('@jsonic/jsonic-next')
const UsePlugin = require('use-plugin')
import Nid from 'nid'
import { Patrun, Gex } from 'patrun'
const Stats = require('rolling-stats')
const { Ordu } = require('ordu')
const Eraro = require('eraro')
import { Gubu } from 'gubu'


// Internal modules.
const Common = require('./lib/common')
const { make_logging } = require('./lib/logging')
const { API } = require('./lib/api')
const { make_ready } = require('./lib/ready')
const Act = require('./lib/act')
const Add = require('./lib/add')
const Sub = require('./lib/sub')
import { Prior } from './lib/prior'
import { Plugin } from './lib/plugin'
import { Inward } from './lib/inward'
import { Outward } from './lib/outward'
const { Legacy } = require('./lib/legacy')
const { resolve_options } = require('./lib/options')
const { Print } = require('./lib/print')
const { addActions } = require('./lib/actions')
const { transport } = require('./lib/transport')

import Pkg from './package.json'


// Internal data and utilities.
const { error, deep } = Common


const { One, Any, Skip, Open } = Gubu

// Seneca options.
const option_defaults = {
  // Tag this Seneca instance, will be appended to instance identifier.
  tag: '-', // TODO: FIX: Gubu api.test.js#292

  // Standard timeout for actions.
  timeout: 22222,

  // Standard length of identifiers for actions.
  idlen: 12,
  didlen: 4,

  // Manually set instance identifier.
  id$: Skip(String),

  // Register (true) default plugins. Set false to not register when
  // using custom versions.
  default_plugins: Open({}),

  // Test mode. Use for unit testing.
  test: false,

  // Quiet mode. Moves log level to warn. Use for unit testing.
  quiet: false,

  // Default logging specification - see lib/logging.js
  log: Any(make_logging().default_logspec),

  // Custom logger function, optional - see lib/logging.js
  logger: One(Function, Object, String, null),

  // Wait time for plugins to close gracefully.
  death_delay: 11111,

  // LEGACY: remove in 4.x
  deathdelay: 11111,

  // Wait time for actions to complete before shutdown.
  close_delay: 22222,

  // Legacy; specify general error handler
  errhandler: Skip(One(Function, null)),

  // Load options from a file path
  from: Skip(String),

  // Provide a module to base option require loading from
  module: Skip(),

  // Control error handling.
  error: {

    // Control capture of errors for logging.
    capture: {

      // Capture errors in action callbacks (false throws uncaught).
      // callback: true,

      // Capture errors in actions and pass to callback (false throws uncaught).
      // action: true,
    },

    // Custom function to identify thrown errors.
    identify: (e: any) => e instanceof Error,
  },

  // Validate messages and options.
  valid: {

    // If false disables all validation.
    active: true,

    // Validate message parameters.
    message: true,

    // Validate main Seneca instance options.
    option: true,

    // Validate plugin options.
    plugin: true,
  },

  // Debug settings.
  debug: {
    // Throw (some) errors from seneca.act.
    fragile: false,

    // Fatal errors ... aren't fatal. Not for production!
    undead: false,

    // Print debug info to console
    print: {
      // Print options. Best used via --seneca.print.options.
      options: false,

      // Amount of information to print on fatal error: 'summary', 'full'
      fatal: 'summary',

      // Include environment when printing full crash report.
      // Default: false for security.
      env: false,

      // Regardless of logging, call `console.err` on errors
      err: false,

      // Depth of object inspection
      depth: 2,
    },

    // Trace action caller and place in args.caller$.
    act_caller: false,

    // Shorten all identifiers to 2 characters.
    short_logs: false,

    // Record and log callpoints (calling code locations).
    callpoint: false,

    // Log deprecation warnings
    deprecation: true,

    // Set to array to force artificial argv and ignore process.argv
    argv: One([], null),

    // Set to object to force artificial env and ignore process.env
    env: One({}, null),

    // Length of data description in logs
    datalen: 111,
  },

  // Enforce strict behaviours. Relax when backwards compatibility needed.
  strict: {
    // Action result must be a plain object.
    result: true,

    // Delegate fixedargs override action args.
    fixedargs: true,

    // Adding a pattern overrides existing pattern only if matches exactly.
    add: false,

    // If no action is found and find is false,
    // then no error returned along with empty object
    find: true,

    // Maximum number of times an action can call itself
    maxloop: 11,

    // Exports must exist
    exports: false,
  },

  // Keep a transient time-ordered history of actions submitted
  history: {
    // History log is active.
    active: true,

    // Prune the history. Disable only for debugging.
    prune: true,

    // Prune the history only periodically.
    interval: 100,
  },

  // Action executor tracing. See gate-executor module.
  trace: {
    act: One(Function, false),

    stack: false,

    // Messages that do not match a known pattern
    unknown: One(String, true),

    // Messages that have invalid content
    invalid: false,
  },

  // Action statistics settings. See rolling-stats module.
  stats: {
    size: 1024,
    interval: 60000,
    running: false,
  },

  // Plugin settings
  plugin: {},

  // Plugins to load (will be passed to .use)
  plugins: One({}, [], null),

  // System wide functionality.
  system: {

    // Function to exit the process.
    exit: (...args: any[]) => {
      process.exit(...args)
    },

    // Close instance on these signals, if true.
    close_signals: {
      SIGHUP: false,
      SIGTERM: false,
      SIGINT: false,
      SIGBREAK: false,
    },

    plugin: {
      load_once: false,
    },

    // System actions.
    action: {

      // Add system actions.
      add: true
    },
  },

  // Internal functionality. Reserved for objects and functions only.
  internal: Open({
    // Console printing utilities
    print: {
      // Print to standard out
      log: One(Function, null),

      // Print to standard err
      err: One(Function, null),
    },
  }),

  // Log status at periodic intervals.
  status: {
    interval: 60000,

    // By default, does not run.
    running: false,
  },

  // Shared default transport configuration
  transport: Open({
    // Standard port for messages.
    port: 10101,

    host: Skip(String),
    path: Skip(String),
    protocol: Skip(String),
  }),

  limits: {
    maxparents: 33,
  },

  // Setup event listeners before starting
  events: {},

  // Backwards compatibility settings.
  legacy: One(Boolean, {
    // Add legacy properties
    actdef: false,

    // Action callback must always have signature callback(error, result).
    action_signature: false,

    // Use old error handling.
    error: true,

    // Use old error codes. REMOVE in Seneca 4.x
    error_codes: false,

    // Use old fail method
    fail: false,

    // Logger can be changed by options method.
    logging: false,

    // Add meta$ property to messages.
    meta: false,

    // Remove meta argument in action arguments and callbacks.
    // meta_arg_remove: false,

    // Use seneca-transport plugin.
    transport: true,

    // Insert "[TIMEOUT]" into timeout error message
    timeout_string: true,

    // If false, use Gubu for message validation.
    rules: false,

    // If false, use Gubu for option validation (including plugin defaults)
    options: true,

    // If true, look for plugin options by name at the top level of options
    top_plugins: false,
  }),

  // Processing task ordering.
  order: {
    // Action add task ordering.
    add: {
      // Print task execution log.
      debug: false,
    },

    // Action inward task ordering.
    inward: {
      // Print task execution log.
      debug: false,
    },

    // Action outward task ordering.
    outward: {
      // Print task execution log.
      debug: false,
    },

    // Plugin load task ordering.
    use: {
      // Print task execution log.
      debug: false,
    },
  },

  // Prior actions.
  prior: {
    // Call prior actions directly (not as further messages).
    direct: false,
  },

  // Legacy
  reload$: Skip(Boolean),
  // actcache: Any(),
  // seneca: Any(),
}


// Utility functions exposed by Seneca via `seneca.util`.
const seneca_util = {
  Eraro,

  Jsonic,
  Nid,
  Patrun,
  Gex,
  Gubu,

  pins: Common.pins,
  clean: Common.clean,
  pattern: Common.pattern,
  print: Common.print,
  error: error,
  deep: Common.deep,

  deepextend: Common.deep,
  parsepattern: Common.parsePattern,
  pincanon: Common.pincanon,
  router: function router() {
    return Patrun()
  },
  resolve_option: Common.resolve_option,

  // Legacy (deprecate and remove)
  // argprops: Legacy.argprops,
  // recurse: Legacy.recurse,
  // copydata: Legacy.copydata,
  // nil: Legacy.nil,
  flatten: Legacy.flatten,
}

// Internal implementations.
const intern = {
  util: seneca_util,
}

// Seneca is an EventEmitter.
function Seneca(this: any) {
  Events.EventEmitter.call(this)
  this.setMaxListeners(0)
}
Util.inherits(Seneca, Events.EventEmitter)

// Mark the Seneca object
Seneca.prototype.isSeneca = true

// Provide useful description when convered to JSON.
// Cannot be instantiated from JSON.
Seneca.prototype.toJSON = function toJSON() {
  return {
    isSeneca: true,
    id: this.id,
    did: this.did,
    fixedargs: this.fixedargs,
    fixedmeta: this.fixedmeta,
    start_time: this.start_time,
    version: this.version,
  }
}

Seneca.prototype[Util.inspect.custom] = Seneca.prototype.toJSON

// Create a Seneca instance.
function init(seneca_options?: any, more_options?: any) {
  var initial_opts =
    'string' === typeof seneca_options
      ? deep({}, { from: seneca_options }, more_options)
      : deep({}, seneca_options, more_options)

  // Legacy options, remove in 4.x
  initial_opts.deathdelay = initial_opts.death_delay

  var seneca = make_seneca(initial_opts)
  var options = seneca.options()

  // The 'internal' key of options is reserved for objects and functions
  // that provide functionality, and are thus not really printable
  seneca.log.debug({ kind: 'notice', options: { ...options, internal: null } })

  Print.print_options(seneca, options)

  // Register plugins specified in options.
  options.plugins = null == options.plugins ? {} : options.plugins
  var pluginkeys = Object.keys(options.plugins)
  for (var pkI = 0; pkI < pluginkeys.length; pkI++) {
    var pluginkey = pluginkeys[pkI]
    var plugindesc = options.plugins[pluginkey]

    if (false === plugindesc) {
      seneca.private$.ignore_plugins[pluginkey] = true
    } else {
      seneca.use(plugindesc)
    }
  }

  seneca.ready(function(this: any) {
    this.log.info({ kind: 'notice', data: 'hello ' + this.id })
  })

  return seneca
}


// Expose Seneca prototype for easier monkey-patching
init.Seneca = Seneca

// To reference builtin loggers when defining logging options.
init.loghandler = Legacy.loghandler

// Makes require('seneca').use(...) work by creating an on-the-fly instance.
init.use = function top_use() {
  var argsarr = new Array(arguments.length)
  for (var l = 0; l < argsarr.length; ++l) {
    argsarr[l] = arguments[l]
  }

  var instance = init()

  return instance.use.apply(instance, argsarr)
}

// Makes require('seneca').test() work.
init.test = function top_test() {
  return init().test(...arguments)
}

// Makes require('seneca').quiet() work.
init.quiet = function top_quiet() {
  return init().quiet(...arguments)
}

init.util = seneca_util
init.valid = Gubu

init.test$ = { intern: intern }


type Instance = ReturnType<typeof make_seneca> & Record<string, any>

export type { Instance }


export default init

module.exports = init


// Create a new Seneca instance.
function make_seneca(initial_opts?: any) {
  // Create a private context.
  var private$: any = make_private()
  private$.error = error

  // Create a new root Seneca instance.
  var root$: any = new (Seneca as any)()

  // Expose private data to plugins.
  root$.private$ = private$

  // Resolve initial options.
  private$.optioner = resolve_options(module, option_defaults, initial_opts)
  var start_opts = private$.optioner.get()

  // Console print utilities
  private$.print = {
    log: start_opts.internal.print.log || Print.internal_log,
    err: start_opts.internal.print.err || Print.internal_err,
  }

  // These need to come from options as required during construction.
  private$.actrouter = start_opts.internal.actrouter || Patrun({ gex: true })

  private$.translationrouter =
    start_opts.internal.translationrouter || Patrun({ gex: true })

  const soi_subrouter = start_opts.internal.subrouter || {}

  private$.subrouter = {
    // Check for legacy inward router
    inward: soi_subrouter.inward || Patrun({ gex: true }),
    outward: soi_subrouter.outward || Patrun({ gex: true }),
  }


  // Setup event handlers, if defined
  var event_names = ['log', 'act_in', 'act_out', 'act_err', 'ready', 'close']
  event_names.forEach(function(event_name) {
    if ('function' === typeof start_opts.events[event_name]) {
      root$.on(event_name, start_opts.events[event_name])
    }
  })

  // Create internal tools.
  private$.actnid = Nid({ length: start_opts.idlen })
  private$.didnid = Nid({ length: start_opts.didlen })

  // Instance specific incrementing counters to create unique function names
  private$.next_action_id = Common.autoincr()

  var callpoint = (private$.callpoint = Common.make_callpoint(
    start_opts.debug.callpoint
  ))

  // Define public member variables.
  root$.start_time = Date.now()
  root$.context = {}
  root$.version = Pkg.version

  // TODO: rename in 4.x as "args" terminology is legacy
  root$.fixedargs = {}
  root$.fixedmeta = {}

  root$.fixedmeta = {}

  root$.flags = {
    closed: false,
  }

  Object.defineProperty(root$, 'root', { value: root$ })

  private$.history = Common.history(start_opts.history)

  const ready = make_ready(root$)

  // API for Ordu-defined processes.
  root$.order = {}

  // TODO: rename back to plugins
  const api_use = Plugin.api_use(callpoint, {
    debug: !!start_opts.debug.ordu || !!start_opts.order.use.debug,
  })
  root$.use = api_use.use // Define and load a plugin.

  root$.order.plugin = api_use.ordu

  // Seneca methods. Official API.
  root$.toString = API.toString
  root$.has = API.has // True if the given pattern has an action.
  root$.find = API.find // Find the action definition for a pattern.
  root$.list = API.list // List the patterns added to this instance.
  root$.status = API.status // Get the status if this instance.
  root$.reply = API.reply // Reply to a submitted message.
  root$.sub = Sub.api_sub // Subscribe to messages.
  root$.list_plugins = API.list_plugins // List the registered plugins.
  root$.find_plugin = API.find_plugin // Find the plugin definition.
  root$.has_plugin = API.has_plugin // True if the plugin is registered.
  root$.ignore_plugin = API.ignore_plugin // Ignore plugin and don't register it.
  root$.listen = API.listen(callpoint) // Listen for inbound messages.
  root$.client = API.client(callpoint) // Send outbound messages.
  root$.gate = API.gate // Create a delegate that executes actions in sequence.
  root$.ungate = API.ungate // Execute actions in parallel.
  root$.translate = API.translate // Translate message to new pattern.
  root$.ping = API.ping // Generate ping response.
  root$.test = API.test // Set test mode.
  root$.quiet = API.quiet // Convenience method to set logging level to `warn+`.
  root$.export = API.export // Export plain objects from a plugin.
  root$.depends = API.depends // Check for plugin dependencies.
  root$.delegate = API.delegate // Create an action-specific Seneca instance.
  root$.prior = Prior.api_prior // Call the previous action definition for pattern.
  root$.inward = API.inward // Add a modifier function for messages inward
  root$.outward = API.outward // Add a modifier function for responses outward
  root$.error = API.error // Set global error handler, or generate Seneca Error
  root$.fail = start_opts.legacy.fail
    ? Legacy.make_legacy_fail(start_opts)
    : API.fail // Throw a Seneca error
  root$.explain = API.explain // Toggle top level explain capture
  root$.decorate = API.decorate // Decorate seneca object with functions
  root$.seneca = API.seneca
  root$.close = API.close(callpoint) // Close and shutdown plugins.
  root$.options = API.options // Get and set options.
  root$.fix = API.fix // fix pattern arguments, message arguments, and custom meta
  root$.wrap = API.wrap // wrap each found pattern with a new action
  root$.add = Add.api_add // Add a pattern an associated action.
  root$.act = Act.api_act // Submit a message and trigger the associated action.
  root$.ready = ready.api_ready // Callback when plugins initialized.
  root$.valid = Gubu // Expose Gubu shape builders

  root$.internal = function() {
    return {
      ordu: {
        use: api_use.ordu,
      },
    }
  }

  // Non-API methods.
  // root$.register = Plugins.register(callpoint)

  // DEPRECATE IN 4.x
  root$.findact = root$.find
  root$.plugins = API.list_plugins
  root$.findplugin = API.find_plugin
  root$.hasplugin = API.has_plugin
  root$.hasact = Legacy.hasact
  root$.act_if = Legacy.act_if
  root$.findpins = Legacy.findpins
  root$.pinact = Legacy.findpins
  root$.next_act = Legacy.next_act

  // Identifier generator.
  root$.idgen = Nid({ length: start_opts.idlen })

  // Instance tag
  start_opts.tag = null != start_opts.tag ? start_opts.tag : option_defaults.tag

  // Create a unique identifer for this instance.
  root$.id =
    start_opts.id$ ||
    root$.idgen() +
    '/' +
    root$.start_time +
    '/' +
    process.pid +
    '/' +
    root$.version +
    '/' +
    start_opts.tag

  // The instance tag, useful for grouping instances.
  root$.tag = start_opts.tag

  if (start_opts.debug.short_logs || start_opts.log.short) {
    start_opts.idlen = 2
    root$.idgen = Nid({ length: start_opts.idlen })
    root$.id = root$.idgen() + '/' + start_opts.tag
  }

  root$.fullname = 'Seneca/' + root$.id

  root$.die = Common.makedie(root$, {
    type: 'sys',
    plugin: 'seneca',
    tag: root$.version,
    id: root$.id,
    callpoint: callpoint,
  })

  root$.util = seneca_util

  private$.exports = { options: start_opts }
  private$.decorations = {}

  // Error events are fatal, unless you're undead.  These are not the
  // same as action errors, these are unexpected internal issues.
  root$.on('error', root$.die)

  private$.ge = GateExecutor({
    timeout: start_opts.timeout,
  })
    //.clear(action_queue_clear)
    .clear(ready.clear_ready)
    .start()

  // TODO: this should be a plugin
  // setup status log
  if (start_opts.status.interval > 0 && start_opts.status.running) {
    private$.stats = private$.stats || {}
    private$.status_interval = setInterval(function status() {
      root$.log.info({
        kind: 'status',
        alive: Date.now() - private$.stats.start,
        act: private$.stats.act,
      })
    }, start_opts.status.interval)
  }

  if (start_opts.stats) {
    private$.timestats = new Stats.NamedStats(
      start_opts.stats.size,
      start_opts.stats.interval
    )

    if (start_opts.stats.running) {
      setInterval(function stats() {
        private$.timestats.calculate()
      }, start_opts.stats.interval)
    }
  }

  // private$.plugins = {}
  private$.plugin_order = { byname: [], byref: [] }
  private$.use = UsePlugin({
    prefix: ['seneca-', '@seneca/'],
    module: start_opts.internal.module || module,
    msgprefix: false,
    builtin: '',
    merge_defaults: false,
  })

  // TODO: provide an api to add these
  private$.action_modifiers = [
    function add_rules_from_validate_annotation(actdef: any) {
      actdef.rules = Object.assign(
        actdef.rules,
        deep({}, actdef.func.validate || {})
      )
    },
  ]

  private$.sub = { handler: null, tracers: [] }

  root$.order.add = new Ordu({
    name: 'add',
    debug: !!start_opts.debug.ordu || !!start_opts.order.add.debug,
  })
    .add(Add.task.translate)
    .add(Add.task.prepare)
    .add(Add.task.plugin)
    .add(Add.task.callpoint)
    .add(Add.task.flags)
    .add(Add.task.action)
    .add(Add.task.prior)
    .add(Add.task.rules)
    .add(Add.task.register)
    .add(Add.task.modify)

  root$.order.inward = new Ordu({
    name: 'inward',
    debug: !!start_opts.debug.ordu || !!start_opts.order.inward.debug,
  })
    .add(Inward.inward_msg_modify)
    .add(Inward.inward_closed)
    .add(Inward.inward_act_cache)
    .add(Inward.inward_act_default)
    .add(Inward.inward_act_not_found)
    .add(Inward.inward_act_stats)
    .add(Inward.inward_validate_msg)
    .add(Inward.inward_warnings)
    .add(Inward.inward_msg_meta)
    .add(Inward.inward_limit_msg)
    .add(Inward.inward_prepare_delegate)
    .add(Inward.inward_sub)
    .add(Inward.inward_announce)

  root$.order.outward = new Ordu({
    name: 'outward',
    debug: !!start_opts.debug.ordu || !!start_opts.order.outward.debug,
  })
    .add(Outward.outward_make_error)
    .add(Outward.outward_act_stats)
    .add(Outward.outward_act_cache)
    .add(Outward.outward_res_object)
    .add(Outward.outward_res_entity)
    .add(Outward.outward_msg_meta)
    .add(Outward.outward_trace)
    .add(Outward.outward_sub)
    .add(Outward.outward_announce)
    .add(Outward.outward_act_error)

  // Configure logging

  // Mark logger as being externally defined from options
  if (start_opts.logger && 'object' === typeof start_opts.logger) {
    start_opts.logger.from_options$ = true
  }

  // Load logger and update log options
  var logspec = private$.logging.build_log(root$)
  start_opts = private$.exports.options = private$.optioner.set({
    log: logspec,
  })

  if (start_opts.test) {
    root$.test('string' === typeof start_opts.test ? start_opts.test : null)
  }

  if (start_opts.quiet) {
    root$.quiet()
  }

  private$.exit_close = function() {
    root$.close(function root_exit_close(err: any) {
      if (err && true != private$.optioner.get().quiet) {
        private$.print.err(err)
      }

      start_opts.system.exit(err ? (err.exit === null ? 1 : err.exit) : 0)
    })
  }

  if (start_opts.system.action.add) {
    addActions(root$)
  }


  start_opts.legacy.error = false

  // TODO: move to static options in Seneca 4.x
  start_opts.transport = deep(
    {
      port: 62345,
      host: '127.0.0.1',
      path: '/act',
      protocol: 'http',
    },
    start_opts.transport
  )

  transport(root$)


  Print(root$, start_opts.debug.argv || process.argv)

  Common.each(start_opts.system.close_signals, function(active: any, signal: any) {
    if (active) {
      process.once(signal, private$.exit_close)
    }
  })

  // Emit start message (also ensures GateExecutor will trigger 'ready' event)
  root$.act('sys:seneca,on:point,point:start')

  return root$
}

// Private member variables of Seneca object.
function make_private() {
  return {
    logging: make_logging(),
    stats: {
      start: Date.now(),
      act: {
        calls: 0,
        done: 0,
        fails: 0,
        cache: 0,
      },
      actmap: {},
    },
    actdef: {},
    transport: {
      register: [],
    },
    plugins: {
      // Virtual "plugin" for top level actions.
      root$: {
        name: 'root$',
        fullname: 'root$',
        tag: '-',
        options: Object.create(null),
        shared: Object.create(null),
      }
    },
    ignore_plugins: {},
    // intercept: { act_error: [] }
  }
}