danielwippermann/resol-vbus

View on GitHub
src/tcp-connection.js

Summary

Maintainability
D
2 days
Test Coverage
/*! resol-vbus | Copyright (c) 2013-present, Daniel Wippermann | MIT license */

const net = require('net');
const tls = require('tls');


const Connection = require('./connection');
const {
    applyDefaultOptions,
    hasOwnProperty,
    isNumber,
    isObject,
    isPromise,
    isString,
} = require('./utils');



class TcpConnection extends Connection {

    /**
     * Creates a new TcpConnection instance and optionally initializes its
     * members to the given values.
     *
     * @constructs
     * @augments Connection
     * @param {object} options Initialization values
     * @param {string} options.host See {@link TcpConnection#host}
     * @param {number} options.port See {@link TcpConnection#port}
     * @param {string} options.viaTag See {@link TcpConnection#viaTag}
     * @param {string} options.password See {@link TcpConnection#password}
     * @param {function} options.channelListCallback See {@link TcpConnection#channelListCallback}
     * @param {string|number} options.channel See {@link TcpConnection#channel}
     * @param {boolean} options.rawVBusDataOnly See {@link TcpConnection#rawVBusDataOnly}
     * @param {object} options.tlsOptions See {@link TcpConnection#tlsOptions}
     * @param {boolean} options.disableReconnect See {@link TcpConnection#disableReconnect}
     *
     * @classdesc
     * The TcpConnection class is primarily designed to provide access to VBus live data
     * using the VBus-over-TCP specification. That includes the VBus/LAN adapter, the
     * Dataloggers (DL2 and DL3) and VBus.net.
     * In addition to that it can be used to connect to a raw VBus data stream using TCP
     * (for example provided by a serial-to-LAN gateway).
     */
    constructor(options) {
        super(options);

        applyDefaultOptions(this, options, /** @lends TcpConnection.prototype */ {

            /**
             * Host name or IP address of the connection target.
             * @type {string}
             */
            host: null,

            /**
             * Port number of the connection target. Defaults to 57053 when
             * `tlsOptions` are given and to 7053 otherwise.
             * @type {number}
             */
            port: (options && options.tlsOptions) ? 57053 : 7053,

            /**
             * Via tag if connection target is accessed using the VBus.net service.
             * @type {string}
             */
            viaTag: null,

            /**
             * Password needed to connect to target.
             * @type {string}
             */
            password: null,

            /**
             * Optional callback invoked as `callback(channelList, done)` during the
             * handshake. It selects the channel either by calling `done(err, channel)`
             * or by returning a Promise. The selection may be a number, a numeric
             * string or an object with a `channel` property.
             * @type {function}
             */
            channelListCallback: null,

            /**
             * Channel number to connect to.
             * @type {string|number}
             */
            channel: 0,

            /**
             * Indicates that connection does not need to perform login handshake.
             * Useful for serial-to-LAN converters.
             * @type {boolean}
             */
            rawVBusDataOnly: false,

            /**
             * When set, the socket is created using `tls.connect` and these options
             * are merged into the connect options.
             * @type {object}
             */
            tlsOptions: null,

            /**
             * Disable automatic reconnect on connection interruption.
             * @type {boolean}
             */
            disableReconnect: false,

        });
    }

    /**
     * Starts connecting to the configured target.
     *
     * @param {boolean} [force] If truthy, a failure while connecting is handled like
     *   an interruption of an established connection (including the automatic
     *   reconnect logic) instead of rejecting with "Unable to connect".
     * @returns {Promise} Resolves once the connection is established. Rejects if the
     *   connection is not in the disconnected state or if the attempt fails.
     */
    async connect(force) {
        if (this.connectionState !== TcpConnection.STATE_DISCONNECTED) {
            throw new Error('Connection is not disconnected (' + this.connectionState + ')');
        }

        this._setConnectionState(TcpConnection.STATE_CONNECTING);

        return this._connect(force);
    }

