src/consumer.js
/*globals Blob, ArrayBuffer, DataView */
/*jslint indent:2, node:true, sloppy:true */
var util = require('./util');
/**
 * A freedom port for a user-accessible api.
 *
 * Each consumer gets its own id and keeps four registries keyed by interface
 * id: the live interface objects, their message binders, and the per-instance
 * close / error handlers.
 * @class Consumer
 * @implements Port
 * @uses handleEvents
 * @param {Object} interfaceCls The api interface exposed by this consumer.
 * @param {Debug} debug The debugger to use for logging.
 * @constructor
 */
var Consumer = function (interfaceCls, debug) {
  this.id = Consumer.nextId();
  this.interfaceCls = interfaceCls;
  this.debug = debug;
  // Other methods call this.emit / this.once / this.off; presumably those
  // are attached here by the util module.
  util.handleEvents(this);

  // Every registry starts out as its own empty map.
  ['ifaces', 'closeHandlers', 'errorHandlers', 'emits'].forEach(function (registry) {
    this[registry] = {};
  }, this);
};
/**
 * Receive incoming messages for this consumer.
 *
 * Messages from the 'control' source announce the outgoing channel, record
 * the control channel, or close the port. Any other message either binds the
 * outgoing channel (when none is known yet), tears down / reports an error
 * for one interface, or is delivered to the addressed interface - or to all
 * interfaces when it carries no `to`.
 * @method onMessage
 * @param {String} source The source of the message.
 * @param {Object} message The received message.
 */
Consumer.prototype.onMessage = function (source, message) {
  var fromControl = (source === 'control'),
    target,
    payload;

  if (fromControl && message.reverse) {
    this.emitChannel = message.channel;
    this.emit(this.emitChannel, {
      type: 'channel announcement',
      channel: message.reverse
    });
    this.emit('start');
    return;
  }
  if (fromControl && message.type === 'setup') {
    this.controlChannel = message.channel;
    return;
  }
  if (fromControl && message.type === 'close') {
    delete this.controlChannel;
    this.doClose();
    return;
  }

  // The first message naming a channel binds it; that message is consumed.
  if (!this.emitChannel && message.channel) {
    this.emitChannel = message.channel;
    this.emit('start');
    return;
  }

  target = message.to;
  if (message.type === 'close' && target) {
    this.teardown(target);
    return;
  }
  if (message.type === 'error') {
    this.error(target, message.message);
    return;
  }

  payload = message.message;
  if (!target) {
    // Unaddressed messages are broadcast to every registered interface.
    util.eachProp(this.emits, function (deliver) {
      deliver('message', payload);
    });
    return;
  }
  if (this.emits[target]) {
    this.emits[target]('message', payload);
  } else {
    this.debug.warn('Could not deliver message, no such interface: ' + target);
  }
};
/**
 * Create a consumer.Interface associated with this consumer.
 *
 * The constructor comes from getInterfaceConstructor, which already supplies
 * its leading arguments. Any arguments given to this method are forwarded as
 * additional constructor arguments after those.
 * @method getInterface
 * @return {Object} A newly constructed interface instance.
 */
Consumer.prototype.getInterface = function () {
  var Ctor = this.getInterfaceConstructor(),
    ctorArgs = Array.prototype.slice.call(arguments);
  if (ctorArgs.length > 0) {
    // bind() takes the this-value first (ignored once `new` is used); the
    // remaining entries become leading constructor arguments.
    ctorArgs.unshift(Ctor);
    Ctor = Ctor.bind.apply(Ctor, ctorArgs);
  }
  return new Ctor();
};
/**
 * Build an 'onEvent' registration function for the given event name.
 *
 * The returned function has two calling forms:
 *   - (callback): listen once for the event on the channel itself.
 *   - (instance, handler): add a handler for one specific interface instance;
 *     it is stored under that instance's id in this[name + 'Handlers'].
 * @method getListener
 * @param {String} name The event to listen to.
 * @return {Function} The registration function.
 * @private
 */
