// src/provider.js
/*jslint indent:2, node:true, sloppy:true, browser:true */
var Consumer = require('./consumer');
var util = require('./util');
/**
* A freedom port for a user-accessible provider.
* @class Provider
* @implements Port
* @uses handleEvents
* @param {Object} def The interface of the provider.
* @param {Debug} debug The debugger to use for logging.
* @constructor
*/
var Provider = function (def, debug) {
  var port = this;

  // Identity, then event plumbing. util.handleEvents is defined elsewhere;
  // presumably it supplies the emit/once methods the prototype relies on.
  port.id = Consumer.nextId();
  util.handleEvents(port);

  // Collaborators handed in by the caller.
  port.debug = debug;
  port.definition = def;

  // Default calling convention until a provide* registration overrides it.
  port.mode = Provider.mode.synchronous;

  // source -> outbound channel identifier.
  port.channels = {};
  // External interface object, built lazily by getInterface().
  port.iface = null;
  // source -> instance id -> array of close callbacks.
  port.closeHandlers = {};
  // User-supplied implementation, set through the external interface.
  port.providerCls = null;
  // source -> instance id -> live instance.
  port.ifaces = {};
  // source -> instance id -> function delivering messages to that instance.
  port.emits = {};
};
/**
* Provider modes of operation.
* @property mode
* @static
* @type number
*/
Provider.mode = {
  // The method's return value is sent back as the reply.
  synchronous: 0,
  // A completion callback (result, error) is appended to the arguments.
  asynchronous: 1,
  // The method returns a thenable whose settlement becomes the reply.
  promises: 2
};
/**
* Receive external messages for the provider.
* @method onMessage
* @param {String} source the source identifier of the message.
* @param {Object} message The received message.
*/
Provider.prototype.onMessage = function (source, message) {
  var ctorTemplate, args, created;

  // Control-plane traffic. A 'control' message matching none of the three
  // shapes below intentionally falls through to the generic handling.
  if (source === 'control') {
    if (message.reverse) {
      // Channel binding: remember the outbound channel for the named source
      // and announce the reverse channel over it.
      this.channels[message.name] = message.channel;
      this.emit(message.channel, {
        type: 'channel announcement',
        channel: message.reverse
      });
      this.emit('start');
      return;
    }
    if (message.type === 'setup') {
      this.controlChannel = message.channel;
      return;
    }
    if (message.type === 'close') {
      // Forget the control channel first so close() does not echo a
      // closing notice back over the channel that asked for it.
      if (message.channel === this.controlChannel) {
        delete this.controlChannel;
      }
      this.close();
      return;
    }
  }

  // A source we have no channel for may introduce itself with one;
  // anything else from it is dropped with a warning.
  if (!this.channels[source]) {
    if (message.channel) {
      this.channels[source] = message.channel;
      this.emit('start');
    } else {
      this.debug.warn('Message from unconfigured source: ' + source);
    }
    return;
  }

  // Instance-level close request.
  if (message.type === 'close' && message.to) {
    this.teardown(source, message.to);
    return;
  }

  // Delivery to an existing instance. The payload must be present: without
  // this check a payload-less message threw a TypeError here instead of
  // being reported and dropped like other malformed messages.
  if (message.to && message.message && this.emits[source] &&
      this.emits[source][message.to]) {
    message.message.to = message.to;
    this.emits[source][message.to](message.message);
    return;
  }

  // Construction of a new instance addressed by message.to.
  if (message.to && message.message &&
      message.message.type === 'construct') {
    ctorTemplate = (this.definition.constructor &&
      this.definition.constructor.value) ?
        this.definition.constructor.value : [];
    args = Consumer.portableToMessage(ctorTemplate, message.message,
      this.debug);
    if (!this.ifaces[source]) {
      this.ifaces[source] = {};
      this.emits[source] = {};
    }
    // Placeholder so we can tell whether the instance tore itself down
    // while its constructor was running.
    this.ifaces[source][message.to] = true;
    created = this.getProvider(source, message.to, args);
    // don't save a reference to instance if it closed itself already.
    if (this.ifaces[source] && this.ifaces[source][message.to]) {
      this.ifaces[source][message.to] = created.instance;
      this.emits[source][message.to] = created.onmsg;
    }
    return;
  }

  this.debug.warn(this.toString() + ' dropping message ' +
    JSON.stringify(message));
};
/**
* Close / teardown the flow this provider terminates.
* @method close
*/
Provider.prototype.close = function () {
  var control = this.controlChannel;

  // Tell the controller we are going away, but only while we still hold
  // a control channel; afterwards forget it.
  if (control) {
    this.emit(control, {
      type: 'Provider Closing',
      request: 'close'
    });
    delete this.controlChannel;
  }

  // Notify local listeners.
  this.emit('close');

  // Drop every reference to the interface, the implementation and
  // the instances created from it.
  delete this.iface;
  delete this.providerCls;
  this.ifaces = {};
  this.emits = {};
  this.emitChannel = null;
};
/**
* Teardown a single instance of an object fulfilling this provider.
* @method teardown
* @param {String} source The consumer source of the instance.
* @param {String} id The id of the instance to tear down.
*/
Provider.prototype.teardown = function (source, id) {
  var handlers;

  // Nothing was ever constructed for this source: ignore the request.
  if (!this.ifaces[source]) {
    return;
  }

  // Forget the instance and its message handler before notifying anyone.
  delete this.ifaces[source][id];
  delete this.emits[source][id];

  // Fire the close callbacks registered for this instance, then drop them.
  handlers = this.closeHandlers[source] && this.closeHandlers[source][id];
  if (handlers) {
    util.eachProp(handlers, function (handler) {
      handler();
    });
    delete this.closeHandlers[source][id];
  }
};
/**
* Get an interface to expose externally representing this port.
* Providers are registered with the port using provideSynchronous,
* provideAsynchronous, or providePromises depending on the desired
* return interface.
* @method getInterface
* @return {Object} The external interface of this Provider.
*/
Provider.prototype.getInterface = function () {
  var port = this,
    registerAs;

  // The interface is built once and then reused.
  if (this.iface) {
    return this.iface;
  }

  // Build a registration function for one calling convention. The port is
  // captured by closure rather than through `this`: the validation used to
  // run in a plain function call, where `this` is not the port, so the
  // error text named the wrong object (or a TypeError was raised instead
  // of the intended Error when running in strict mode).
  registerAs = function (modeName) {
    return function (prov) {
      if (typeof prov !== "function") {
        throw new Error("Provider " + port.toString() +
          " needs to be implemented by a function.");
      }
      port.providerCls = prov;
      port.mode = Provider.mode[modeName];
    };
  };

  this.iface = {
    provideSynchronous: registerAs('synchronous'),
    provideAsynchronous: registerAs('asynchronous'),
    providePromises: registerAs('promises')
  };

  // Expose every constant of the definition as a read-only, frozen value.
  util.eachProp(this.definition, function (prop, name) {
    if (prop.type === "constant") {
      Object.defineProperty(port.iface, name, {
        value: Consumer.recursiveFreezeObject(prop.value),
        writable: false
      });
    }
  });

  return this.iface;
};
/**
* Create a function that can be used to get interfaces from this provider from
* a user-visible point.
* @method getProxyInterface
*/
Provider.prototype.getProxyInterface = function () {
  var port = this,
    forEachMatch,
    proxy;

  // Call visit(source, id) for every registered instance that is
  // identical to iface, across all sources.
  forEachMatch = function (iface, visit) {
    util.eachProp(port.ifaces, function (ids, source) {
      util.eachProp(ids, function (candidate, id) {
        if (candidate === iface) {
          visit(source, id);
          return true;
        }
      });
    });
  };

  // Calling the proxy itself hands out the provider's external interface.
  proxy = function () {
    return port.getInterface();
  };

  // close(iface) tears down one instance and notifies its consumer;
  // close() with no argument closes the whole channel.
  proxy.close = function (iface) {
    if (!iface) {
      // Close the channel.
      port.close();
      return;
    }
    forEachMatch(iface, function (source, id) {
      port.teardown(source, id);
      port.emit(port.channels[source], {
        type: 'close',
        to: id
      });
    });
  };

  // onClose(fn) listens for the channel closing; onClose(iface, fn)
  // registers fn to run when that particular instance is torn down.
  proxy.onClose = function (iface, handler) {
    // Listen to the channel directly.
    if (typeof iface === 'function' && handler === undefined) {
      port.once('close', iface);
      return;
    }
    forEachMatch(iface, function (source, id) {
      var bySource = port.closeHandlers[source],
        queue;
      if (!bySource) {
        bySource = port.closeHandlers[source] = {};
      }
      queue = bySource[id];
      if (!queue) {
        queue = bySource[id] = [];
      }
      queue.push(handler);
    });
  };

  return proxy;
};
/**
* Get a new instance of the registered provider.
* @method getProvider
* @param {String} source The port this instance is interacting with.
* @param {String} identifier the messageable address for this provider.
* @param {Array} args Constructor arguments for the provider.
* @return {Object} The new instance and, as onmsg, a function to send messages to it.
*/
Provider.prototype.getProvider = function (source, identifier, args) {
  // Without a registered implementation there is nothing to instantiate.
  if (!this.providerCls) {
    this.debug.error('Cannot instantiate provider, since it is not provided');
    return {instance: undefined, onmsg: undefined};
  }

  var port = this,
    events = {},
    hasOwn = Object.prototype.hasOwnProperty,
    dispatchEvent,
    refuse,
    BoundClass,
    instance;

  // Collect the events the definition declares; only those may be dispatched.
  util.eachProp(this.definition, function (prop, name) {
    if (prop.type === 'event') {
      events[name] = prop;
    }
  });

  // Handed to the implementation as its first constructor argument.
  // Undeclared event names are silently ignored.
  dispatchEvent = function (name, value) {
    var streams;
    if (!events[name]) {
      return;
    }
    streams = Consumer.messageToPortable(events[name].value, value,
      port.debug);
    port.emit(port.channels[source], {
      type: 'message',
      to: identifier,
      message: {
        name: name,
        type: 'event',
        text: streams.text,
        binary: streams.binary
      }
    });
  };

  // Log why a method call cannot be served and answer it with an error.
  refuse = function (msg, reason) {
    port.debug.warn(reason);
    port.emit(port.channels[source], {
      type: 'method',
      to: msg.to,
      message: {
        to: msg.to,
        type: 'method',
        reqId: msg.reqId,
        name: msg.type,
        error: reason
      }
    });
  };

  // this is all to say: new providerCls(dispatchEvent, args[0], args[1],...)
  BoundClass = this.providerCls.bind.apply(this.providerCls,
    [this.providerCls, dispatchEvent].concat(args || []));
  instance = new BoundClass();

  return {
    instance: instance,
    onmsg: function (msg) {
      var prop, debug, callArgs, ret, returnPromise;

      // Only method invocations are handled here.
      if (msg.action !== 'method') {
        return;
      }
      if (typeof instance[msg.type] !== 'function') {
        refuse(msg, "Provider does not implement " + msg.type + "()!");
        return;
      }

      // The method must also be declared by the definition itself.
      // Previously an undeclared method raised a TypeError on prop.value,
      // and inherited names such as 'constructor' were looked up through
      // the prototype chain and invoked although never declared.
      prop = hasOwn.call(port.definition, msg.type) ?
          port.definition[msg.type] : undefined;
      if (!prop) {
        refuse(msg, 'Provider definition does not declare ' +
          msg.type + '()!');
        return;
      }

      debug = port.debug;
      callArgs = Consumer.portableToMessage(prop.value, msg, debug);

      if (msg.reqId === null) {
        // Reckless call. Ignore return value.
        ret = function (resolve, reject) {
          if (reject) {
            debug.error(reject);
          }
        };
      } else {
        // Regular call: ship the result (or the error) back to the caller.
        ret = function (resolve, reject) {
          var streams = Consumer.messageToPortable(prop.ret, resolve,
            debug);
          port.emit(port.channels[source], {
            type: 'method',
            to: msg.to,
            message: {
              to: msg.to,
              type: 'method',
              reqId: msg.reqId,
              name: msg.type,
              text: streams.text,
              binary: streams.binary,
              error: reject
            }
          });
        };
      }

      if (!Array.isArray(callArgs)) {
        callArgs = [callArgs];
      }

      if (port.mode === Provider.mode.synchronous) {
        // The return value is the reply.
        try {
          ret(instance[msg.type].apply(instance, callArgs));
        } catch (e1) {
          ret(undefined, e1.message + ' ' + e1.stack);
        }
      } else if (port.mode === Provider.mode.asynchronous) {
        // The reply callback is appended as the last argument.
        try {
          instance[msg.type].apply(instance, callArgs.concat(ret));
        } catch (e2) {
          ret(undefined, e2.message + ' ' + e2.stack);
        }
      } else if (port.mode === Provider.mode.promises) {
        // The method must return a thenable; its settlement is the reply.
        try {
          returnPromise = instance[msg.type].apply(instance, callArgs);
          if (returnPromise && returnPromise.then) {
            returnPromise.then(ret, ret.bind({}, undefined));
          } else {
            ret(undefined, 'No promise returned from ' +
              msg.type + ': ' + returnPromise);
          }
        } catch (e3) {
          ret(undefined, e3.message + ' ' + e3.stack);
        }
      }
    }
  };
};
/**
* Get a textual description of this port.
* @method toString
* @return {String} the description of this port.
*/
Provider.prototype.toString = function () {
  // Named after the emit channel when one is set, otherwise flagged unbound.
  return this.emitChannel ?
      "[Provider " + this.emitChannel + "]" :
      "[unbound Provider]";
};
module.exports = Provider;