lib/connection.js
"use strict";
exports.Connection = function(backend, minLatency, maxLatency) {
var connection = this
, db = 0
, queue, watch, block
, timeout = 0
, state = NORMAL
, subs = 0;
this.push = function(client, command, args, callback) {
state(client, prep(command, args, callback));
};
// Push a command to a normal connection.
function NORMAL(client, entry) {
var i, n, matches;
// Transactions.
if (entry.command === "WATCH") {
entry.override = function() {
var i, n = entry.args.length;
if (!watch)
watch = {};
for (i = 0; i < n; i++)
if (!(entry.args[i]in watch))
watch[entry.args[i]] = backend.getRevision(entry.args[i]);
return "OK";
};
}
else if (entry.command === "UNWATCH") {
entry.override = function() {
watch = null;
return "OK";
};
}
else if (entry.command === "DISCARD") {
if (queue) {
if (!timeout)
timeout = setTimeout(exec, randLat());
for (i = 0; i < queue.length; i++)
if (queue[i].command === "MULTI") {
queue.splice(i, queue.length);
// This will substitute the DISCARD command with an UNWATCH,
// hence the recursive call to this.push.
return this.push(["UNWATCH"], entry.callback);
}
}
entry.override = function() { return "OK"; };
}
else if (entry.command === "MULTI") {
/*jshint loopfunc:true*/
entry.override = function(queue) {
if (!queue) throw new Error('WOOT! no queue.');
var w = watch, key, entry;
watch = null;
if (w) for (key in w)
if (backend.getRevision(key) !== w[key]) {
// Abort because of a change in the watched keyspace.
n = 0;
while ((entry = queue.shift())) {
if (entry.command === "EXEC") {
entry.override = function() {
var i, out = [];
for (i = 0; i < n; i++)
out[i] = null;
return out;
};
queue.unshift(entry);
break;
}
n++;
}
return "OK";
}
var replies = [];
var i, m = queue.length, cb = pushReply.bind(replies);
for (i = 0; i < m; i++) {
entry = queue[i];
if (entry.command !== "EXEC") {
// Collect replies for the EXEC output.
entry.callback = cb;
// Prevent blocking within a transaction.
delete entry.block;
}
else {
// Exec calls back with the entire reply list.
entry.override = entry.override.bind(replies);
return "OK";
}
}
throw new Error("WOOT! Can't find the EXEC command in the queue.");
};
// Prevent flushing before the exec.
if (timeout) {
clearTimeout(timeout);
timeout = 0;
}
if (queue)
queue.push(entry);
else
queue = [entry];
return;
}
else if (entry.command === "EXEC") {
entry.override = function() {
return this.join ? this : null;
};
if (queue && !timeout)
timeout = setTimeout(exec, randLat());
}
// Pubsub.
if ((matches = /^(P)?(UN)?SUBSCRIBE$/.exec(entry.command))) {
if (!client.$PUSHDELAY)
client.$PUSHDELAY = new Delay(client, 'pushMessage', minLatency);
entry.override = function() {
var i, n = entry.args.length;
if (n) for (i = 0; i < n; i++) {
// Unsubscribe.
if (matches[2])
subs = backend.unsub(matches[1]? true : false, entry.args[i], client.$PUSHDELAY);
// Subscribe.
else
subs = backend.sub(matches[1]? true : false, entry.args[i], client.$PUSHDELAY);
}
else if (matches[2]) {
// Unsubscribe from all.
subs = backend.unsub(matches[1]? true : false, null, client.$PUSHDELAY);
}
else
return new Error('Wrong number of arguments for \'' + matches[0] + '\' command');
if (!subs)
state = NORMAL;
return "OK";
};
if (!matches[2])
state = SUBSCRIBED;
}
// Connection.
if (entry.command === 'QUIT') {
entry.override = function() {
if (client.$PUSHDELAY) {
// Unsubscribe.
backend.unsub(true, null, client.$PUSHDELAY);
backend.unsub(false, null, client.$PUSHDELAY);
}
return "OK";
};
state = CLOSED;
}
else if (entry.command === 'SELECT') {
entry.override = function() {
var n = entry.args.length;
if (n !== 1)
return new Error("Wrong number of arguments for 'SELECT' command.");
var id = Number(entry.args[0]);
if ((!id && id !== 0) || id % 1 !== 0 || id < 0)
return new Error("invalid DB index");
db = id;
backend.selectDB(db);
return "OK";
};
}
// Regular commands.
if (queue)
queue.push(entry);
else {
queue = [entry];
timeout = setTimeout(exec, randLat());
}
}
// Push a command to a subscribed connection.
function SUBSCRIBED(client, entry) {
// Allow commands that modify the subscription set.
if (/SUBSCRIBE|^QUIT/.test(entry.command))
NORMAL(client, entry);
else
throw new Error("fakeredis: Connection is in pub/sub mode (" + subs + " subscriptions).");
}
// Closed connection.
function CLOSED(client, entry) {
throw new Error("fakeredis: You've closed this connection with QUIT, cannot " + entry.command);
}
// Blocked connection.
function BLOCKED(client, entry) {
if (!block)
block = [client, entry];
else
block.push(client, entry);
}
// Execute everything in the queue sequentially.
function exec() {
timeout = 0;
var q = queue, entry, func, out, err, data, resp = [];
queue = null;
if (connection.verbose)
console.log('\n');
backend.selectDB(db);
if (q) while ((entry = q.shift())) {
if (entry === 'SKIP')
continue;
func = backend[entry.command];
out = null;
if (connection.verbose)
console.log("fakeredis>", entry.command, entry.args.join(' '));
if (entry.override) {
out = entry.override(q);
err = out instanceof Error ? out : null;
data = out instanceof Error ? null : out;
}
else if (!func || typeof func !== 'function')
throw new Error('WOOT! Wierd queue entry : ' + JSON.stringify(entry) + ' / ' + JSON.stringify(q));
else if (func.length && func.length !== entry.args.length) {
err = new Error('Wrong number of arguments for \'' + entry.command.toLowerCase() + '\' command');
data = null;
}
else {
out = func.apply(backend, entry.args);
err = ((out && out.getError) || null) && new Error(out.getError());
data = err ? null : (out && out.getStatus && out.getStatus()) || out;
// Block if necessary.
if (entry.block && err === null && data === null) {
if (resp.length)
flush(resp);
q.unshift(entry);
queue = q;
state = BLOCKED;
backend.sub(false, backend.UPDATE, connection);
if (entry.block && typeof entry.block === 'number')
setTimeout(unblock.bind(null, entry), entry.block * 1000);
return;
}
}
if (!err && !data && typeof out === "undefined")
throw new Error("WOOT! Backend returned undefined.");
if (out && out.rev)
throw new Error("WOOT! Returning the whole keyspace entry.");
if (data === true)
throw new Error("TRUE THAT! " + JSON.stringify(entry));
data = fdata(data);
if (entry.callback)
resp.push(entry.callback.bind(null, err, data));
}
if (connection.verbose)
console.log('\n');
if (resp.length)
flush(resp);
}
function flush(resp) {
setTimeout(
function() {
var i, n;
n = resp.length;
for (i = 0; i < n; i++)
resp[i]();
},
minLatency
);
}
function unblock(entry) {
if (entry)
delete entry.block;
state = NORMAL;
exec();
if (state === NORMAL) {
backend.unsub(false, backend.UPDATE, connection);
var a = block, i, n = a && a.length;
block = null;
for (i = 0; i < n; i += 2)
NORMAL(a[i], a[i + 1]);
}
}
this.pushMessage = function(/* type, channel, message */) {
// Attempt to unblock on backend keyspace change.
unblock();
};
// Format data the way it comes out of node_redis.
function fdata(data) {
if (typeof data !== 'object' && typeof data !== 'number' && typeof data !== 'string')
throw new Error('WOOT! Data is not an object/string/number : ' + data);
if (data) {
if (typeof data === 'string' && !isNaN(data))
data = Number(data);
else if (data.length && data.map)
data = data.map(finnerdata);
else if (typeof data === 'object' && !data.map)
throw new Error('WOOT! Illegal object in data : ' + data);
}
return data;
}
function finnerdata(data) {
if (typeof data !== 'object' && typeof data !== 'number' && typeof data !== 'string')
throw new Error('WOOT! Data is not an object/string/number : ' + data);
if (data || data === 0) {
if (typeof data === 'number')
data = String(data);
else if (data.length && data.map)
data = data.map(finnerdata);
else if (typeof data === 'object' && !data.map)
throw new Error('WOOT! Illegal object in data : ' + data);
}
return data;
}
// Prepare command.
function prep(command, args, callback) {
args = args.map(function(arg) { return String(arg); });
command = command.toUpperCase();
var block = false;
if (/^B[LR]POP/.test(command) && args.length) // Backend will validate the timeout param more robustly.
block = parseInt(args[args.length - 1], 10) || true;
if (!backend[command])
throw new Error("fakeredis: " + command + " is not implemented in fakeredis. Let me know if you need it.");
return { command: command, args: args, callback: callback, block: block };
}
// Helper to push replies onto the replies list.
function pushReply(err, data) {
/*jshint validthis:true*/
this.push(err ||fdata(data));
}
// Immitate latency.
minLatency = Math.ceil(minLatency || 15);
maxLatency = Math.ceil(maxLatency || minLatency * 3);
if (maxLatency < minLatency || minLatency < 0)
throw new Error("Bad min/max latency settings.");
function randLat() {
return Math.ceil((maxLatency - minLatency)* Math.random() + minLatency);
}
};
function Delay(object, method, delay) {
var queue
, flush;
this[method] = function() {
if (!queue) {
queue = [arguments];
setTimeout(flush, delay);
}
else
queue.push(arguments);
};
flush = function() {
var q = queue, i, n = q.length;
queue = null;
for (i = 0; i < n; i++)
object[method].apply(object, q[i]);
};
}