danielwippermann/resol-vbus

View on GitHub
examples/vbustouch-proxy/index.js

Summary

Maintainability
F
6 days
Test Coverage
/*! resol-vbus | Copyright (c) 2013-present, Daniel Wippermann | MIT license */

const fs = require('fs');
const os = require('os');
const path = require('path');


const express = require('express');
const morgan = require('morgan');
const request = require('request');
const winston = require('winston');
const mqtt = require('mqtt');


const {
    Converter,
    DLxJsonConverter,
    FileListReader,
    HeaderSet,
    HeaderSetConsolidator,
    Packet,
    SerialConnection,
    Specification,
    TcpConnection,
    TextConverter,
    VBusRecordingConverter,
    utils: {
        normalizeDatecode,
        promisify,
    },
} = require('../resol-vbus');


const config = require('./config');



const specification = Specification.getDefaultSpecification();



const logger = winston.createLogger({
    transports: [
        new winston.transports.Console({
            level: 'debug',
            format: winston.format.combine(
                winston.format.colorize(),
                winston.format.simple()
            ),
        }),
    ],
});


const connectionClassByName = {
    SerialConnection,
    TcpConnection,
};


const headerSetConsolidator = new HeaderSetConsolidator({
    interval: config.loggingInterval,
    timeToLive: config.loggingTimeToLive,
});


const textHeaderSetConsolidator = new HeaderSetConsolidator({
    timeToLive: config.textLoggingTimeToLive,
});


/**
 * This function is called once the header set is considered "settled".
 * That means that the amount of unique packets in the header set has
 * been stable for a certain amount of time.
 *
 * @param {HeaderSet} headerSet
 */
const headerSetHasSettled = function(headerSet) {
    const packetFields = specification.getPacketFieldsForHeaders(headerSet.getHeaders());

    logger.debug(packetFields.map((packetField) => {
        return packetField.id + ': ' + packetField.name;
    }).join('\n'));
};


/**
 * Connect to the VBus and store the packets into the global HeaderSetConsolidator.
 */
const connectToVBus = async () => {
    const ConnectionClass = connectionClassByName [config.connectionClassName];
    const connection = new ConnectionClass(config.connectionOptions);

    connection.on('connectionState', (connectionState) => {
        logger.debug('Connection state changed to ' + connectionState);
    });

    let hasSettled = false;
    let headerSet = new HeaderSet();
    let settledCountdown = 0;

    connection.on('packet', (packet) => {
        // logger.debug('Packet received...', packet);

        if (!hasSettled) {
            const headerCountBefore = headerSet.getHeaderCount();
            headerSet.addHeader(packet);
            const headerCountAfter = headerSet.getHeaderCount();

            if (headerCountBefore !== headerCountAfter) {
                settledCountdown = headerCountAfter * 2;
            } else if (settledCountdown > 0) {
                settledCountdown -= 1;
            } else {
                hasSettled = true;

                headerSetHasSettled(headerSet);
                headerSet = null;
            }
        }

        headerSetConsolidator.addHeader(packet);
        textHeaderSetConsolidator.addHeader(packet);
    });

    logger.debug('Connecting to VBus...');

    await connection.connect();

    logger.debug('Connected to VBus...');
};


/**
 * Responds to '/cgi-bin/get_resol_device_information' requests.
 * It replies with a fake DL2v2 response to trick VBusTouch to connect with it.
 */
const processGetResolDeviceInformationRequest = function(req, res) {
    const content = [
        'vendor = "RESOL"',
        'product = "DL2"',
        'serial = "001E66000000"',
        'version = "2.1.0"',
        'build = "201311280853"',
        'name = "DL2-001E66000000"',
        'features = "vbus,dl2"',
    ].join('\n');

    res.status(200);
    res.set('Content-Type', 'text/plain');
    res.send(content);
};


/**
 * Allows rewriting a HeaderSet read from the file system before it is send over the network.
 *
 * The example below converts the information from a DeltaSol MX to a Vitosolic 200.
 *
 * @param headerSet A `HeaderSet` instance containing the received `Packet` instances.
 * @returns A `HeaderSet` instance containing the forged `Packet` instances.
 */