Consumer.prototype.getListener = function (name) {
  var self = this,
    registryName = name + 'Handlers';
  return function (instance, handler) {
    // A lone function argument means "listen to the channel directly".
    if (typeof instance === 'function' && handler === undefined) {
      self.once(name, instance);
      return;
    }
    // Otherwise locate the id of the given instance and record the handler.
    util.eachProp(self.ifaces, function (candidate, id) {
      var registry;
      if (candidate !== instance) {
        return;
      }
      registry = self[registryName];
      if (registry[id]) {
        registry[id].push(handler);
      } else {
        registry[id] = [handler];
      }
      return true;
    });
  };
};
/**
 * Create the user-visible function for obtaining interfaces from this api
 * consumer.
 *
 * Calling the returned function forwards all of its arguments to
 * getInterface. It also carries:
 *   - close(iface): tear down that one interface and announce the close on
 *     the channel; with no argument, close the whole channel.
 *   - onClose / onError: registration functions from getListener.
 * @method getProxyInterface
 * @return {Function} The proxy function.
 */
Consumer.prototype.getProxyInterface = function () {
  var consumer = this,
    proxy = function () {
      return consumer.getInterface.apply(consumer, arguments);
    };

  proxy.close = function (iface) {
    if (!iface) {
      // Close the channel.
      consumer.doClose();
      return;
    }
    util.eachProp(consumer.ifaces, function (candidate, id) {
      if (candidate !== iface) {
        return;
      }
      consumer.teardown(id);
      consumer.emit(consumer.emitChannel, {
        type: 'close',
        to: id
      });
      return true;
    });
  };
  proxy.onClose = this.getListener('close');
  proxy.onError = this.getListener('error');
  return proxy;
};
/**
 * Provides a bound class for creating a consumer.Interface associated
 * with this api. This partial level of construction can be used
 * to allow the consumer to be used as a provider for another API.
 *
 * A fresh interface id is allocated on every call. The returned constructor
 * is this.interfaceCls with three leading arguments already supplied:
 *   1. a registration function (obj, binder) that records the interface
 *      object and its message binder under the new id,
 *   2. this consumer's doEmit, pre-bound to the new id,
 *   3. this consumer's debugger.
 * @method getInterfaceConstructor
 * @return {Function} The partially applied interface constructor.
 * @private
 */
Consumer.prototype.getInterfaceConstructor = function () {
  var consumer = this,
    ifaceId = Consumer.nextId(),
    register = function (obj, binder) {
      consumer.ifaces[ifaceId] = obj;
      consumer.emits[ifaceId] = binder;
    },
    emitter = this.doEmit.bind(this, ifaceId);
  // bind (rather than a wrapper function) keeps the result usable with `new`
  // and with further partial application.
  return this.interfaceCls.bind({}, register, emitter, this.debug);
};
/**
 * Emit a message on the channel once setup is complete.
 *
 * While no outgoing channel is known, the send is deferred until the 'start'
 * event fires.
 * @method doEmit
 * @private
 * @param {String} to The ID of the flow sending the message.
 * @param {Object} msg The message to emit
 * @param {Boolean} all Send message to all recipients; when truthy the
 *     emitted `to` field is false instead of the given id.
 */
Consumer.prototype.doEmit = function (to, msg, all) {
  var recipient = all ? false : to;
  if (!this.emitChannel) {
    // The broadcast decision is already folded into `recipient`, so the
    // deferred call does not need the `all` flag again.
    this.once('start', this.doEmit.bind(this, recipient, msg));
    return;
  }
  this.emit(this.emitChannel, {to: recipient, type: 'message', message: msg});
};
/**
 * Teardown a single interface of this api.
 *
 * The interface's binder (if any) is told 'close', then every close handler
 * registered for the id is invoked, and finally all bookkeeping for the id
 * is dropped.
 * @method teardown
 * @param {String} id The id of the interface to tear down.
 */
Consumer.prototype.teardown = function (id) {
  var listeners;

  if (this.emits[id]) {
    this.emits[id]('close');
  }
  delete this.emits[id];

  listeners = this.closeHandlers[id];
  if (listeners) {
    // The wrapper deliberately returns nothing, whatever the handler returns.
    util.eachProp(listeners, function (listener) {
      listener();
    });
  }

  delete this.ifaces[id];
  delete this.closeHandlers[id];
  delete this.errorHandlers[id];
};
/**
 * Handle a message error reported to this api.
 *
 * Without an id the error is emitted as an 'error' event on the consumer.
 * With an id it goes to the error handlers registered for that interface;
 * if there are none, nothing happens.
 * @method error
 * @param {String?} id The id of the interface where the error occurred.
 * @param {Object} message The message which failed, if relevant.
 */