    /**
     * Disconnects from the target.
     *
     * The first call half-closes the socket (`socket.end()`) and switches to the
     * disconnecting state. Calling it again while still disconnecting destroys the
     * socket and switches to the disconnected state immediately.
     */
    disconnect() {
        if (this.connectionState === TcpConnection.STATE_DISCONNECTING) {
            if (this.socket) {
                this.socket.destroy();

                this.socket = null;
            }

            this._setConnectionState(TcpConnection.STATE_DISCONNECTED);
        } else if (this.connectionState !== TcpConnection.STATE_DISCONNECTED) {
            this._setConnectionState(TcpConnection.STATE_DISCONNECTING);

            if (this.socket) {
                this.socket.end();
            } else {
                this._setConnectionState(TcpConnection.STATE_DISCONNECTED);
            }
        }
    }

    /**
     * Opens the socket and, unless `rawVBusDataOnly` is set, performs the text based
     * login handshake before switching to data mode.
     *
     * The handshake is tracked using a numeric `phase`:
     *
     * - 0: waiting for the initial response of the remote side
     * - 20: `CONNECT <viaTag>` sent
     * - 40: `PASS <password>` sent
     * - 60: `CHANNELLIST` sent, collecting `*<number>:<name>` lines
     * - 70: waiting for the `channelListCallback` to select a channel
     * - 80: `CHANNEL <channel>` sent
     * - 800: `QUIT` sent after a negative response
     * - 900: `DATA` sent
     * - 1000: connection established, socket data is passed through unparsed
     *
     * @private
     * @param {boolean} [force] See {@link TcpConnection#connect}
     * @returns {Promise} Resolves once phase 1000 is reached, rejects on failure.
     */
    _connect(force) {
        const _this = this;

        let socket;

        return new Promise((resolve, reject) => {
            const done = function(err, result) {
                if (err) {
                    reject(err);
                } else {
                    resolve(result);
                }
            };

            const options = {
                host: this.host,
                port: this.port
            };

            let phase = this.rawVBusDataOnly ? 1000 : 0;
            let rxBuffer = null;

            const write = function(...args) {
                return socket.write(...args);
            };

            // Forwards data emitted by this connection to the socket.
            const onConnectionData = function(chunk) {
                write(chunk);
            };

            // Tears down the socket during the handshake and rejects the promise.
            const abortHandshake = function(err) {
                _this.removeListener('data', onConnectionData);

                socket.destroy();
                _this.socket = null;

                _this._setConnectionState(TcpConnection.STATE_DISCONNECTED);

                done(err);
            };

            const onConnectionEstablished = function() {
                _this.reconnectTimeout = 0;

                _this._setConnectionState(TcpConnection.STATE_CONNECTED);

                done();
            };

            const onConnect = function() {
                // Without a handshake the connection is usable right away.
                if (phase === 1000) {
                    onConnectionEstablished();
                }
            };

            const changePhase = function (newPhase) {
                if (newPhase >= 0) {
                    phase = newPhase;

                    if (phase === 20) {
                        // CONNECT
                        write('CONNECT ' + _this.viaTag + '\r\n');
                    } else if (phase === 40) {
                        // PASS
                        write('PASS ' + _this.password + '\r\n');
                    } else if (phase === 60) {
                        // CHANNELLIST
                        write('CHANNELLIST\r\n');
                    } else if (phase === 80) {
                        // CHANNEL
                        write('CHANNEL ' + _this.channel + '\r\n');
                    } else if (phase === 800) {
                        // QUIT
                        write('QUIT\r\n');
                    } else if (phase === 900) {
                        // DATA
                        write('DATA\r\n');
                    } else if (phase === 1000) {
                        onConnectionEstablished();
                    }
                }
            };

            const channelList = [];

            // Completion handler for the channel list callback (node-style or Promise).
            const channelListCallbackDone = (err, channel) => {
                if (_this.socket !== socket) {
                    // The socket was terminated or replaced while the callback was
                    // pending; this attempt has already been settled.
                    return;
                }

                if (err) {
                    abortHandshake(err);
                    return;
                }

                if (channel !== undefined) {
                    if (isNumber(channel)) {
                        _this.channel = channel;
                    } else if (isString(channel)) {
                        _this.channel = parseInt(channel, 10);
                    } else if (isObject(channel) && hasOwnProperty(channel, 'channel')) {
                        _this.channel = channel.channel;
                    } else {
                        // Do not continue the handshake with an unusable selection.
                        abortHandshake(new Error('Invalid channel selection ' + JSON.stringify(channel)));
                        return;
                    }
                }

                const nextPhase = _this.channel ? 80 : 900;

                process.nextTick(() => {
                    changePhase(nextPhase);
                });
            };

            // Handles one text line received during the handshake.
            const onLine = function(line) {
                let newPhase = -1;
                if (line [0] === '+') {
                    if (phase === 0) {
                        if (_this.viaTag) {
                            // CONNECT ...
                            newPhase = 20;
                        } else {
                            // PASS ...
                            newPhase = 40;
                        }
                    } else if (phase === 20) {
                        newPhase = 40;
                    } else if (phase === 40) {
                        if (_this.channelListCallback) {
                            newPhase = 60;
                        } else if (_this.channel) {
                            newPhase = 80;
                        } else {
                            newPhase = 900;
                        }
                    } else if (phase === 60) {
                        // Wait for the callback to select a channel.
                        newPhase = 70;

                        const channelListCallbackResult = _this.channelListCallback(channelList, channelListCallbackDone);
                        if (isPromise(channelListCallbackResult)) {
                            channelListCallbackResult.then(result => {
                                channelListCallbackDone(null, result);
                            }, err => {
                                channelListCallbackDone(err);
                            });
                        }
                    } else if (phase === 80) {
                        newPhase = 900;
                    } else if (phase === 900) {
                        newPhase = 1000;
                    }
                } else if (line [0] === '-') {
                    newPhase = 800;

                    const error = new Error('Remote side responded with ' + JSON.stringify(line));

                    switch (phase) {
                    case 20:
                        error.vbusPhase = 'CONNECT';
                        break;
                    case 40:
                        error.vbusPhase = 'PASS';
                        break;
                    case 60:
                        error.vbusPhase = 'CHANNELLIST';
                        break;
                    case 80:
                        error.vbusPhase = 'CHANNEL';
                        break;
                    case 900:
                        error.vbusPhase = 'DATA';
                        break;
                    }

                    done(error);
                } else if (line [0] === '*') {
                    if (phase === 60) {
                        const md = /^\*([\d]+):(.*)$/.exec(line);
                        if (md) {
                            channelList.push({
                                channel: md [1],
                                name: md [2],
                            });
                        }
                    }
                } else {
                    // nop
                }

                changePhase(newPhase);
            };

            const onSocketData = function(chunk) {
                if (phase < 1000) {
                    // Still in the handshake: split the received bytes into lines,
                    // keeping an incomplete trailing line for the next chunk.
                    let buffer;
                    if (rxBuffer) {
                        buffer = Buffer.concat([ rxBuffer, chunk ]);
                    } else {
                        buffer = chunk;
                    }

                    let start = 0, index = 0;
                    /* eslint-disable-next-line no-unmodified-loop-condition */
                    while ((index < buffer.length) && (phase < 1000)) {
                        if ((buffer [index] === 13) || (buffer [index] === 10)) {
                            if (start < index) {
                                const line = buffer.toString('utf8', start, index);
                                onLine(line);
                            }

                            start = index + 1;
                        }

                        index++;
                    }

                    if (start < buffer.length) {
                        if (phase >= 1000) {
                            // The handshake completed within this chunk; the rest
                            // of the chunk is handed to `_write` (defined outside
                            // this file).
                            _this._write(buffer.slice(start));

                            rxBuffer = null;
                        } else {
                            rxBuffer = buffer.slice(start);
                        }
                    } else {
                        rxBuffer = null;
                    }
                } else {
                    _this._write(chunk);
                }
            };

            // Common handling for end, error and timeout of the socket. `err` is the
            // socket error, if any.
            const onSocketTermination = function(err) {
                _this.removeListener('data', onConnectionData);

                if (_this.socket !== socket) {
                    // nop
                } else if (!force && (_this.connectionState === TcpConnection.STATE_CONNECTING)) {
                    // failed to connect
                    _this._setConnectionState(TcpConnection.STATE_DISCONNECTED);

                    _this.socket = null;

                    const error = new Error('Unable to connect');
                    if (err) {
                        // Keep the underlying socket error available for diagnosis.
                        error.cause = err;
                    }

                    done(error);
                } else if (_this.connectionState === TcpConnection.STATE_DISCONNECTING) {
                    _this._setConnectionState(TcpConnection.STATE_DISCONNECTED);

                    _this.socket = null;
                } else {
                    _this._setConnectionState(TcpConnection.STATE_INTERRUPTED);

                    _this.socket = null;

                    if (!_this.disableReconnect) {
                        // Use the current timeout for this retry and increase it
                        // for the next one until the maximum is reached.
                        const timeout = _this.reconnectTimeout;
                        if (_this.reconnectTimeout < _this.reconnectTimeoutMax) {
                            _this.reconnectTimeout += _this.reconnectTimeoutIncr;
                        }

                        setTimeout(() => {
                            _this._setConnectionState(TcpConnection.STATE_RECONNECTING);

                            _this._connect().then(null, err => {
                                _this.emit('error', err);
                            });
                        }, timeout);
                    }
                }
            };

            const onEnd = function() {
                onSocketTermination();
            };

            const onError = function(err) {
                socket.destroy();
                onSocketTermination(err);
            };

            const onTimeout = function() {
                socket.destroy();
                onSocketTermination();
            };

            const onClose = function() {
                // A destroyed socket does not necessarily emit 'end' (for example
                // after a forced disconnect), so detach the forwarding listener
                // here as well to avoid leaking it.
                _this.removeListener('data', onConnectionData);
            };

            this.on('data', onConnectionData);

            if (this.tlsOptions) {
                Object.assign(options, this.tlsOptions);
                socket = tls.connect(options, onConnect);
            } else {
                socket = net.connect(options, onConnect);
            }
            socket.on('data', onSocketData);
            socket.on('end', onEnd);
            socket.on('error', onError);
            socket.on('close', onClose);
            socket.setTimeout(30000, onTimeout);
            socket.setKeepAlive(true, 60000);

            this.socket = socket;
        });
    }

}