const rewriteWebHeaderSet = function(headerSet) {
    const timestamp = headerSet.timestamp;

    /*
     * Get list of packet fields in the received packets.
     */
    let packetFields = specification.getPacketFieldsForHeaders(headerSet.getHeaders());

    /*
     * Map all existing packet fields into an object for easier access.
     */
    const origRawValueById = packetFields.reduce((memo, packetField) => {
        memo [packetField.id] = packetField.rawValue;
        return memo;
    }, {});

    /*
     * Pick the interesting fields from the raw values.
     */
    const tempCollector = origRawValueById ['00_0010_7E11_10_0100_000_2_0'];
    const tempStoreBottom = origRawValueById ['00_0010_7E11_10_0100_002_2_0'];
    const tempStoreTop = origRawValueById ['00_0010_7E11_10_0100_004_2_0'];
    const pumpSpeed = origRawValueById ['00_0010_7E11_10_0100_076_1_0'];
    const info1 = origRawValueById ['00_0010_7E11_10_0100_077_1_0'];
    const info2 = origRawValueById ['00_0010_7E11_10_0100_078_1_0'];
    const errorMask = origRawValueById ['00_0010_7E11_10_0100_096_4_0'];
    const tempFlow = origRawValueById ['00_0010_7E11_10_0100_006_2_0'];
    const tempReturn = origRawValueById ['00_0010_7E11_10_0100_008_2_0'];
    const flow = origRawValueById ['00_0010_7E11_10_0100_040_4_0'];
    const heat = origRawValueById ['00_0010_7E31_10_0100_000_4_0'];

    /*
     * Create an empty packet of the supported Vitosolic 200.
     */
    const supportedPacket1 = new Packet({
        timestamp,
        channel: 0,
        destinationAddress: 0x0010,  // DFA
        sourceAddress: 0x7321,  // Vitosolic 200 [Regler]
        command: 0x0100,
        frameCount: 20,
    });

    /*
     * Create an empty packet for the heat quantity information.
     */
    const supportedPacket2 = new Packet({
        timestamp,
        channel: 0,
        destinationAddress: 0x0010,  // DFA
        sourceAddress: 0x7326,  // Vitosolic 200 [WMZ1]
        command: 0x0100,
        frameCount: 3,
    });

    /*
     * Create a new header set and add the forged packets to it.
     */
    headerSet = new HeaderSet({
        timestamp,
    });
    headerSet.addHeader(supportedPacket1);
    headerSet.addHeader(supportedPacket2);

    /*
     * Get list of packet fields in the forged packets.
     */
    packetFields = specification.getPacketFieldsForHeaders(headerSet.getHeaders());

    /*
     * Transfered the information picked from the received packets into the forged ones.
     */
    specification.setPacketFieldRawValues(packetFields, {
        '00_0010_7321_10_0100_000_2_0': tempCollector,
        '00_0010_7321_10_0100_002_2_0': tempStoreBottom,
        '00_0010_7321_10_0100_004_2_0': tempStoreTop,
        '00_0010_7321_10_0100_044_1_0': pumpSpeed,
        '00_0010_7321_10_0100_045_1_0': info1,
        '00_0010_7321_10_0100_046_1_0': info2,
        '00_0010_7321_10_0100_060_2_0': errorMask,
        '00_0010_7326_10_0100_000_2_0': tempFlow,
        '00_0010_7326_10_0100_002_2_0': tempReturn,
        '00_0010_7326_10_0100_004_2_0': flow,
    });

    supportedPacket2.frameData.writeUInt16LE(heat % 1000, 6);
    supportedPacket2.frameData.writeUInt16LE((heat / 1000) % 1000, 8);
    supportedPacket2.frameData.writeUInt16LE((heat / 1000000) % 1000, 10);

    /*
     * Return the forged header set.
     */
    return headerSet;
};


/**
 * Responds to '/dlx/download/download' requests.
 * It only implements the parts of the endpoint that are necessary for VBusTouch to work.
 */
