voxgig/seneca-monitor

View on GitHub
monitor.js

Summary

Maintainability
C
7 hrs
Test Coverage
/* Copyright (c) 2017-2019 Richard Rodger and other contributors, MIT License */
'use strict'

var Util = require('util')
var Dgram = require('dgram')

var Hapi = require('@hapi/hapi')
var Inert = require('@hapi/inert')

module.exports = function monitor(options) {
  var seneca = this

  var opts = this.util.deepextend(
    {
      udp: {
        kind: 'udp4',
        port: 10111,
        host: '0.0.0.0',
      },
      web: {
        port: 10181,
        host: '0.0.0.0',
        folder: __dirname,
      },
    },
    options
  )

  var ds = Dgram.createSocket(opts.udp.kind)

  var spec = { ds: ds, seneca: seneca, opts: opts }

  if (options.collect) {
    return make_collector(spec)
  } else {
    return make_monitor(spec)
  }
}

function make_monitor(spec) {
  spec.seneca.on('act-in', function (msg, ignore, meta) {
    var m = msg.meta$ || meta

    var parents = m.parents || []
    var client = parents[0]

    if (!client) {
      return
    }

    var desc = [
      'M',
      m.pattern,
      m.id,
      m.instance,
      m.tag,
      m.version,
      m.sync ? 's' : 'a',
      m.start,
      m.parents.length,
      client[0],
      client[2],
      client[3],
      client[4],
    ].join('~')

    send(spec, desc)
  })

  spec.seneca.add('role:seneca,cmd:close', function (msg, reply) {
    var seneca = this

    send(spec, 'D~' + spec.seneca.id)
    setTimeout(function () {
      spec.ds.close(function () {
        spec.ds = null
        seneca.prior(msg, reply)
      })
    }, 100)
  })
}

function send(spec, desc) {
  if (!spec.ds) return

  var data = new Buffer(desc)
  spec.ds.send(
    data,
    0,
    data.length,
    spec.opts.udp.port,
    spec.opts.udp.host,
    function (err) {
      if (err) spec.seneca.log.warn(err)
    }
  )
}

function make_collector(spec) {
  if (!spec.ds) return

  spec.ds.on('message', function (data) {
    update(data.toString().split('~'))
  })
  spec.ds.bind(spec.opts.udp.port, spec.opts.udp.host)

  var map = {}
  spec.map = map

  spec.seneca.add('role:monitor,get:map', function (msg, reply) {
    reply(map)
  })

  spec.seneca.add('role:monitor,cmd:update', function (msg, reply) {
    update(msg.data || [])
    reply()
  })

  make_web(spec)

  function update(data) {
    var cmd = data[0]

    if ('D' == cmd) {
      var did = data[1]

      Object.keys(map).forEach(function (kid) {
        if (kid === did) {
          delete map[did]
          return
        }
        Object.keys(map[kid].in).forEach(function (pat) {
          Object.keys(map[kid].in[pat]).forEach(function (mid) {
            if (mid === did) {
              delete map[kid].in[pat][did]
            }
          })
        })
        Object.keys(map[kid].out).forEach(function (pat) {
          Object.keys(map[kid].out[pat]).forEach(function (mid) {
            if (mid === did) {
              delete map[kid].out[pat][did]
            }
          })
        })
      })
      return
    }

    var o = 1

    var pattern = data[o + 0]

    if (-1 != pattern.indexOf('role:seneca')) {
      return
    }

    var sync = data[o + 5]
    var start = data[o + 6]

    var rid = data[o + 2]
    var rtag = data[o + 3]
    var rver = data[o + 4]

    var sid = data[o + 9]
    var stag = data[o + 10]
    var sver = data[o + 11]

    if ('' === pattern || rid === sid) {
      return
    }

    // TODO: see seneca-component for an updated version and better names

    var r = (map[rid] = map[rid] || { in: {}, out: {}, tag: rtag })
    var s = (map[sid] = map[sid] || { in: {}, out: {}, tag: stag })

    var rin = (r.in[pattern] = r.in[pattern] || {})
    var sd = (rin[sid] = rin[sid] || {})
    sd.t = stag
    sd.v = sver
    sd.s = sync
    sd.c = 1 + (sd.c || 0)
    sd.w = start

    var sin = (s.out[pattern] = s.out[pattern] || {})
    var rd = (sin[rid] = sin[rid] || {})
    rd.t = rtag
    rd.v = rver
    rd.s = sync
    rd.c = 1 + (rd.c || 0)
    rd.w = start

    r.last = s.last = start
  }
}

async function make_web(spec) {
  var server = new Hapi.Server({
    port: spec.opts.web.port,
    host: spec.opts.web.host,
  })

  await server.register(Inert)

  server.route({
    method: 'GET',
    path: '/{path*}',
    handler: {
      directory: {
        path: spec.opts.web.folder + '/www',
      },
    },
  })

  server.route({
    method: 'GET',
    path: '/api/map',
    handler: async function (request, h) {
      return (
        (await Util.promisify(spec.seneca.act).call(
          spec.seneca,
          'role:monitor,get:map'
        )) || {}
      )
    },
  })

  await server.start()

  spec.seneca.log.info({ kind: 'notice', notice: Util.inspect(server.info) })
}