// Defaults shared by all TcpConnection instances through the prototype.
const tcpConnectionPrototypeDefaults = /** @lends TcpConnection.prototype */ {

    /**
     * Host name or IP address to connect to.
     * @type {string}
     */
    host: null,

    /**
     * Port number to connect to.
     * @type {number}
     */
    port: null,

    /**
     * Via tag, used when the target is reached through the VBus.net service.
     * @type {string}
     */
    viaTag: null,

    /**
     * Password required by the target.
     * @type {string}
     */
    password: null,

    /**
     * Optional callback used to select a channel from the channel list.
     * @type {function}
     */
    channelListCallback: null,

    /**
     * Number of the channel to connect to.
     * @type {string|number}
     */
    channel: 0,

    /**
     * Set if the target provides raw VBus data and no login handshake
     * is necessary (e.g. serial-to-LAN converters).
     * @type {boolean}
     */
    rawVBusDataOnly: false,

    /**
     * Optional TLS options.
     * @type {object}
     */
    tlsOptions: null,

    /**
     * Timeout in milliseconds to wait between reconnection retries.
     * @type {number}
     */
    reconnectTimeout: 0,

    /**
     * Amount by which the timeout is incremented after every unsuccessful
     * reconnection retry.
     * @type {number}
     */
    reconnectTimeoutIncr: 10000,

    /**
     * Upper limit for the timeout between unsuccessful reconnection retries.
     * @type {number}
     */
    reconnectTimeoutMax: 60000,

    /**
     * Set to disable the automatic reconnect after a connection interruption.
     * @type {boolean}
     */
    disableReconnect: false,

};

Object.assign(TcpConnection.prototype, tcpConnectionPrototypeDefaults);



module.exports = TcpConnection;