const processDownloadDownloadRequest = async (req, res) => {
    try {
        const i18n = specification.i18n;

        const startDate = req.query.startDate ? i18n.momentUtc(req.query.startDate, 'MM/DD/YYYY') : i18n.momentUtc();
        const endDate = req.query.endDate ? i18n.momentUtc(req.query.endDate, 'MM/DD/YYYY') : i18n.momentUtc();

        const query = {
            source: req.query.source,
            outputType: req.query.outputType,
            dataLanguage: req.query.dataLanguage || 'en',
            startDate: startDate.toDate(),
            endDate: endDate.toDate(),
            sieveInterval: req.query.sieveInterval | 0,
            ttl: req.query.ttl | 0,
        };

        // logger.debug(req.query, query);

        let outputConverter, contentType;
        if (query.outputType === 'vbus') {
            outputConverter = new VBusRecordingConverter({});
            contentType = 'application/octet-stream';
        } else if (query.outputType === 'json') {
            outputConverter = new DLxJsonConverter({});
            contentType = 'application/json; charset=utf-8';
        } else {
            throw new Error('Unsupported output type ' + JSON.stringify(query.outputType));
        }

        const chunks = [];

        const onOutputConverterReadable = function() {
            let chunk;
            while ((chunk = outputConverter.read()) != null) {
                chunks.push(chunk);
            }
        };

        const onOutputConverterEnd = function() {
            // logger.debug('C2 end event received');
        };

        outputConverter.on('readable', onOutputConverterReadable);

        outputConverter.on('end', onOutputConverterEnd);

        try {
            if (query.source === 'current') {
                outputConverter.convertHeaderSet(headerSetConsolidator);
            } else {
                await promisify(cb => {
                    let cleanup = () => {
                        cb(new Error('cleanup called too early'));
                    };

                    const hsc = new HeaderSetConsolidator({
                        interval: query.sieveInterval * 1000,
                        timeToLive: query.ttl * 1000,
                        minTimestamp: startDate.startOf('day').toDate(),
                        maxTimestamp: startDate.endOf('day').toDate(),
                    });

                    const onHscHeaderSet = function(headerSet) {
                        // logger.debug(`HSC @headerSet ${headerSet.timestamp.toISOString()}`);

                        if (config.rewriteWebHeaderSets) {
                            headerSet = rewriteWebHeaderSet(headerSet);
                        }

                        outputConverter.convertHeaderSet(headerSet);
                    };

                    hsc.on('headerSet', onHscHeaderSet);

                    const inputConverter = new VBusRecordingConverter();

                    let first = true;

                    const onInputConverterHeaderSet = function(headerSet) {
                        // logger.debug('C1 headerSet event received', headerSet.timestamp);

                        if (first) {
                            first = false;
                            hsc.timestamp = headerSet.timestamp;
                        }

                        hsc.processHeaderSet(headerSet);
                    };

                    const onInputConverterFinish = function() {
                        // logger.debug('C1 finish event received');

                        cleanup(null);
                    };

                    inputConverter.on('headerSet', onInputConverterHeaderSet);

                    inputConverter.on('finish', onInputConverterFinish);

                    const flr = new FileListReader({
                        dirname: config.loggingPath,
                        minDatecode: query.startDate,
                        maxDatecode: query.endDate,
                    });

                    flr.on('error', err => {
                        cleanup(err);
                    });

                    flr.pipe(inputConverter);

                    cleanup = (err) => {
                        inputConverter.removeListener('headerSet', onInputConverterHeaderSet);
                        inputConverter.removeListener('finish', onInputConverterFinish);
                        hsc.removeListener('headerSet', onHscHeaderSet);

                        cb(err);
                    };
                });
            }

            await outputConverter.finish();
        } finally {
            outputConverter.removeListener('readable', onOutputConverterReadable);
            outputConverter.removeListener('end', onOutputConverterEnd);
        }

        const buffer = Buffer.concat(chunks);

        res.status(200);
        res.set('content-type', contentType);
        res.send(buffer);
    } catch (err) {
        global.console.log(err, err.toString(), err.stack);

        res.status(500);
        res.set('content-type', 'text/plain');
        res.send(err.toString());
    }
};


