/*
 * danielwippermann/resol-vbus
 *
 * View on GitHub: src/tcp-connection-endpoint.js
 *
 * Summary
 *   Maintainability: F (3 days)
 *   Test Coverage: (no value captured)
 */
/*! resol-vbus | Copyright (c) 2013-present, Daniel Wippermann | MIT license */

const { EventEmitter } = require('events');
const net = require('net');


const { applyDefaultOptions } = require('./utils');



/**
 * Calls `fn` immediately and reports its outcome through a promise.
 *
 * A value returned by `fn` (or the settlement of a promise it returns)
 * settles the resulting promise; an exception thrown synchronously by `fn`
 * becomes a rejection instead of propagating to the caller.
 *
 * @param {function} fn Function to invoke without arguments.
 * @returns {Promise} Promise that settles with the outcome of `fn`.
 */
function runAsync(fn) {
    const invoke = async () => {
        const outcome = await fn();
        return outcome;
    };

    return invoke();
}

class TcpConnectionEndpoint extends EventEmitter {

    /**
     * Creates a new instance and optionally initializes its members.
     *
     * @constructs
     * @augments EventEmitter
     * @param {object} options The initialization values for this instance.
     * @param {number} options.port See {@link TcpConnectionEndpoint#port}
     * @param {function} options.verifyViaTag Optional hook called as `verifyViaTag(viaTag, connectionInfo)` for the `CONNECT` command.
     * @param {string} options.password See {@link TcpConnectionEndpoint#password}
     * @param {function} options.verifyPassword Optional hook called as `verifyPassword(password, connectionInfo)` for the `PASS` command.
     * @param {string[]} options.channels See {@link TcpConnectionEndpoint#channels}
     * @param {function} options.verifyChannel Optional hook called as `verifyChannel(channelString, index, channel, connectionInfo)` for the `CHANNEL` command.
     * @param {function} options.verifyDataMode Optional hook called as `verifyDataMode(connectionInfo)` for the `DATA` command.
     *
     * @classdesc
     * The TcpConnectionEndpoint can act as the remote side for a TcpConnection.
     * It supports all the commands that a DL3 connected via VBus.net would
     * provide as well.
     *
     * A `connection` event is emitted whenever an incoming connection passes
     * the VBus-over-TCP handshake. A `connectionAttemptFailed` event is
     * emitted whenever a handshake command is answered with `-ERROR`.
     *
     * Every `verify*` hook may return a value or a promise. If it throws or
     * its promise rejects, the command is answered with `-ERROR`; otherwise
     * it is answered with `+OK`.
     */
    constructor(options) {
        super();

        applyDefaultOptions(this, options, /** @lends TcpConnectionEndpoint.prototype */ {

            /**
            * The port number to listen on for incoming connections.
            * @type {number}
            */
            port: 7053,

            /**
             * An optional async function that verifies the viaTag provided to the `CONNECT` command.
             * @type {function}
             */
            verifyViaTag: null,

            /**
             * The password to check against if the PASS command is received.
             * @type {string}
             */
            password: null,

            /**
             * An optional async function that verifies the password provided to the `PASS` command.
             * @type {function}
             */
            verifyPassword: null,

            /**
            * The list of channels to return if the CHANNELLIST command is received.
            * @type {string[]}
            */
            channels: null,

            /**
             * An optional async function that verifies the channel provided to the `CHANNEL` command.
             * @type {function}
             */
            verifyChannel: null,

            /**
             * An optional async function that verifies whether the `DATA` command succeeds.
             * @type {function}
             */
            verifyDataMode: null,

        });

        // Fall back to a single default channel when none were configured.
        if (this.channels == null) {
            this.channels = [ 'VBus' ];
        }
    }

    /**
     * Starts the server to listen for incoming connections.
     *
     * If `port` is `0`, it is replaced by the port number reported by the
     * server once it is listening.
     *
     * @return {Promise} A promise that resolves to `true` when the server is
     *   started and rejects with the error emitted by the server otherwise.
     */
    start() {
        return new Promise((resolve, reject) => {
            const server = net.createServer((socket) => {
                this._onConnection(socket);
            });

            // Once the promise has been resolved, further calls to `reject`
            // are no-ops, so this listener only matters during startup.
            server.on('error', reject);

            server.listen(this.port, () => {
                if (this.port === 0) {
                    this.port = server.address().port;
                }
                resolve(true);
            });

            this.server = server;
        });
    }

