danielwippermann/resol-vbus

View on GitHub
src/connection.js

Summary

Maintainability
F
1 wk
Test Coverage
/*! resol-vbus | Copyright (c) 2013-present, Daniel Wippermann | MIT license */

const { Duplex } = require('stream');


const Header = require('./header');
const Packet = require('./packet');
const Datagram = require('./datagram');
const Telegram = require('./telegram');
const { applyDefaultOptions } = require('./utils');



const states = [
    'DISCONNECTED',
    'CONNECTING',
    'CONNECTED',
    'INTERRUPTED',
    'RECONNECTING',
    'DISCONNECTING',
].reduce((memo, state) => {
    memo ['STATE_' + state] = state;
    return memo;
}, {});



function promiseToCallback(maybePromise, callback) {
    if (maybePromise && (typeof maybePromise.then === 'function')) {
        maybePromise.then((result) => {
            if (result != null) {
                callback(null, result);
            }
        }, (err) => {
            callback(err);
        });
    }
}



class Connection extends Duplex {

    /**
     * Creates a new Connection instance and optionally initializes its member with the given values.
     *
     * @constructs
     * @augments Duplex
     * @param {object} options Initialization values for this instance's members
     * @param {number} options.channel See {@link Connection#channel}
     * @param {number} options.selfAddress See {@link Connection#selfAddress}
     *
     * @classdesc
     * The `Connection` class is the abstract base class for all VBus live data connections.
     * It extends the `Duplex` stream class. Any data written to a `Connection` instance is
     * parsed according to the VBus Protocol Specification. Once a valid instance of one of the
     * `Header` sub-classes (`Packet`, `Datagram` or `Telegram`)
     * is created from the binary data stream, the respective event is emitted on
     * the `Connection` instance.
     *
     * In addition to receiving incoming data the `Connection` class
     * offers several helper methods e.g. to send data to the underlying VBus connection.
     *
     * The `Connection` class itself has no knowledge about the underlying VBus connection.
     * Several sub-classes exist that know how to contact different types of VBus live streams.
     *
     * See `SerialConnection` or `TcpConnection` for concrete implementations.
     *
     * @example
     * var connection = new SerialConnection({ path: '/dev/tty.usbserial' });
     * connection.on('connectionState', function(state) {
     *     console.log(state);
     * });
     * connection.on('packet', function(packet) {
     *     console.log(packet.getId());
     * });
     * connection.on('datagram', function(datagram) {
     *     console.log(datagram.getId());
     * });
     * connection.connect();
     */
    constructor(options) {
        super();

        applyDefaultOptions(this, options, {
            channel: 0,
            selfAddress: 0x0020,
        });
    }

    /**
     * Establish underlying connection and start streaming data to the writable side
     * of this `Connection` instance's stream.
     *
     * @abstract
     * @returns {Promise} A promise that resolves once the connection has been established.
     */
    connect(force) {
        throw new Error('Must be implemented by sub-class');
    }

    /**
     * Diconnect this instance.
     * @abstract
     */
    disconnect() {
        throw new Error('Must be implemented by sub-class');
    }

    _write(chunk, encoding, callback) {
        this.receive(new Date(), chunk);

        if (callback) {
            callback(null);
        }
    }