/**
 * Start the web server.
 */
const startWebServer = async () => {
    logger.debug('Starting web server...');

    const app = express();

    app.use(morgan('dev'));
    app.use(express.query());

    app.get('/cgi-bin/get_resol_device_information', processGetResolDeviceInformationRequest);
    app.get('/dlx/download/download', processDownloadDownloadRequest);

    app.listen(3000, () => {
        logger.debug('Started web server at: ');
        logger.debug('  - http://0.0.0.0:' + config.webServerPort + '/ (internal)');
        for (const iface of Object.values(os.networkInterfaces())) {
            for (const ifaceConfig of iface) {
                if (ifaceConfig.family === 'IPv4') {
                    logger.debug('  - http://' + ifaceConfig.address + ':' + config.webServerPort + '/' + (ifaceConfig.internal ? ' (internal)' : ''));
                }
            }
        }
    });
};


const startHeaderSetConsolidatorTimer = async () => {
    logger.debug('Starting HeaderSetConsolidator timer...');

    headerSetConsolidator.startTimer();
};


const startMqttLogging = async () => {
    const onHeaderSet = async (headerSet, client) => {
        const headers = headerSet.getSortedHeaders();
        const packetFields = specification.getPacketFieldsForHeaders(headers);

        const valuesById = packetFields.reduce((memo, pf) => {
            if (pf.rawValue != null) {
                const precision = pf.packetFieldSpec.type.precision;

                const roundedRawValue = pf.rawValue.toFixed(precision);

                // logger.debug('ID = ' + JSON.stringify(pf.id) + ', Name = ' + JSON.stringify(pf.name) + ', Value = ' + pf.rawValue + ', RoundedValue = ' + roundedRawValue);

                memo [pf.id] = roundedRawValue;
            }
            return memo;
        }, {});

        let payload;
        if (config.mqttEncoding === 'urlencoded') {
            payload = Object.keys(config.mqttPacketFieldMap).reduce((memo, key) => {
                const packetFieldId = config.mqttPacketFieldMap [key];

                let value;
                if (typeof packetFieldId === 'function') {
                    value = packetFieldId(valuesById);
                } else {
                    value = valuesById [packetFieldId];
                }
                if (typeof value === 'number') {
                    value = value.toString();
                }
                if (typeof value === 'string') {
                    if (memo.length > 0) {
                        memo += '&';
                    }
                    memo += `${encodeURIComponent(key)}=${encodeURIComponent(value)}`;
                }
                return memo;
            }, '');
        } else {
            const params = Object.keys(config.mqttPacketFieldMap).reduce((memo, key) => {
                const packetFieldId = config.mqttPacketFieldMap [key];

                let value;
                if (typeof packetFieldId === 'function') {
                    value = packetFieldId(valuesById);
                } else {
                    value = valuesById [packetFieldId];
                }
                if (typeof value === 'number') {
                    value = value.toString();
                }
                if (typeof value === 'string') {
                    memo [key] = value;
                }
                return memo;
            }, {});

            payload = JSON.stringify(params);
        }

        if (payload) {
            client.publish(config.mqttTopic, payload, err => {
                if (err) {
                    logger.error(err);
                }
            });
        }
    };

    if (config.mqttInterval) {
        logger.debug('Starting MQTT logging');
        const client = mqtt.connect(config.mqttConnect);

        client.on('error', err => {
            logger.error(err);
        });

        client.once('connect', () => {
            const hsc = new HeaderSetConsolidator({
                interval: config.mqttInterval,
            });

            hsc.on('headerSet', () => {
                onHeaderSet(headerSetConsolidator, client).then(null, err => {
                    if (err) {
                        logger.error(err);
                    }
                });
            });

            hsc.startTimer();
        });
    }
};