Consumer.prototype.error = function (id, message) {
  if (!id) {
    this.emit('error', message);
    return;
  }
  if (this.errorHandlers[id]) {
    util.eachProp(this.errorHandlers[id], function (handler) {
      handler(message);
    });
  }
};
/**
 * Close / teardown the flow this api terminates.
 *
 * In order: announce the close on the control channel (when one is set),
 * tear down every interface that has a registered binder, emit 'close',
 * call this.off(), and forget the outgoing channel.
 * @method doClose
 */
Consumer.prototype.doClose = function () {
  var consumer = this;

  if (this.controlChannel) {
    this.emit(this.controlChannel, {
      type: 'Channel Closing',
      request: 'close'
    });
  }

  util.eachProp(this.emits, function (binder, id) {
    consumer.teardown(id);
  });

  this.emit('close');
  this.off();
  this.emitChannel = null;
};
/**
 * Get the textual description of this port.
 * @method toString
 * @return {String} "[Consumer <channel>]" once an outgoing channel is set,
 *     otherwise "[unbound Consumer]".
 */
Consumer.prototype.toString = function () {
  return this.emitChannel ?
      "[Consumer " + this.emitChannel + "]" :
      "[unbound Consumer]";
};
/**
 * Get the next ID for an api channel.
 *
 * The counter lives on Consumer.id and is seeded with 1 on first use, so the
 * first id handed out is 2.
 * @method nextId
 * @static
 * @private
 * @return {Number} The newly allocated id.
 */
Consumer.nextId = function () {
  Consumer.id = (Consumer.id || 1) + 1;
  return Consumer.id;
};
/**
 * Convert a structured data structure into a message stream conforming to
 * a template and an array of binary data elements.
 *
 * The binary array is filled by Consumer.conform while it builds the text
 * part.
 * @static
 * @method messageToPortable
 * @param {Object} template The template to conform to
 * @param {Object} value The instance of the data structure to conform
 * @param {Debug} debug A debugger for errors.
 * @return {{text: Object, binary: Array}} Separated data streams.
 */
Consumer.messageToPortable = function (template, value, debug) {
  var binary = [];
  return {
    text: Consumer.conform(template, value, binary, true, debug),
    binary: binary
  };
};
/**
 * Convert Structured Data streams into a data structure conforming to a
 * template. This is the combining counterpart of messageToPortable: the
 * text stream is conformed with the binary stream as its externals.
 * @static
 * @method portableToMessage
 * @param {Object} template The template to conform to
 * @param {{text: Object, binary: Array}} streams The streams to conform
 * @param {Debug} debug A debugger for errors.
 * @return {Object} The data structure matching the template.
 */
Consumer.portableToMessage = function (template, streams, debug) {
  var text = streams.text,
    binary = streams.binary;
  return Consumer.conform(template, text, binary, false, debug);
};
/**
 * Force a collection of values to look like the types and length of an API
 * template.
 *
 * Templates are either a type name ('string', 'number', 'boolean', 'object',
 * 'blob', 'buffer', 'proxy'), an array (['array', T] for a homogeneous list
 * of any length, otherwise a fixed-length tuple), or an object mapping
 * property names to nested templates.
 *
 * Binary values ('blob' / 'buffer') live in `externals`: when separating
 * they are appended there and replaced by their index; when combining the
 * index is replaced by the stored element.
 * @static
 * @method conform
 * @param {Object} template The template to conform to
 * @param {Object} from The value to conform
 * @param {Array} externals Listing of binary elements in the template
 * @param {Boolean} separate Whether to separate or combine streams.
 * @param {Debug} debug A debugger for errors.
 * @return {*} The conformed value. undefined / null inputs are returned
 *     as-is; undefined is also returned when the value is discarded.
 */