    receive(timestamp, chunk) {
        const _this = this;

        if (this.listenerCount('rawData') > 0) {
            this.emit('rawData', chunk, timestamp);
        }

        let buffer;
        if (this.rxBuffer) {
            buffer = Buffer.concat([ this.rxBuffer, chunk ]);
        } else {
            buffer = chunk;
        }

        let processed = 0;

        const reportJunk = function(index) {
            if (index > processed) {
                if (_this.listenerCount('junkData') > 0) {
                    const junkData = buffer.slice(processed, index);
                    _this.emit('junkData', junkData, timestamp);
                }
            }
        };

        // console.log('_write (start):', this.rxBuffer, chunk);

        let index = 0, start = null;
        while (index < buffer.length) {
            const b = buffer [index] & 255;
            if (b === 0xAA) {
                reportJunk(index);

                start = index;
                processed = index;
            } else if (b >= 0x80) {
                start = null;
            } else if (start === null) {
                // skip junk
            } else if (index >= start + 5) {
                const version = buffer [start + 5] & 255;
                const majorVersion = version >> 4;
                let length;
                if (majorVersion === 1) {
                    if (index >= start + 8) {
                        length = 10 + buffer [start + 8] * 6;
                    } else {
                        length = 10;
                    }
                } else if (majorVersion === 2) {
                    length = 16;
                } else if (majorVersion === 3) {
                    if (index >= start + 6) {
                        length = 8 + Telegram.getFrameCountForCommand(buffer [start + 6]) * 9;
                    } else {
                        length = 8;
                    }
                } else {
                    length = 0;
                }

                if (index === start + length - 1) {
                    let valid = true;
                    if (version === 0x10) {
                        if (!Header.calcAndCompareChecksum(version, buffer, start + 1, start + 9)) {
                            // console.log('checksum error in header');
                            valid = false;
                        }

                        let frameIndex = start + 10;
                        while (valid && (frameIndex < start + length)) {
                            if (!Header.calcAndCompareChecksum(version, buffer, frameIndex, frameIndex + 5)) {
                                // console.log('checksum error in frame index ' + frameIndex);
                                valid = false;
                            }
                            frameIndex += 6;
                        }
                    } else if (version === 0x20) {
                        if (!Header.calcAndCompareChecksum(version, buffer, start + 1, start + 15)) {
                            valid = false;
                        }
                    } else if (version === 0x30) {
                        if (!Header.calcAndCompareChecksum(version, buffer, start + 1, start + 7)) {
                            valid = false;
                        }

                        let frameIndex = start + 8;
                        while (valid && (frameIndex < start + length)) {
                            if (!Header.calcAndCompareChecksum(version, buffer, frameIndex, frameIndex + 8)) {
                                valid = false;
                            }
                            frameIndex += 9;
                        }
                    } else {
                        valid = false;
                    }

                    if (valid) {
                        if (majorVersion === 1) {
                            if (this.listenerCount('packet') > 0) {
                                const packet = Packet.fromLiveBuffer(buffer, start, index);
                                packet.timestamp = new Date(timestamp);
                                packet.channel = this.channel;
                                this.emit('packet', packet);
                            }
                        } else if (majorVersion === 2) {
                            if (this.listenerCount('datagram') > 0) {
                                const datagram = Datagram.fromLiveBuffer(buffer, start, index);
                                datagram.timestamp = new Date(timestamp);
                                datagram.channel = this.channel;
                                this.emit('datagram', datagram);
                            }
                        } else if (majorVersion === 3) {
                            if (this.listenerCount('telegram') > 0) {
                                const telegram = Telegram.fromLiveBuffer(buffer, start, index);
                                telegram.timestamp = new Date(timestamp);
                                telegram.channel = this.channel;
                                this.emit('telegram', telegram);
                            }
                        }
                    } else {
                        reportJunk(index + 1);
                    }

                    start = null;
                    processed = index + 1;
                }
            }

            index++;
        }

        const minProcessed = buffer.length - 1024;
        if (processed < minProcessed) {
            reportJunk(minProcessed);
            processed = minProcessed;
        }

        if (processed < buffer.length) {
            this.rxBuffer = buffer.slice(processed);
        } else {
            this.rxBuffer = null;
        }
    }

    _read() {
        // nop
    }

    _setConnectionState(newState) {
        if (this.connectionState !== newState) {
            this.connectionState = newState;

            this.rxBuffer = null;

            if (this.listenerCount('connectionState') > 0) {
                this.emit('connectionState', newState);
            }
        }
    }

    /**
     * Send raw data over this Connection instance.
     *
     * @param {Header|Buffer} data The Header or Buffer instance to be sent.
     */
    send(data) {
        if (data instanceof Header) {
            data = data.toLiveBuffer();
        }
        return this.push(data);
    }