const startPvOutputOrgLogging = async () => {
    const onHeaderSet = async (headerSet) => {
        const headers = headerSet.getSortedHeaders();
        const packetFields = specification.getPacketFieldsForHeaders(headers);

        const valuesById = packetFields.reduce((memo, pf) => {
            const precision = pf.packetFieldSpec.type.precision;

            const roundedRawValue = pf.rawValue.toFixed(precision);

            // logger.debug('ID = ' + JSON.stringify(pf.id) + ', Name = ' + JSON.stringify(pf.name) + ', Value = ' + pf.rawValue + ', RoundedValue = ' + roundedRawValue);

            memo [pf.id] = roundedRawValue;
            return memo;
        }, {});

        const timestamp = specification.i18n.moment(headerSet.timestamp);

        const params = Object.keys(config.pvOutputOrgPacketFieldMap).reduce((memo, key) => {
            const packetFieldId = config.pvOutputOrgPacketFieldMap [key];

            let value;
            if (typeof packetFieldId === 'function') {
                value = packetFieldId(valuesById);
            } else {
                value = valuesById [packetFieldId];
            }
            if (typeof value === 'number') {
                value = value.toString();
            }
            if (typeof value === 'string') {
                memo [key] = value;
            }
            return memo;
        }, {
            key: config.pvOutputOrgApiKey,
            sid: config.pvOutputOrgSystemId,
            d: timestamp.format('YYYYMMDD'),
            t: timestamp.format('HH:mm'),
        });

        request({
            url: 'http://pvoutput.org/service/r2/addstatus.jsp',
            qs: params,
        }, (error, response, body) => {
            logger.debug(error, response, body);
        });
    };

    if (config.pvOutputOrgInterval) {
        logger.debug('Starting PvOutput.org logging');

        const hsc = new HeaderSetConsolidator({
            interval: config.pvOutputOrgInterval,
        });

        hsc.on('headerSet', () => {
            onHeaderSet(headerSetConsolidator).then(null, err => {
                logger.error(err);
            });
        });

        hsc.startTimer();
    }
};


const startTextLogging = async () => {
    let currentDatecode = null;

    let currentConverter = null;

    const onHeaderSet = async (headerSet) => {
        const datecode = specification.i18n.moment(headerSet.timestamp).format('YYYYMMDD');
        if (currentDatecode !== datecode) {
            currentDatecode = datecode;

            if (currentConverter) {
                currentConverter.finish();
                currentConverter = null;
            }

            const filename = path.resolve(config.textLoggingPath, datecode + '.csv');

            const file = fs.createWriteStream(filename, { flags: 'a' });

            const options = Object.assign({}, config.textLoggingOptions, {
                specification,
            });

            const converter = new TextConverter(options);
            converter.pipe(file);

            currentConverter = converter;
        }

        if (currentConverter) {
            currentConverter.convertHeaderSet(headerSet);
        }
    };

    if (config.textLoggingInterval) {
        logger.debug('Starting text logging');

        const hsc = new HeaderSetConsolidator({
            interval: config.textLoggingInterval,
        });

        hsc.on('headerSet', () => {
            onHeaderSet(textHeaderSetConsolidator);
        });

        hsc.startTimer();
    }
};


const startRecorder = async () => {
    let filename = null;

    const converter = new VBusRecordingConverter();

    const onReadable = () => {
        const chunks = [];

        let chunk;
        while ((chunk = converter.read()) != null) {
            chunks.push(chunk);
        }

        if (filename) {
            fs.appendFileSync(filename, Buffer.concat(chunks));
        }
    };

    const onHeaderSet = function(headerSet) {
        // logger.debug('HeaderSet consolidated...');

        const datecode = normalizeDatecode(headerSet.timestamp);

        filename = path.resolve(config.loggingPath, `${datecode}.vbus`);

        converter.convertHeaderSet(headerSet);
    };

    converter.on('readable', onReadable);
    headerSetConsolidator.on('headerSet', onHeaderSet);
};


const main = async () => {
    await connectToVBus();

    await startWebServer();

    await startHeaderSetConsolidatorTimer();

    await startMqttLogging();

    await startPvOutputOrgLogging();

    await startTextLogging();

    await startRecorder();
};



if (require.main === module) {
    main(process.argv.slice(2)).then(() => {
        logger.info('DONE!');
    }, err => {
        logger.error(err);
    });
} else {
    module.exports = main;
}