    /**
     * Closes the server (if one was created by `start()`) and forgets it.
     *
     * This method does not wait for the server to finish closing.
     */
    stop() {
        if (this.server) {
            this.server.close();

            this.server = null;
        }
    }

    /**
     * Runs the text-based handshake on a freshly accepted socket.
     *
     * Supported commands, one per line (CR and/or LF terminated, empty lines
     * are skipped): `CONNECT <viaTag>`, `PASS <password>`, `CHANNELLIST`,
     * `CHANNEL <number>`, `QUIT` and `DATA`. Anything else is answered with
     * `-ERROR: "Error: Unknown command"`.
     *
     * Commands are handled strictly one after another: while a `verify*`
     * hook is still pending, later commands stay buffered, so replies are
     * written in the order the commands were received and no command is
     * executed more than once. An `-ERROR` reply does not stop processing of
     * commands that are already buffered.
     *
     * After a successful `DATA` command the `connection` event is emitted
     * with the collected `connectionInfo` (`socket`, and, when the matching
     * commands succeeded, `viaTag`, `password` and `channel`); incoming
     * bytes are then no longer interpreted by this method. Bytes that
     * arrived in the same chunk after the `DATA` line are discarded.
     *
     * After `QUIT` the socket is ended and no further commands are handled.
     *
     * @private
     * @param {net.Socket} socket The accepted socket.
     */
    _onConnection(socket) {
        const connectionInfo = {
            socket,
        };

        // Handshake bytes that have been received but not yet consumed.
        let rxBuffer = null;

        // Set once `DATA` succeeded; the socket then belongs to the listener
        // of the `connection` event.
        let isDataMode = false;

        // Set once `QUIT` was answered and the socket was ended.
        let isClosed = false;

        // True while a command has been dispatched but not yet answered.
        let isAwaitingReply = false;

        // Guards against re-entering the pump from a synchronous reply.
        let isPumping = false;

        let pump = undefined;

        const write = (...args) => socket.write(...args);

        // Answers the current command with `result`, applies the optional
        // state transition (`false`: close, `true`: enter data mode) and
        // resumes processing of buffered commands.
        const succeed = (result, transition) => {
            isAwaitingReply = false;

            write(result + '\r\n');

            if (transition === false) {
                isClosed = true;
                socket.end();
                connectionInfo.socket = null;
            } else if (transition === true) {
                isDataMode = true;

                this.emit('connection', connectionInfo);
            }

            pump();
        };

        // Answers the current command with `-ERROR`, reports the failed
        // attempt and resumes processing of buffered commands.
        const fail = (reason) => {
            // A hook may reject without a reason; substitute a real error so
            // that the failure is never mistaken for a success.
            const err = (reason != null) ? reason : new Error('Verification failed');

            isAwaitingReply = false;

            this.emit('connectionAttemptFailed', {
                ip: socket.remoteAddress,
                family: socket.remoteFamily,
                port: socket.remotePort,
                error: err,
            });

            write('-ERROR: ' + JSON.stringify(String(err)) + '\r\n');

            pump();
        };

        // Dispatches a single command line. Every path ends in exactly one
        // call to `succeed` or `fail`, either synchronously or once the
        // verification hook has settled.
        const handleCommand = (line) => {
            let md;
            if ((md = /^CONNECT (.*)$/.exec(line))) {
                const viaTagString = md[1];
                const accept = () => {
                    connectionInfo.viaTag = viaTagString;
                    succeed('+OK');
                };
                if (this.verifyViaTag) {
                    runAsync(() => this.verifyViaTag(viaTagString, connectionInfo)).then(accept, fail);
                } else {
                    accept();
                }
            } else if ((md = /^PASS (.*)$/.exec(line))) {
                const passwordString = md[1];
                const accept = () => {
                    connectionInfo.password = passwordString;
                    succeed('+OK');
                };
                if (this.verifyPassword) {
                    runAsync(() => this.verifyPassword(passwordString, connectionInfo)).then(accept, fail);
                } else if (!this.password || (passwordString === this.password)) {
                    // Without a configured password every password is accepted.
                    accept();
                } else {
                    fail(new Error('Password mismatch'));
                }
            } else if (/^CHANNELLIST$/.test(line)) {
                // One `*<index>:<name>` line per non-empty channel entry.
                const entries = [];
                this.channels.forEach((channel, index) => {
                    if (channel) {
                        entries.push('*' + index + ':' + channel);
                    }
                });

                succeed(entries.join('\r\n') + '\r\n+OK');
            } else if ((md = /^CHANNEL (\d+)$/.exec(line))) {
                const channelString = md[1];
                const index = +channelString;
                const channel = this.channels[index];
                const accept = () => {
                    connectionInfo.channel = channelString;
                    succeed('+OK');
                };
                if (this.verifyChannel) {
                    runAsync(() => this.verifyChannel(channelString, index, channel, connectionInfo)).then(accept, fail);
                } else if (channel) {
                    accept();
                } else {
                    fail(new Error('Channel not available'));
                }
            } else if (/^QUIT$/.test(line)) {
                succeed('+OK', false);
            } else if (/^DATA$/.test(line)) {
                if (this.verifyDataMode) {
                    runAsync(() => this.verifyDataMode(connectionInfo)).then(() => {
                        succeed('+OK', true);
                    }, fail);
                } else {
                    succeed('+OK', true);
                }
            } else {
                fail(new Error('Unknown command'));
            }
        };

        // Removes the next non-empty line from `rxBuffer` and returns it, or
        // returns `null` (keeping any incomplete trailing line buffered) if
        // no complete line is available yet.
        const takeLine = () => {
            if (!rxBuffer) {
                return null;
            }

            let lineStart = 0;
            for (let index = 0; index < rxBuffer.length; index++) {
                const byte = rxBuffer[index];
                if ((byte === 13) || (byte === 10)) {
                    if (lineStart < index) {
                        const line = rxBuffer.toString('utf8', lineStart, index);
                        rxBuffer = rxBuffer.subarray(index + 1);
                        return line;
                    }

                    // Empty line (or the LF of a CR LF pair): skip it.
                    lineStart = index + 1;
                }
            }

            rxBuffer = (lineStart < rxBuffer.length) ? rxBuffer.subarray(lineStart) : null;
            return null;
        };

        // Handles buffered commands one at a time until the buffer runs dry,
        // a reply is pending, or the handshake is over.
        pump = () => {
            if (isPumping) {
                // Called from a synchronous reply; the loop below continues.
                return;
            }

            isPumping = true;
            try {
                while (!isAwaitingReply && !isClosed && !isDataMode) {
                    const line = takeLine();
                    if (line === null) {
                        break;
                    }

                    isAwaitingReply = true;
                    handleCommand(line);
                }
            } finally {
                isPumping = false;
            }

            if (isClosed || isDataMode) {
                // Nothing buffered is interpreted past this point.
                rxBuffer = null;
            }
        };

        const onData = (chunk) => {
            if (isDataMode || isClosed) {
                return;
            }

            rxBuffer = rxBuffer ? Buffer.concat([ rxBuffer, chunk ]) : chunk;

            pump();
        };

        const onEnd = () => {
            // Intentionally empty.
        };

        const onError = () => {
            // Intentionally empty: having a listener keeps a socket error
            // from being thrown as an unhandled `error` event.
        };

        const onTimeout = () => {
            // Intentionally empty: an idle timeout currently triggers no action.
        };

        socket.on('data', onData);
        socket.on('end', onEnd);
        socket.on('error', onError);
        socket.setTimeout(30000, onTimeout);
        socket.setKeepAlive(true, 60000);

        write('+HELLO: This is TcpConnectionEndpoint, at your service!\r\n');
    }

}


// Prototype-level defaults: instances that never set these properties
// themselves read the values below.

/**
 * The port number to listen on for incoming connections.
 * @type {number}
 */
TcpConnectionEndpoint.prototype.port = 7053;

/**
 * The list of channels to return if the CHANNELLIST command is received.
 * @type {string[]}
 */
TcpConnectionEndpoint.prototype.channels = null;

/**
 * The Server instance used for listening for incoming connections.
 * @type {net.Server}
 */
TcpConnectionEndpoint.prototype.server = null;



module.exports = TcpConnectionEndpoint;