    /**
     * Sends and / or receives a VBus data.
     *
     * @param {Header|Buffer} txData The Header or Buffer instance to be sent.
     * @param {object} options
     * @param {number} options.timeout Timeout in milliseconds after which the `txData` will be sent again
     * @param {number} options.timeoutIncr After each timeout retransmission the timeout value for the next try is increment by this value.
     * @param {number} options.tries After this number of tries the returned Promise will resolve with value `null`.
     * @param {?function} options.filterPacket Will be called when a Packet has been received with the Packet and a callback as arguments.
     * @param {?function} options.filterDatagram Will be called when a Datagram has been received with the Datagram and a callback as arguments.
     * @param {?function} options.filterTelegram Will be called when a Telegram has been received with the Telegram and a callback as arguments.
     * @returns {Promise} A Promise that either resolves to the VBus data selected by one of the filter callbacks or `null` on timeout.
     */
    async transceive(txData, options) {
        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 0,
            tries: 1,
            filterPacket: null,
            filterDatagram: null,
            filterTelegram: null,
        });

        let onPacket, onDatagram, onTelegram;
        try {
            let doneHandler = null;

            function done(err, result) {
                if (doneHandler) {
                    doneHandler(err, result);
                }
            }

            if (options.filterPacket) {
                onPacket = function(rxPacket) {
                    const result = options.filterPacket(rxPacket, done);

                    promiseToCallback(result, done);
                };

                this.on('packet', onPacket);
            }

            if (options.filterDatagram) {
                onDatagram = function(rxDatagram) {
                    const result = options.filterDatagram(rxDatagram, done);

                    promiseToCallback(result, done);
                };

                this.on('datagram', onDatagram);
            }

            if (options.filterTelegram) {
                onTelegram = function(rxTelegram) {
                    const result = options.filterTelegram(rxTelegram, done);

                    promiseToCallback(result, done);
                };

                this.on('telegram', onTelegram);
            }

            let { timeout } = options;

            for (let currentTry = 0; currentTry < options.tries; currentTry++) {
                let timer;
                try {
                    const donePromise = new Promise((resolve, reject) => {
                        doneHandler = (err, result) => {
                            if (err) {
                                reject(err);
                            } else {
                                resolve(result);
                            }
                        };
                    });

                    timer = setTimeout(() => {
                        timer = null;
                        done(null, null);
                    }, timeout);

                    if (txData) {
                        this.send(txData);
                    }

                    const result = await donePromise;

                    if (result) {
                        return result;
                    }

                    timeout += options.timeoutIncr;
                } finally {
                    if (timer) {
                        clearTimeout(timer);
                    }
                }
            }

            return null;
        } finally {
            if (onPacket) {
                this.removeListener('packet', onPacket);
                onPacket = null;
            }

            if (onDatagram) {
                this.removeListener('datagram', onDatagram);
                onDatagram = null;
            }

            if (onTelegram) {
                this.removeListener('telegram', onTelegram);
                onTelegram = null;
            }
        }
    }

    /**
     * Waits for a VBus bus offering datagram (Command 0x0500).
     *
     * Returns a Promise that resolves with the Datagram or `null` if the method timed out.
     * @param {number} timeout=20000 Timeout in milliseconds
     * @returns {Promise} A Promise that resolves to the bus offering Datagram or `null` on timeout.
     */
    waitForFreeBus(timeout) {
        const options = {
            tries: 1,
            timeout: timeout || 20000,
        };

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.command === 0x0500) {
                done(null, rxDatagram);
            }
        };

        return this.transceive(null, options);
    }

    /**
     * Sends a VBus bus release datagram (Command 0x0600).
     * Returns a Promise that resolves with the first VBus packet received after the release or `null` on timeout.
     *
     * @param {number} address The VBus address of the master device to give the bus ownership back to.
     * @param {object} options
     * @param {number} options.tries=2 Number of tries to give the bus ownership back.
     * @param {number} options.timeout=1500 Time in milliseconds to wait between tries.
     */
    releaseBus(address, options) {
        options = applyDefaultOptions({}, options, {
            tries: 2,
            timeout: 1500
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x0600,
            valueId: 0,
            value: 0
        }).toLiveBuffer();

        options.filterPacket = function(rxPacket, done) {
            done(null, rxPacket);
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to get a value from a device.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param {number} address The VBus address of the device to get the value from
     * @param {number} valueId The ID of the value to read from the device.
     * @param {object} options
     * @param {number} options.timeout=500 Time in milliseconds between tries.
     * @param {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param {number} options.tries=3 Number of tries to get the value.
     * @returns {Promise} A promise that resolves to the received Datagram or `null` on timeout.
     */
    getValueById(address, valueId, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const subIndex = (valueId >> 16) & 0x7F;
        valueId = valueId & 0xFFFF;
        const reqCommand = 0x0300 | subIndex;
        const ackCommand = 0x0100 | subIndex;
        const nackCommand = 0x4300 | subIndex;

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: reqCommand,
            valueId,
            value: 0
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command === ackCommand) {
                if (rxDatagram.valueId !== valueId) {
                    // nop
                } else {
                    done(null, rxDatagram);
                }
            } else if (rxDatagram.command !== nackCommand) {
                if (rxDatagram.valueId !== valueId) {
                    // nop
                } else {
                    done(null, rxDatagram);
                }
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to set a value in a device.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param {number} address The VBus address of the device to set the value in
     * @param {number} valueId The ID of the value to write to the device.
     * @param {number} value The value to write to the device.
     * @param {object} options
     * @param {number} options.timeout=500 Time in milliseconds between tries.
     * @param {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param {number} options.tries=3 Number of tries to get the value.
     * @returns {Promise} A promise that resolves to the received Datagram or `null` on timeout.
     */
    setValueById(address, valueId, value, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
            save: false,
        });

        const subIndex = (valueId >> 16) & 0x7F;
        valueId = valueId & 0xFFFF;

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: (options.save ? 0x0400 : 0x0200) | subIndex,
            valueId,
            value
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== (0x0100 | subIndex)) {
                // nop
            } else if (rxDatagram.valueId !== valueId) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to lookup a value ID hash in a device.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param  {number} address The VBus address of the device to lookup the value in.
     * @param  {number} valueId The ID of the value to lookup in the device.
     * @param  {object} options
     * @param  {number} options.timeout=500 Time in milliseconds between tries.
     * @param {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param  {number} options.tries=3 Number of tries to lookup the value.
     * @return {Promise} A Promise the resolves to the received Datagram or `null` on timeout.
     */
    getValueIdHashById(address, valueId, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1000,
            valueId,
            value: 0
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== 0x0100) {
                // nop
            } else if (rxDatagram.valueId !== valueId) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to lookup a value ID in a device.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param  {number} address The VBus address of the device to lookup the value in.
     * @param  {number} valueIdHash The ID hash of the value to lookup in the device.
     * @param  {object} options
     * @param  {number} options.timeout=500 Time in milliseconds between tries.
     * @param {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param  {number} options.tries=3 Number of tries to lookup the value.
     * @return {Promise} A Promise the resolves to the received Datagram or `null` on timeout.
     */
    getValueIdByIdHash(address, valueIdHash, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1100,
            valueId: 0,
            value: valueIdHash
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if ((rxDatagram.command !== 0x0100) && (rxDatagram.command !== 0x1101)) {
                // nop
            } else if (rxDatagram.value !== valueIdHash) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to lookup the controller's capabilities (part 1).
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param  {number} address The VBus address of the device to get the capabilities from.
     * @param  {object} options
     * @param  {number} options.timeout=500 Time in milliseconds between tries.
     * @param  {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param  {number} options.tries=3 Number of tries to lookup the value.
     * @return {Promise} A Promise the resolves to the received Datagram or `null` on timeout.
     */
    getCaps1(address, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1300,
            valueId: 0,
            value: 0,
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== 0x1301) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to begin a bulk valke transaction.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param  {number} address The VBus address of the device to begin the transaction on.
     * @param  {number} txTimeout The number of seconds of inactivity after which the transaction is rolled back.
     * @param  {object} options
     * @param  {number} options.timeout=500 Time in milliseconds between tries.
     * @param  {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param  {number} options.tries=3 Number of tries to lookup the value.
     * @return {Promise} A Promise the resolves to the received Datagram or `null` on timeout.
     */
    beginBulkValueTransaction(address, txTimeout, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1400,
            valueId: 0,
            value: txTimeout
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== 0x1401) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to commit a bulk valke transaction.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param  {number} address The VBus address of the device to commit the transaction on.
     * @param  {object} options
     * @param  {number} options.timeout=500 Time in milliseconds between tries.
     * @param  {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param  {number} options.tries=3 Number of tries to lookup the value.
     * @return {Promise} A Promise the resolves to the received Datagram or `null` on timeout.
     */
    commitBulkValueTransaction(address, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1402,
            valueId: 0,
            value: 0
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== 0x1403) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to rollback a bulk valke transaction.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param  {number} address The VBus address of the device to perform the rollback on.
     * @param  {object} options
     * @param  {number} options.timeout=500 Time in milliseconds between tries.
     * @param  {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param  {number} options.tries=3 Number of tries to lookup the value.
     * @return {Promise} A Promise the resolves to the received Datagram or `null` on timeout.
     */
    rollbackBulkValueTransaction(address, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1404,
            valueId: 0,
            value: 0
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== 0x1405) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Sends a Datagram to set a value during a bulk value transaction.
     * Returns a Promise that resolves to the answer Datagram or `null` on timeout.
     *
     * @param  {number} address The VBus address of the device to set the value on.
     * @param  {number} valueId The ID of the value to write to the device.
     * @param  {number} value The value to write to the device.
     * @param  {object} options
     * @param  {number} options.timeout=500 Time in milliseconds between tries.
     * @param  {number} options.timeoutIncr=500 Additional time in milliseconds to increase the timeout per try.
     * @param  {number} options.tries=3 Number of tries to lookup the value.
     * @return {Promise} A Promise the resolves to the received Datagram or `null` on timeout.
     */
    setBulkValueById(address, valueId, value, options) {
        const _this = this;

        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const subIndex = (valueId >> 16) & 0x7F;
        valueId = valueId & 0xFFFF;

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1500 | subIndex,
            valueId,
            value
        }).toLiveBuffer();

        options.filterDatagram = function(rxDatagram, done) {
            if (rxDatagram.destinationAddress !== _this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== (0x1600 | subIndex)) {
                // nop
            } else if (rxDatagram.valueId !== valueId) {
                // nop
            } else {
                done(null, rxDatagram);
            }
        };

        return this.transceive(txDatagram, options);
    }

    ping(address, valueId, value, options) {
        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1700,
            valueId,
            value,
        }).toLiveBuffer();

        options.filterDatagram = async (rxDatagram) => {
            if (rxDatagram.destinationAddress !== this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== 0x1701) {
                // nop
            } else if (rxDatagram.valueId !== valueId) {
                // nop
            } else if (rxDatagram.value !== value) {
                // nop
            } else {
                return rxDatagram;
            }
        };

        return this.transceive(txDatagram, options);
    }

    getStorageActivity(address, options) {
        options = applyDefaultOptions({}, options, {
            timeout: 500,
            timeoutIncr: 500,
            tries: 3,
        });

        const txDatagram = new Datagram({
            destinationAddress: address,
            sourceAddress: this.selfAddress,
            command: 0x1702,
            valueId: 0,
            value: 0,
        }).toLiveBuffer();

        options.filterDatagram = async (rxDatagram) => {
            if (rxDatagram.destinationAddress !== this.selfAddress) {
                // nop
            } else if (rxDatagram.sourceAddress !== address) {
                // nop
            } else if (rxDatagram.command !== 0x1703) {
                // nop
            } else {
                return rxDatagram;
            }
        };

        return this.transceive(txDatagram, options);
    }

    /**
     * Creates a promise that resolves when this Connection
     * instance is connected and rejects if it is disconnected.
     * If it is neither connected nor disconnected the promise
     * will stay pending until one of the states is entered.
     *
     * @returns {Promise}
     */
    createConnectedPromise() {
        const _this = this;

        return new Promise((resolve, reject) => {
            const checkConnectionState = function(state) {
                if (state === Connection.STATE_DISCONNECTED) {
                    reject(new Error(state));
                    return true;
                } else if (state === Connection.STATE_CONNECTED) {
                    resolve();
                    return true;
                } else {
                    return false;
                }
            };

            if (!checkConnectionState(_this.connectionState)) {
                const onConnectionState = function(state) {
                    if (checkConnectionState(state)) {
                        _this.removeListener('connectionState', onConnectionState);
                    }
                };

                _this.on('connectionState', onConnectionState);
            }
        });
    }

}


Object.assign(Connection.prototype, /** @lends Connection.prototype */ {

    /**
     * Reference to this instance's DataSource.
     * @type {DataSource}
     */
    dataSource: null,

    /**
     * The VBus channel that this connection is established to.
     * All `Header` instances created by this `Connection` instance will be assigned
     * this VBus channel.
     * @type {number}
     */
    channel: 0,

    /**
     * The VBus address used for sending information over this connection.
     * @type {number}
     */
    selfAddress: 0x0020,

    /**
     * The current connection state.
     * @type {string}
     */
    connectionState: states.STATE_DISCONNECTED,

    /**
     * The internal receive buffer of this conneciton.
     * @type {Buffer}
     */
    rxBuffer: null,

});


Object.assign(Connection, states);



module.exports = Connection;