src/client/service/MQTTClient.ts
import { DataFrame, DataSerializer, Node, PullOptions, PushOptions, RemoteService, Service } from '@openhps/core';
import { Client, connect } from 'mqtt';
import { MQTTSinkNode, MQTTSourceNode } from '../nodes';
import { MQTTClientOptions } from './MQTTClientOptions';
export class MQTTClient extends RemoteService {
protected client: Client;
protected options: MQTTClientOptions;
constructor(options?: MQTTClientOptions) {
super();
this.options = {
qos: 0,
clientId: `CLIENT_${process.pid}_${Math.random().toString(16).substring(2, 8)}`,
...options,
};
this.once('build', this.connect.bind(this));
this.once('destroy', this.disconnect.bind(this));
}
protected connect(): Promise<void> {
return new Promise((resolve) => {
this.client = connect(this.options.url, this.options);
this.client.on('error', (error) => {
this.model.logger('error', { message: `Connection error: ${error.message}`, error });
this.client?.end();
});
this.client.on('reconnect', () => {
this.model.logger('warn', { message: `Reconnecting to MQTT server ...` });
});
this.client.on('message', this._onMessage.bind(this));
this.client.on('connect', () => {
resolve();
});
});
}
protected disconnect(): Promise<void> {
return new Promise((resolve) => {
this.client.end();
resolve();
});
}
/**
* Send a push to a specific remote node
*
* @param {string} uid Remote Node UID
* @param {DataFrame} frame Data frame to push
* @param {PushOptions} [options] Push options
* @returns {Promise<void>} Promise of completed push
*/
remotePush<T extends DataFrame | DataFrame[]>(uid: string, frame: T, options?: PushOptions): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.client.connected || this.client.disconnecting) {
return resolve(undefined);
}
const messageId = this.registerPromise(resolve, reject);
this.client.publish(
`${this.options.prefix}node/${uid}/push`,
JSON.stringify({
clientId: this.client.options.clientId,
messageId,
frame: DataSerializer.serialize(frame),
options,
}),
{
qos: this.options.qos,
retain: true,
},
);
});
}
/**
* Send a pull request to a specific remote node
*
* @param {string} uid Remote Node UID
* @param {PullOptions} [options] Pull options
* @returns {Promise<void>} Promise of completed pull
*/
remotePull(uid: string, options?: PullOptions): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.client.connected || this.client.disconnecting) {
return resolve(undefined);
}
const messageId = this.registerPromise(resolve, reject);
this.client.publish(
`${this.options.prefix}node/${uid}/pull`,
JSON.stringify({
clientId: this.client.options.clientId,
messageId,
options,
}),
{
qos: this.options.qos,
retain: true,
},
);
});
}
/**
* Send an error to a remote node
*
* @param {string} uid Remote Node UID
* @param {string} event Event name
* @param {any[]} [args] Args
* @returns {Promise<void>} Promise of emitted event
*/
remoteEvent(uid: string, event: string, ...args: any[]): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.client.connected || this.client.disconnecting) {
return resolve(undefined);
}
const messageId = this.registerPromise(resolve, reject);
this.client.publish(
`${this.options.prefix}node/${uid}/events/${event}`,
JSON.stringify({
clientId: this.client.options.clientId,
messageId,
args,
}),
{
qos: this.options.qos,
retain: true,
},
);
});
}
/**
* Send a remote service call
*
* @param {string} uid Service uid
* @param {string} method Method to call
* @param {any[]} [args] Optional set of arguments
* @returns {Promise<any>} Service call output promise
*/
remoteServiceCall(uid: string, method: string, ...args: any[]): Promise<any> {
return new Promise((resolve, reject) => {
if (!this.client.connected || this.client.disconnecting) {
return resolve(undefined);
}
const messageId = this.registerPromise(resolve, reject);
this.client.publish(
`${this.options.prefix}service/${uid}/${method}`,
JSON.stringify({
clientId: this.client.options.clientId,
messageId,
args,
}),
{
qos: this.options.qos,
retain: true,
},
);
});
}
private _onMessage(topic: string, payload: Buffer): void {
const topicParts = topic.replace(this.options.prefix , '').split('/');
const type = topicParts[0];
const uid = topicParts[1];
const action = topicParts[2];
const response = topicParts[topicParts.length - 1] === 'response';
const data: any = JSON.parse(payload.toString());
// Check if message send by self
if (data.clientId === this.client.options.clientId) {
return;
}
if (response) {
const promise = this.getPromise(data.messageId);
if (!promise) {
return;
} else if (data.status === 'ok') {
promise.resolve(data.result);
} else if (data.status === 'error') {
promise.reject(data.error);
}
return;
}
switch (type) {
case 'node':
switch (action) {
case 'push':
Promise.resolve(this.localPush(uid, data.frame, data.options))
.then(() => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'ok',
}),
);
})
.catch((ex) => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'error',
error: ex,
}),
);
});
break;
case 'pull':
Promise.resolve(this.localPull(uid, data.options))
.then(() => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'ok',
}),
);
})
.catch((ex) => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'error',
error: ex,
}),
);
});
break;
case 'events':
Promise.resolve(this.localEvent(uid, topicParts[3], data))
.then((result: any) => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'ok',
result,
}),
);
})
.catch((ex) => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'error',
error: ex,
}),
);
});
break;
}
break;
case 'service':
Promise.resolve(this.localServiceCall(uid, action, ...data))
.then((result: any) => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'ok',
result,
}),
);
})
.catch((ex) => {
this.client.publish(
topic + '/response',
JSON.stringify({
clientId: this.client.options.clientId,
messageId: data.messageId,
status: 'error',
error: ex,
}),
);
});
break;
}
}
/**
* Register a remote client node
*
* @param {MQTTSinkNode<any> | MQTTSourceNode<any>} node Node to register
* @returns {boolean} Registration success
*/
public registerNode(node: MQTTSinkNode<any> | MQTTSourceNode<any>): this {
// Subscribe to all endpoints for the node
this.client.subscribe(`${this.options.prefix}node/${node.uid}/push`);
this.client.subscribe(`${this.options.prefix}node/${node.uid}/pull`);
this.client.subscribe(`${this.options.prefix}node/${node.uid}/events/completed`);
this.client.subscribe(`${this.options.prefix}node/${node.uid}/events/error`);
this.client.subscribe(`${this.options.prefix}node/${node.uid}/push/response`);
this.client.subscribe(`${this.options.prefix}node/${node.uid}/pull/response`);
this.client.subscribe(`${this.options.prefix}node/${node.uid}/events/completed/response`);
this.client.subscribe(`${this.options.prefix}node/${node.uid}/events/error/response`);
return super.registerNode(node);
}
/**
* Register a remote client service
*
* @param {Service} service Service to register
* @returns {boolean} Registration success
*/
public registerService(service: Service): this {
// Subscribe to all actions for the service
this.client.subscribe(`${this.options.prefix}service/${service.uid}/*`);
this.client.subscribe(`${this.options.prefix}service/${service.uid}/*/response`);
return super.registerService(service);
}
}