lib/backend.js
var zmq = require('zeromq'),
errors = require('../config/errors'),
metric = require('./metric'),
Logger = require('logger-facade-nodejs'),
_ = require('lodash'),
Message = require('zmq-service-suite-message'),
msgpack = require('msgpack-js');
var IDENTITY_FRAME = 0,
PROTOCOL_FRAME = 1,
TYPE_FRAME = 2,
RID_FRAME = 3,
ADDRESS_FRAME = 4,
HEADERS_FRAME = 5,
STATUS_FRAME = 6,
PAYLOAD_FRAME = 7;
var REPLY_FRAME_SIZE = 9;
var Backend = function(configuration, smiService){
var instance;
var log = Logger.getLogger('Backend');
var defaults = {
backend: 'tcp://127.0.0.1:5559'
};
var config = _.defaults(configuration, defaults);
var socket;
var smiActions = ['UP','DOWN','HEARTBEAT'];
var reply = function(message){
message.type = Message.Type.REP;
if(message.address.sid === 'SMI') {
log.debug("reply to: %s rid: %s with status: %s",
message.identity, message.rid, message.status);
} else {
log.info("reply to: %s rid: %s with status: %s",
message.identity, message.rid, message.status);
}
// TODO: log frames in debug mode
// if(log.isDebug()){
// log.debug(message.toString());
// }
socket.send(message.toFrames());
};
var forward = function(frames){
// on forward the frist frames is destination
frames[TYPE_FRAME + 1] = Message.Type.REQ;
frames = metric.beStart(frames);
log.info("forward request from: %s rid: %s to: %s",
frames[IDENTITY_FRAME + 1], frames[RID_FRAME + 1], frames[IDENTITY_FRAME]);
// TODO: log frames in debug mode
// if(log.isDebug()){
// var message = Message.parse(frames);
// log.debug(message.toString());
// }
socket.send(frames);
};
var replyError = function(errorCode, message){
var error = errors[errorCode.toString()];
message.status = error.code;
message.payload = error.body;
reply(message);
};
var getSMIAction = function(address){
var verb = address.verb.toUpperCase();
if(!_.includes(smiActions, verb)){
return null;
}
return smiService[verb.toLowerCase()];
};
var handleRequest = function(frames) {
var message = Message.parse(frames);
var isSMIRequest = message.address.sid === 'SMI';
if(!isSMIRequest){
log.error("doesn't respond to %s", message.address.sid);
}
var action = getSMIAction(message.address);
if(action === null){
log.error("SMI service doesn't respond to %s", message.address.verb);
}
if (isSMIRequest && action !== null){
log.debug("routing from: %s rid: %s to SMI.%s request...",
message.identity, message.rid, message.address.verb);
message = action(message);
if(message.status === 200) {
reply(message);
return;
}
} else {
message.status = 500;
}
replyError(message.status, message);
};
var onMessage = function(){
var frames = _.toArray(arguments);
var from;
// check if it is reply
if(frames.length === REPLY_FRAME_SIZE){
// remove worker identity and let client identity on position 0
from = frames.shift();
}
log.debug("received from: %s rid: %s",
from || frames[IDENTITY_FRAME], frames[RID_FRAME]);
// TODO: log frames in debug mode
// if(log.isDebug()) {
// var msg = Message.parse(frames);
// log.debug(msg.toString());
// }
var isRequest = frames[TYPE_FRAME].toString('utf8') === Message.Type.REQ;
if (isRequest) {
handleRequest(frames);
return;
}
var to = frames[IDENTITY_FRAME];
if(!to){
log.error("invalid routing from: %s to: %s", from, to);
return;
}
frames = metric.beEnd(frames);
var status = frames[STATUS_FRAME];
log.info("forward to frontend from: %s to: %s sending: %s", from, to, status);
instance.frontendSendCallback(frames);
};
var onError = function(error){
// reply with error
log.error("received zmq error => %s", error.stack);
};
var run = function(){
log.info('connected to %s', config.backend);
socket = zmq.socket('router');
socket.on('message', onMessage);
socket.on('error', onError);
socket.bindSync(config.backend);
};
var stop = function(){
log.info('disconnected from %s', config.backend);
socket.close();
};
instance = {
send: forward,
run: run,
stop: stop,
frontendSendCallback: null
};
return instance;
};
module.exports = Backend;