src/client/protocol/base.js
const { formatRequest, formatError } = require("../../util/format");
const { ERR_CODES, ERR_MSGS } = require("../../util/constants");
const MessageBuffer = require("../../util/buffer");
const logging = require("../../util/logger");
/**
* Creates an instance of the base client protocol class.
* This is the class that all other client protocols inherit from.
*/
class JsonRpcClientProtocol {
/**
* JsonRpcClientProtocol contructor
* @param {class} factory Instance of [JsonRpcClientFactory]{@link JsonRpcClientFactory}
* @param {(1|2)} version JSON-RPC version to make requests with
* @param {string} delimiter Delimiter to use for message buffer
* @property {class} factory Instance of [JsonRpcClientFactory]{@link JsonRpcClientFactory}
* @property {class} connector The socket instance for the client
* @property {(1|2)} version JSON-RPC version to use
* @property {string} delimiter Delimiter to use for message buffer
* @property {number} message_id Current message ID
* @property {number} serving_message_id Current message ID. Used for external functions to hook into
* @property {Object} pendingCalls Key value pairs for pending message IDs to promise resolve/reject objects
* @property {Object.<string|number, JSON>} responseQueue Key value pairs for outstanding message IDs to response object
* @property {Object} server Server host and port object {host: "x.x.x.x", port: xxxx}
* @property {class} messageBuffer Instance of [MessageBuffer]{@link MessageBuffer}
*
*/
constructor(factory, version, delimiter) {
if (!(this instanceof JsonRpcClientProtocol)) {
return new JsonRpcClientProtocol(factory, version, delimiter);
}
this.factory = factory;
this.connector = undefined;
this.listener = undefined;
this.delimiter = delimiter;
this.version = version;
this.message_id = 1;
this.serving_message_id = 1;
this.pendingCalls = {};
this.responseQueue = {};
this.server = this.factory.server;
this._connectionTimeout = undefined;
this.messageBuffer = new MessageBuffer(this.delimiter);
}
/**
* Set the `connector` attribute for the protocol instance.
* The connector is essentially the socket or connection instance for the client.
*
* @abstract
*
*/
setConnector() {
throw Error("function must be overwritten in subclass");
}
/**
* Make the connection to the server.
*
* Calls [setConnector]{@link JsonRpcClientProtocol#setConnector} to establish the client connection.
*
* Calls [listen]{@link JsonRpcClientProtocol#listen} if connection was successful, and will resolve the promise.
*
* Will retry connection on the `connectionTimeout` interval.
* Number of connection retries is based on `remainingRetries`.
*
* If `null` is set for number of retries, then connections will attempt indefinitely.
*
* Will reject the promise if connect or re-connect attempts fail and there are no remaining retries.
*
* @returns Promise
*
*/
connect() {
return new Promise((resolve, reject) => this._retryConnection(resolve, reject));
}
/**
*
* Manage the connection attempts for the client.
*
* @param {Promise.resolve} resolve `Promise.resolve` passed from [connect]{@link JsonRpcClientProtocol#connect}
* @param {Promise.reject} reject `Promise.reject` passed from [connect]{@link JsonRpcClientProtocol#connect}
*
* @returns Promise
*
* @private
*/
_retryConnection(resolve, reject) {
this.setConnector();
this.connector.connect(this.server);
this.connector.setEncoding("utf8");
this.connector.on("connect", () => {
this.factory.remainingRetries = this.factory.options.retries;
this.listener = this.connector;
this.listen();
resolve(this.server);
});
this.connector.on("error", error => this._onConnectionFailed(error, resolve, reject));
this.connector.on("close", (hadError) => {
if (!hadError) {
this.factory.emit("serverDisconnected");
}
});
}
/**
*
* Handle connection attempt errors. Log failures and
* retry if required.
*
*
* @param {Error} error `node.net` system error (https://nodejs.org/api/errors.html#errors_common_system_errors)
* @param {Promise.resolve} resolve `Promise.resolve` passed from [connect]{@link JsonRpcClientProtocol#connect}
* @param {Promise.reject} reject `Promise.reject` passed from [connect]{@link JsonRpcClientProtocol#connect}
*
* @returns Promise
*
* @private
*/
_onConnectionFailed(error, resolve, reject) {
if (this.factory.remainingRetries > 0) {
this.factory.remainingRetries -= 1;
logging
.getLogger()
.error(
`Failed to connect. Address [${this.server.host}:${this.server.port}]. Retrying. ${this.factory.remainingRetries} attempts left.`
);
} else if (this.factory.remainingRetries === 0) {
this.factory.pcolInstance = undefined;
return reject(error);
} else {
logging
.getLogger()
.error(
`Failed to connect. Address [${this.server.host}:${this.server.port}]. Retrying.`
);
}
this._connectionTimeout = setTimeout(() => {
this._retryConnection(resolve, reject);
}, this.factory.connectionTimeout);
}
/**
* Ends connection to the server.
*
* Sets `JsonRpcClientFactory.pcolInstance` to `undefined`
*
* Clears the connection timeout
*
* @param {function} cb Called when connection is sucessfully closed
*/
end(cb) {
clearTimeout(this._connectionTimeout);
this.factory.pcolInstance = undefined;
this.connector.end(cb);
}
/**
* Setup `this.listner.on("data")` event to listen for data coming into the client.
*
* Pushes received data into `messageBuffer` and calls
* [_waitForData]{@link JsonRpcClientProtocol#_waitForData}
*/
listen() {
this.listener.on("data", (data) => {
this.messageBuffer.push(data);
this._waitForData();
});
}
/**
* Accumulate data while [MessageBuffer.isFinished]{@link MessageBuffer#isFinished} is returning false.
*
* If the buffer returns a message it will be passed to [verifyData]{@link JsonRpcClientProtocol#verifyData}
*
* @private
*
*/
_waitForData() {
while (!this.messageBuffer.isFinished()) {
const message = this.messageBuffer.handleData();
try {
this.verifyData(message);
} catch (e) {
this.gotError(e);
}
}
}
/**
* Verify the incoming data returned from the `messageBuffer`
*
* Throw an error if its not a valid JSON-RPC object.
*
* Call [gotNotification]{@link JsonRpcClientProtocol#gotNotification} if the message a notification.
*
* Call [gotBatch]{@link JsonRpcClientProtocol#gotBatch} if the message is a batch request.
*
* @param {string} chunk Data to verify
*/
verifyData(chunk) {
try {
// will throw an error if not valid json
const message = JSON.parse(chunk);
if (Array.isArray(message)) {
this.gotBatch(message);
} else if (!(message === Object(message))) {
// error out if it cant be parsed
throw SyntaxError();
} else if (!("id" in message)) {
// no id, so assume notification
this.gotNotification(message);
} else if (message.error) {
// got an error back so reject the message
const { id } = message;
const { code, data } = message.error;
const errorMessage = message.error.message;
this._raiseError(errorMessage, code, id, data);
} else if ("result" in message) {
// Got a result, so must be a response
this.gotResponse(message);
} else {
const code = ERR_CODES.unknown;
const errorMessage = ERR_MSGS.unknown;
this._raiseError(errorMessage, code, null);
}
} catch (e) {
if (e instanceof SyntaxError) {
const code = ERR_CODES.parseError;
const errorMessage = `Unable to parse message: '${chunk}'`;
this._raiseError(errorMessage, code, null);
} else {
throw e;
}
}
}
/**
* Called when the received `message` is a notification.
* Emits an event using `message.method` as the name.
* The data passed to the event is the `message`.
*
* @param {JSON} message A valid JSON-RPC message object
*/
gotNotification(message) {
this.factory.emit(message.method, message);
}
/**
* Called when the received message is a batch.
*
* Calls [gotNotification]{@link JsonRpcClientProtocol#gotNotification} for every
* notification in the batch.
*
* Calls [gotBatchResponse]{@link JsonRpcClientProtocol#gotBatchResponse} otherwise.
*
* @param {JSON[]} message A valid JSON-RPC batch message
*/
gotBatch(message) {
// possible batch request
message.forEach((res) => {
if (res && res.method && !res.id) {
this.gotNotification(res);
}
});
this.gotBatchResponse(message);
}
/**
* Called when the received message is a response object from the server.
*
* Gets the response using [getResponse]{@link JsonRpcClientProtocol#getResponse}.
*
* Resolves the message and removes it from the `responseQueue`. Cleans up any
* outstanding timers.
*
* @param {JSON} message A valid JSON-RPC message object
*/
gotResponse(message) {
this.serving_message_id = message.id;
this.responseQueue[message.id] = message;
try {
const response = this.getResponse(message.id);
this.pendingCalls[message.id].resolve(response);
delete this.responseQueue[message.id];
this.factory.cleanUp(message.id);
} catch (e) {
if (e instanceof TypeError) {
// response id likely not in the queue
logging
.getLogger()
.error(
`Message has no outstanding calls: ${JSON.stringify(message)}`
);
}
}
}
/**
* Get the outstanding request object for the given ID.
*
* @param {string|number} id ID of outstanding request
*/
getResponse(id) {
return this.responseQueue[id];
}
/**
* Send a message to the server
*
* @param {string} request Stringified JSON-RPC message object
* @param {function=} cb Callback function to be called when message has been sent
*/
write(request, cb) {
this.connector.write(request, cb);
}
/**
* Generate a stringified JSON-RPC message object.
*
* @param {string} method Name of the method to use in the request
* @param {Array|JSON} params Params to send
* @param {boolean=} id If true it will use instances `message_id` for the request id, if false will generate a notification request
* @example
* client.message("hello", ["world"]) // returns {"jsonrpc": "2.0", "method": "hello", "params": ["world"], "id": 1}
* client.message("hello", ["world"], false) // returns {"jsonrpc": "2.0", "method": "hello", "params": ["world"]}
*/
message(method, params, id = true) {
const request = formatRequest({
method,
params,
id: id ? this.message_id : undefined,
version: this.version,
delimiter: this.delimiter
});
if (id) {
this.message_id += 1;
}
return request;
}
/**
* Send a notification to the server.
*
* Promise will resolve if the request was sucessfully sent, and reject if
* there was an error sending the request.
*
* @param {string} method Name of the method to use in the notification
* @param {Array|JSON} params Params to send
* @return Promise
* @example
* client.notify("hello", ["world"])
*/
notify(method, params) {
return new Promise((resolve, reject) => {
const request = this.message(method, params, false);
try {
this.write(request, () => {
resolve(request);
});
} catch (e) {
// this.connector is probably undefined
reject(e);
}
});
}
/**
* Send a request to the server
*
* Promise will resolve when a response has been received for the request.
*
* Promise will reject if the server responds with an error object, or if
* the response is not received within the set `requestTimeout`
*
* @param {string} method Name of the method to use in the request
* @param {Array|JSON} params Params to send
* @returns Promise
* @example
* client.send("hello", {"foo": "bar"})
*/
send(method, params) {
return new Promise((resolve, reject) => {
const request = this.message(method, params);
this.pendingCalls[JSON.parse(request).id] = { resolve, reject };
try {
this.write(request);
} catch (e) {
// this.connector is probably undefined
reject(e);
}
this._timeoutPendingCalls(JSON.parse(request).id);
});
}
/**
* Method used to call [message]{@link JsonRpcClientProtocol#message}, [notify]{@link JsonRpcClientProtocol#notify} and [send]{@link JsonRpcClientProtocol#send}
*
* @returns Object
* @example
* client.request().send("hello", ["world"])
* client.request().notify("foo")
* client.request().message("foo", ["bar"])
*/
request() {
const self = this;
return {
message: this.message.bind(self),
send: this.send.bind(self),
notify: this.notify.bind(self)
};
}
/**
* Used to send a batch request to the server.
*
* Recommend using [message]{@link JsonRpcClientProtocol#message} to construct the message objects.
*
* Will use the IDs for the requests in the batch in an array as the keys for `pendingCalls`.
*
* How a client should associate batch responses to their requests is not in the spec, so this is the solution.
*
* @param {Array} requests An array of valid JSON-RPC message objects
* @returns Promise
* @example
* client.batch([client.message("foo", ["bar"]), client.message("hello", [], false)])
*/
batch(requests) {
return new Promise((resolve, reject) => {
const batchIds = [];
const batchRequests = [];
for (const request of requests) {
const json = JSON.parse(request);
batchRequests.push(json);
if (json.id) {
batchIds.push(json.id);
}
}
this.pendingCalls[String(batchIds)] = { resolve, reject };
const request = JSON.stringify(batchRequests);
try {
this.write(request + this.delimiter);
} catch (e) {
// this.connector is probably undefined
reject(e);
}
this._timeoutPendingCalls(String(batchIds));
});
}
/**
* Associate the ids in the batch message to their corresponding `pendingCalls`.
*
* Will call [_resolveOrRejectBatch]{@link JsonRpcClientProtocol#_resolveOrRejectBatch} when object is determined.
*
* @param {Array} batch Array of valid JSON-RPC message objects
*/
gotBatchResponse(batch) {
const batchResponseIds = [];
batch.forEach((message) => {
if ("id" in message) {
batchResponseIds.push(message.id);
}
});
if (batchResponseIds.length === 0) {
// dont do anything here since its basically an invalid response
return;
}
// find the resolve and reject objects that match the batch request ids
for (const ids of Object.keys(this.pendingCalls)) {
const arrays = [JSON.parse(`[${ids}]`), batchResponseIds];
const difference = arrays.reduce((a, b) => a.filter(c => !b.includes(c)));
if (difference.length === 0) {
this.factory.cleanUp(ids);
this._resolveOrRejectBatch(batch, batchResponseIds);
}
}
}
/**
* Returns the batch response.
*
* Overwrite if class needs to reformat response in anyway (i.e. in [HttpClientProtocol]{@link HttpClientProtocol})
*
* @param {Array} batch Array of valid JSON-RPC message objects
*/
getBatchResponse(batch) {
return batch;
}
/**
* Will reject the request associated with the given ID with a JSON-RPC formated error object.
*
* Removes the id from `pendingCalls` and deletes outstanding timeouts.
*
* @param {string|number} id ID of the request to timeout
* @private
*/
_timeoutPendingCalls(id) {
this.factory.timeouts[id] = setTimeout(() => {
this.factory.cleanUp(id);
try {
const error = JSON.parse(
formatError({
jsonrpc: this.version,
delimiter: this.delimiter,
id: typeof id === "string" ? null : id,
code: ERR_CODES.timeout,
message: ERR_MSGS.timeout
})
);
this.pendingCalls[id].reject(error);
delete this.pendingCalls[id];
} catch (e) {
if (e instanceof TypeError) {
logging
.getLogger()
.error(`Message has no outstanding calls. ID [${id}]`);
}
}
}, this.factory.requestTimeout);
}
/**
* Resolve or reject the given batch request based on the given batch IDs.
*
* @param {Array} batch Valid JSON-RPC batch request
* @param {string} batchIds Stringified list of batch IDs associated with the given batch
* @private
*/
_resolveOrRejectBatch(batch, batchIds) {
const batchResponse = this.getBatchResponse(batch);
try {
const invalidBatches = [];
batch.forEach((message) => {
if (message.error) {
// reject the whole message if there are any errors
this.pendingCalls[batchIds].reject(batchResponse);
invalidBatches.push(batchIds);
}
});
if (invalidBatches.length !== 0) {
invalidBatches.forEach((id) => {
delete this.pendingCalls[id];
});
} else {
this.pendingCalls[batchIds].resolve(batchResponse);
delete this.pendingCalls[batchIds];
}
} catch (e) {
if (e instanceof TypeError) {
// no outstanding calls
logging
.getLogger()
.log(
`Batch response has no outstanding calls. Response IDs [${batchIds}]`
);
}
}
}
/**
* Throws an Error whos message is a JSON-RPC error object
*
* @param {string} message Error message
* @param {number} code Error code
* @param {string|number=} id ID for error message object
* @param {*=} data Optional data to include about the error
* @throws Error
* @private
*/
_raiseError(message, code, id, data) {
const error = formatError({
jsonrpc: this.version,
delimiter: this.delimiter,
id,
code,
message,
data
});
throw new Error(error);
}
/**
* Calls [rejectPendingCalls]{@link JsonRpcClientProtocol#rejectPendingCalls} with error object.
*
* If the object cannot be parsed, then an unkown error code is sent with a `null` id.
*
* @param {string} error Stringified JSON-RPC error object
*/
gotError(error) {
let err;
try {
err = JSON.parse(error.message);
} catch (e) {
err = JSON.parse(
formatError({
jsonrpc: this.version,
delimiter: this.delimiter,
id: null,
code: ERR_CODES.unknown,
message: JSON.stringify(error, Object.getOwnPropertyNames(error))
})
);
}
this.rejectPendingCalls(err);
}
/**
* Reject the pending call for the given ID is in the error object.
*
* If the error object has a null id, then log the message to the logging
.getLogger().
*
* @param {string} error Stringified JSON-RPC error object
*
*/
rejectPendingCalls(error) {
try {
this.pendingCalls[error.id].reject(error);
this.factory.cleanUp(error.id);
} catch (e) {
if (e instanceof TypeError) {
// error object id probably not a pending response
logging
.getLogger()
.error(`Message has no outstanding calls: ${JSON.stringify(error)}`);
}
}
}
}
module.exports = JsonRpcClientProtocol;