lsm/micromono

View on GitHub
lib/api/rpc.js

Summary

Maintainability
A
2 hrs
Test Coverage
var logger = require('../logger')('micromono:rpc')
var toArray = require('lodash.toarray')


/**
 * The RPC class for managing different transport adapters.
 *
 * @param  {Object} options Options for RPC with following format:
 *
 * ```javascript
 * {
 *   api: {
 *     fn: function(){}
 *   }, // an object contains handler functions
 *   type: 'axon', // type of adapter or adapter it self
 *   isRemote: true, // whether this is client side or server side
 *   scheduler: obj, // the scheduler for distributing requests, client side only
 * }
 * ```
 *
 * @return {RPC}         Instance of RPC.
 */
var RPC = module.exports = function MicromonoRPC(options) {
  logger.info('Initialize MicromonoRPC', {
    type: options.type,
    isRemote: options.isRemote
  }).trace(options)

  var rpcAdapter

  // figure out adapter
  if ('string' === typeof options.type) {
    this.type = options.type
    rpcAdapter = require('./' + this.type)
  } else if ('object' === typeof options.type) {
    rpcAdapter = options.type
    this.type = rpcAdapter.type
  }

  if ('function' === typeof rpcAdapter) {
    rpcAdapter = new rpcAdapter()
    // Keep a reference of the adpater so if adpater support multiple connections
    // it can refer to itself handle that internally.
    this.adapter = rpcAdapter
  }

  if ('object' !== typeof rpcAdapter)
    throw new Error('options.type should be either type of adapter or the adapter itself, got ' + typeof options.type)

  // internal object holds all the api handlers
  this._handlers = {}

  // Override serializer if found in adapter.
  if (rpcAdapter.serialize)
    this.serialize = rpcAdapter.serialize

  if (rpcAdapter.deserialize)
    this.deserialize = rpcAdapter.deserialize

  // add client or server features
  if (options.isRemote)
    this.prepareClient(rpcAdapter, options)
  else
    this.prepareServer(rpcAdapter, options)
}

RPC.prototype.prepareClient = function(rpcAdapter, options) {
  logger.debug('Prepare RPC client', {
    type: options.type,
    service: options.ann.name + '@' + options.ann.version
  }).trace(options)

  var self = this
  this.ann = options.ann
  this.send = rpcAdapter.send
  this.connect = rpcAdapter.connect
  this.scheduler = options.scheduler
  this.scheduler.on('add', function(provider) {
    logger.info('Found new RPC provider', {
      host: provider.host,
      service: provider.name + '@' + provider.version
    }).trace(provider)
    self.connect(provider)
  })

  if (options.api)
    Object.keys(options.api).forEach(this.addRemoteAPI.bind(this))
}

RPC.prototype.prepareServer = function(rpcAdapter, options) {
  logger.debug('Prepare RPC server', {
    type: options.type
  }).trace(options)

  this.startServer = rpcAdapter.startServer
  if (options.api) {
    var api = options.api
    var self = this
    Object.keys(api).forEach(function(apiName) {
      var handler = api[apiName]
      // Add local api handler
      self.addAPI(apiName, handler)
    })
  }
}

/**
 * Handler for disconnect event of provider
 *
 * @param  {Object} provider The annoucement data of disconnected provider.
 */
RPC.prototype.onProviderDisconnect = function(provider) {
  logger.info('RPC provider disconnected', {
    id: provider.id.slice(0, 8),
    host: provider.host,
    service: provider.name + '@' + provider.version
  }).trace(provider)

  this.scheduler.remove(provider)
}

/**
 * Add an API handler.
 *
 * @param {String} name       Name of the api.
 * @param {Function} handler  Handler of the api.
 */
RPC.prototype.addAPI = function(name, handler) {
  if ('function' === typeof handler) {
    logger.debug('Add server api', {
      name: name,
      args: handler.args
    })

    this._handlers[name] = {
      name: name,
      args: handler.args,
      handler: handler
    }
  }
}

/**
 * Generate a remote api handler based on name.
 *
 * @param {String} name Name of the remote api.
 */
RPC.prototype.addRemoteAPI = function(name) {
  logger.debug('Generate local interface of remote api', {
    name: name,
    service: this.ann.name + '@' + this.ann.version
  })

  var self = this
  this._handlers[name] = function() {
    var args = toArray(arguments)
    var data = {
      name: name,
      args: args
    }
    self.send(data)
  }
}

/**
 * Get an api handler by name.
 *
 * @param {String} name       Name of the api.
 * @param {Function} The handler function.
 */
RPC.prototype.getHandler = function(name) {
  var handler = this._handlers[name]
  if (handler && handler.handler)
    handler = handler.handler

  return handler
}

/**
 * Get all api handlers.
 *
 * @param {Object} The api handlers object.
 */
RPC.prototype.getAPIs = function() {
  return this._handlers
}

/**
 * Dispatch message received to corresponding api handler.
 *
 * @param  {String|Buffer}   msg      The message data.
 * @param  {Function} reply A callback function for replying the result to client.
 */
RPC.prototype.dispatch = function(msg, reply) {
  var data = this.deserialize(msg)
  var handler = this.getHandler(data.name)

  if (handler) {
    var args = data.args || []
    if (true === data.cid)
      args.push(reply)

    handler.apply(null, args)
  }
}

/**
 * Serialize data.
 *
 * @param  {Any}    data Data to serialize.
 * @return {String}      Serialized data.
 */
RPC.prototype.serialize = function(data) {
  return JSON.stringify(data)
}

/**
 * Deserialize message to data.
 *
 * @param  {String} msg Message data to deserialize.
 * @return {Any}        Deserialized data.
 */
RPC.prototype.deserialize = function(msg) {
  return JSON.parse(msg)
}