Consumer.conform = function (template, from, externals, separate, debug) {
  var conformed, idx;

  // Values that are rejected or passed through whatever the template says.
  if (typeof from === 'function' && template !== 'proxy') {
    debug.error("Message discarded as functions can't cross modules!");
    return undefined;
  }
  if (typeof from === 'undefined') {
    return undefined;
  }
  if (from === null) {
    return null;
  }
  if (template === undefined) {
    debug.error("Message discarded for not matching declared type!", from);
    return undefined;
  }

  // Templates named by a string.
  if (template === 'string') {
    return '' + from;
  }
  if (template === 'number') {
    return 1 * from;
  }
  if (template === 'boolean') {
    return from === true;
  }
  if (template === 'object') {
    // TODO(willscott): Allow removal if sandboxing enforces this.
    return JSON.parse(JSON.stringify(from));
  }
  if (template === 'blob') {
    if (!separate) {
      return externals[from];
    }
    if (from instanceof Blob) {
      externals.push(from);
    } else {
      // Keep the slot occupied with an empty Blob so the index stays valid.
      debug.error('conform expecting Blob, but saw ' + (typeof from));
      externals.push(new Blob([]));
    }
    return externals.length - 1;
  }
  if (template === 'buffer') {
    if (!separate) {
      return Consumer.makeArrayBuffer(externals[from], debug);
    }
    externals.push(Consumer.makeArrayBuffer(from, debug));
    return externals.length - 1;
  }
  if (template === 'proxy') {
    return from;
  }

  // Structured templates.
  if (Array.isArray(template)) {
    conformed = [];
    if (template.length === 2 && template[0] === 'array') {
      // Homogeneous list: the value decides the length.
      for (idx = 0; idx < from.length; idx += 1) {
        conformed.push(Consumer.conform(template[1], from[idx], externals,
            separate, debug));
      }
    } else {
      // Tuple: the template decides the length; gaps stay undefined.
      for (idx = 0; idx < template.length; idx += 1) {
        conformed.push(from[idx] === undefined ? undefined :
            Consumer.conform(template[idx], from[idx], externals,
              separate, debug));
      }
    }
    return conformed;
  }
  if (typeof template === 'object') {
    conformed = {};
    // Only properties named by the template and present on the value survive.
    util.eachProp(template, function (subTemplate, key) {
      if (from[key] !== undefined) {
        conformed[key] = Consumer.conform(subTemplate, from[key], externals,
          separate, debug);
      }
    });
    return conformed;
  }

  debug.error('Unknown template provided: ' + template);
};
/**
 * Make a thing into an Array Buffer.
 *
 * Falsy input yields an empty buffer and a real ArrayBuffer is returned
 * unchanged. Anything else that is not recognised as a buffer is reported
 * through `debug.error` and replaced by an empty buffer; building that
 * report never throws, even for values without a constructor or values
 * that JSON cannot serialise.
 * @static
 * @method makeArrayBuffer
 * @param {Object} thing
 * @param {Debug} debug A debugger in case of errors.
 * @return {ArrayBuffer} An Array Buffer
 */
Consumer.makeArrayBuffer = function (thing, debug) {
  var description;

  if (!thing) {
    return new ArrayBuffer(0);
  }
  if (thing instanceof ArrayBuffer) {
    return thing;
  }
  // Objects created with a null prototype have no constructor at all, so
  // check for one before reading its name.
  if (thing.constructor && thing.constructor.name === "ArrayBuffer" &&
      typeof thing.prototype === "undefined") {
    // Workaround for webkit origin ownership issue.
    // https://github.com/UWNetworksLab/freedom/issues/28
    return new DataView(thing).buffer;
  }

  // JSON.stringify throws on circular structures and BigInt values; fall
  // back to the generic tag so the problem is logged rather than rethrown.
  try {
    description = JSON.stringify(thing);
  } catch (e) {
    description = Object.prototype.toString.call(thing);
  }
  debug.error('expecting ArrayBuffer, but saw ' +
      (typeof thing) + ': ' + description);
  return new ArrayBuffer(0);
};
/**
 * Recursively traverse a [nested] object and freeze its keys from being
 * writable. Note, the result can have new keys added to it, but existing ones
 * cannot be overwritten.
 *
 * Non-object values, including null, are returned unchanged. Every other
 * value is rebuilt as a new plain object holding read-only copies of its
 * own enumerable properties. Arrays and other collections get no special
 * handling: they are rebuilt the same way, as a plain object keyed by index.
 *
 * @method recursiveFreezeObject
 * @static
 * @param {Object} obj - object to be frozen
 * @return {Object} A new object with non-writable keys, or `obj` itself when
 *     it is not an object.
 **/
Consumer.recursiveFreezeObject = function (obj) {
  var k, ret;
  // typeof null is 'object', so null needs its own check; otherwise it would
  // come back as an empty object.
  if (obj === null || typeof obj !== 'object') {
    return obj;
  }
  ret = {};
  for (k in obj) {
    // Call through Object.prototype so objects without a prototype work too.
    if (Object.prototype.hasOwnProperty.call(obj, k)) {
      Object.defineProperty(ret, k, {
        value: Consumer.recursiveFreezeObject(obj[k]),
        writable: false,
        enumerable: true
      });
    }
  }
  return ret;
};
module.exports = Consumer;