src/vbus-recording-converter.js
/*! resol-vbus | Copyright (c) 2013-present, Daniel Wippermann | MIT license */
const moreints = require('buffer-more-ints');
const Datagram = require('./datagram');
const HeaderSet = require('./header-set');
const Packet = require('./packet');
const Telegram = require('./telegram');
const { applyDefaultOptions } = require('./utils');
const Converter = require('./converter');
class VBusRecordingConverter extends Converter {
/**
* Creates a new VBusRecordingConverter instance.
*
* @constructs
* @augments Converter
*
* @classdesc
* This Converter subclass converts Header and HeaderSet instances to and from a binary stream that
* conforms to the VBus Recording File Format (the binary file format used e.g. to store data on the
* Datalogger devices).
*/
constructor(options) {
super(options);
applyDefaultOptions(this, options, /** @lends VBusRecordingConverter.prototype */ {
topologyScanOnly: false,
});
this.knownHeaderIds = {};
}
reset() {
this.rxBuffer = null;
}
end() {
const _this = this;
if (this.objectMode) {
// nop
} else if (this.topologyScanOnly) {
this._processBuffer(Buffer.alloc(0), true, (record) => {
_this._processRecordForTopologyScan(record);
});
this._constructTopologyHeaderSet();
} else {
this._processBuffer(Buffer.alloc(0), true, (record) => {
_this._processRecord(record);
});
}
this._emitHeaderSet();
return Converter.prototype.end.apply(this, arguments);
}
convertRawData(rawData) {
if (this.objectMode) {
return Converter.prototype.convertRawData.apply(this, arguments);
} else {
const buffers = [];
const createBuffer = function(type, length, timestamp) {
const buffer = Buffer.alloc(length);
buffer.fill(0);
buffer [0] = 0xA5;
buffer [1] = (type & 0x0F) | ((type & 0x0F) << 4);
buffer.writeUInt16LE(length, 2);
buffer.writeUInt16LE(length, 4);
moreints.writeUInt64LE(buffer, timestamp.getTime(), 6);
return buffer;
};
let buffer;
if (rawData.channel !== this.currentChannel) {
buffer = createBuffer(7, 16, new Date(0));
buffer.writeUInt16LE(rawData.channel, 14);
buffers.push(buffer);
this.currentChannel = rawData.channel;
}
buffer = createBuffer(8, 22 + rawData.buffer.length, rawData.startTimestamp);
moreints.writeUInt64LE(buffer, rawData.endTimestamp.getTime(), 14);
rawData.buffer.copy(buffer, 22, 0, rawData.buffer.length);
buffers.push(buffer);
buffer = Buffer.concat(buffers);
return this.push(buffer);
}
}
convertComment(timestamp, comment) {
if (this.objectMode) {
return Converter.prototype.convertComment.apply(this, arguments);
} else {
const buffers = [];
const createBuffer = function(type, length, timestamp) {
const buffer = Buffer.alloc(length);
buffer.fill(0);
buffer [0] = 0xA5;
buffer [1] = (type & 0x0F) | ((type & 0x0F) << 4);
buffer.writeUInt16LE(length, 2);
buffer.writeUInt16LE(length, 4);
moreints.writeUInt64LE(buffer, timestamp.getTime(), 6);
return buffer;
};
const rawData = Buffer.from(comment.toString());
let buffer;
buffer = createBuffer(9, 14 + rawData.length, timestamp);
rawData.copy(buffer, 14, 0, rawData.length);
buffers.push(buffer);
buffer = Buffer.concat(buffers);
return this.push(buffer);
}
}
convertHeader(header) {
if (this.objectMode) {
return Converter.prototype.convertHeader.apply(this, arguments);
} else {
return this._convertHeaders(header.timestamp, [ header ]);
}
}
convertHeaderSet(headerSet) {
if (this.objectMode) {
return Converter.prototype.convertHeaderSet.apply(this, arguments);
} else {
return this._convertHeaders(headerSet.timestamp, headerSet.getSortedHeaders());
}
}
_convertHeaders(timestamp, headers) {
const buffers = [];
const createBuffer = function(type, length, timestamp) {
const buffer = Buffer.alloc(length);
buffer.fill(0);
buffer [0] = 0xA5;
buffer [1] = (type & 0x0F) | ((type & 0x0F) << 4);
buffer.writeUInt16LE(length, 2);
buffer.writeUInt16LE(length, 4);
moreints.writeUInt64LE(buffer, timestamp.getTime(), 6);
return buffer;
};
let buffer = createBuffer(4, 14, timestamp);
buffers.push(buffer);
let currentChannel = 0;
for (const header of headers) {
const majorVersion = header.getProtocolVersion() >> 4;
let dataLength;
if (majorVersion === 1) {
dataLength = header.frameCount * 4;
} else if (majorVersion === 2) {
dataLength = 6;
} else if (majorVersion === 3) {
dataLength = header.getFrameCount() * 7;
} else {
dataLength = -1;
}
if (dataLength >= 0) {
let buffer;
if (currentChannel !== header.channel) {
currentChannel = header.channel;
buffer = createBuffer(7, 16, new Date(0));
buffer.writeUInt16LE(header.channel, 14);
buffers.push(buffer);
}
buffer = createBuffer(6, 26 + dataLength, header.timestamp);
buffer.writeUInt16LE(header.destinationAddress, 14);
buffer.writeUInt16LE(header.sourceAddress, 16);
buffer.writeUInt16LE(header.getProtocolVersion(), 18);
/* istanbul ignore else */
if (majorVersion === 1) {
buffer.writeUInt16LE(header.command, 20);
buffer.writeUInt16LE(dataLength, 22);
buffer.writeUInt16LE(0, 24);
header.frameData.copy(buffer, 26, 0, dataLength);
} else if (majorVersion === 2) {
buffer.writeUInt16LE(header.command, 20);
buffer.writeUInt16LE(dataLength, 22);
buffer.writeUInt16LE(0, 24);
buffer.writeInt16LE(header.valueId, 26);
buffer.writeInt32LE(header.value, 28);
} else if (majorVersion === 3) {
buffer.writeUInt16LE(header.command, 20);
buffer.writeUInt16LE(dataLength, 22);
buffer.writeUInt16LE(0, 24);
header.frameData.copy(buffer, 26, 0, dataLength);
}
buffers.push(buffer);
}
}
buffer = Buffer.concat(buffers);
return this.push(buffer);
}
_read() {
// nop
}
_write(chunk, encoding, callback) {
const _this = this;
if (this.objectMode) {
return Converter.prototype._write.apply(this, arguments);
} else if (this.topologyScanOnly) {
this._processBuffer(chunk, false, (record) => {
_this._processRecordForTopologyScan(record);
});
callback();
} else {
this._processBuffer(chunk, false, (record) => {
_this._processRecord(record);
});
callback();
}
}
_processBuffer(chunk, endOfStream, processRecord) {
let buffer;
if (this.rxBuffer) {
buffer = Buffer.concat([ this.rxBuffer, chunk ]);
} else {
buffer = chunk;
}
const getRecordLength = function(index) {
let length;
if (index > buffer.length - 6) {
length = -1;
} else if (buffer [index] !== 0xA5) {
length = 0;
} else if ((buffer [index + 1] >> 4) !== (buffer [index + 1] & 15)) {
length = 0;
} else if (buffer [index + 2] !== buffer [index + 4]) {
length = 0;
} else if (buffer [index + 3] !== buffer [index + 5]) {
length = 0;
} else {
length = buffer.readUInt16LE(index + 2);
if ((index + length) > buffer.length) {
length = -1;
}
}
return length;
};
let currentIndex = 0, currentLength = getRecordLength(0), nextIndex, nextLength, start = 0;
while ((currentLength >= 0) && (currentIndex < buffer.length)) {
if (currentLength > 0) {
nextIndex = currentIndex + currentLength;
} else {
nextIndex = currentIndex + 1;
}
nextLength = getRecordLength(nextIndex);
if ((nextLength < 0) && !endOfStream) {
break;
}
if ((currentLength > 0) && ((nextLength > 0) || (nextIndex === buffer.length))) {
const record = buffer.slice(currentIndex, nextIndex);
processRecord(record);
start = nextIndex;
} else if (nextIndex !== (currentIndex + 1)) {
nextIndex = currentIndex + 1;
nextLength = getRecordLength(nextIndex);
if (nextLength < 0) {
break;
}
}
currentIndex = nextIndex;
currentLength = nextLength;
}
const maxLength = 65536;
if (buffer.length - start >= maxLength) {
start = buffer.length - maxLength;
}
if (start < buffer.length) {
this.rxBuffer = Buffer.from(buffer.slice(start));
} else {
this.rxBuffer = null;
}
}
_processRecord(buffer) {
const type = buffer [1] & 0x0F;
const timestamp = moreints.readUInt64LE(buffer, 6);
if (type === 3) {
this._processType3Record(buffer, timestamp);
} else if (type === 4) {
this._emitHeaderSet();
this.headerSet = new HeaderSet();
this.headerSetTimestamp = new Date(timestamp);
this.currentChannel = 0;
} else if ((type === 6) && (buffer.length >= 20)) {
const destinationAddress = buffer.readUInt16LE(14);
const sourceAddress = buffer.readUInt16LE(16);
const protocolVersion = buffer.readUInt16LE(18);
const majorVersion = protocolVersion >> 4;
if ((majorVersion === 1) && (buffer.length >= 26)) {
const command = buffer.readUInt16LE(20);
const dataLength = buffer.readUInt16LE(22);
if (buffer.length >= 26 + dataLength) {
const frameCount = Math.floor(dataLength / 4);
const frameData = Buffer.alloc(127 * 4);
buffer.copy(frameData, 0, 26, 26 + dataLength);
const header = new Packet({
timestamp: new Date(timestamp),
channel: this.currentChannel,
destinationAddress,
sourceAddress,
command,
frameCount,
frameData,
dontCopyFrameData: true,
});
if (this.headerSet) {
this.headerSet.addHeader(header);
}
this.emit('header', header);
}
} else if ((majorVersion === 2) && (buffer.length === 32)) {
const command = buffer.readUInt16LE(20);
const dataLength = buffer.readUInt16LE(22);
if (dataLength === 6) {
const valueId = buffer.readInt16LE(26);
const value = buffer.readInt32LE(28);
const header = new Datagram({
timestamp: new Date(timestamp),
channel: this.currentChannel,
destinationAddress,
sourceAddress,
command,
valueId,
value,
});
this.emit('header', header);
}
} else if ((majorVersion === 3) && (buffer.length >= 26)) {
const command = buffer [20];
const dataLength = buffer.readUInt16LE(22);
if (buffer.length >= 26 + dataLength) {
const frameCount = Math.floor(dataLength / 7);
const frameData = Buffer.alloc(3 * 7);
buffer.copy(frameData, 0, 26, 26 + dataLength);
const header = new Telegram({
timestamp: new Date(timestamp),
channel: this.currentChannel,
destinationAddress,
sourceAddress,
command,
frameCount,
frameData,
dontCopyFrameData: true,
});
this.emit('header', header);
}
}
} else if ((type === 7) && (buffer.length >= 16)) {
this.currentChannel = buffer [14];
} else if ((type === 8) && (buffer.length >= 22)) {
const endTimestamp = moreints.readUInt64LE(buffer, 14);
const rawBuffer = Buffer.alloc(buffer.length - 22);
buffer.copy(rawBuffer, 0, 22, buffer.length);
this.emit('rawData', {
startTimestamp: new Date(timestamp),
endTimestamp: new Date(endTimestamp),
channel: this.currentChannel,
buffer: rawBuffer,
});
} else if ((type === 9) && (buffer.length >= 14)) {
const comment = buffer.slice(14).toString();
this.emit('comment', {
timestamp: new Date(timestamp),
comment,
});
}
}
_processType3Record(buffer, timestamp) {
const destinationAddress = buffer.readUInt16LE(14);
const sourceAddress = buffer.readUInt16LE(16);
const protocolVersion = buffer.readUInt16LE(18);
const majorVersion = protocolVersion >> 4;
if ((majorVersion === 1) && (buffer.length >= 26)) {
const command = buffer.readUInt16LE(20);
const dataLength = buffer.readUInt16LE(22);
if (buffer.length >= 26 + dataLength) {
const frameCount = Math.floor(dataLength / 4);
const frameData = Buffer.alloc(127 * 4);
buffer.copy(frameData, 0, 26, 26 + dataLength);
const header = new Packet({
timestamp: new Date(timestamp),
channel: this.currentChannel,
destinationAddress,
sourceAddress,
command,
frameCount,
frameData,
dontCopyFrameData: true,
});
if (destinationAddress === 0x0010) {
this._emitHeaderSet();
} else if (this.headerSet && this.headerSet.containsHeader(header)) {
this._emitHeaderSet();
}
if (!this.headerSet) {
this.headerSet = new HeaderSet();
this.headerSet.timestamp = header.timestamp;
}
this.headerSet.addHeader(header);
this.emit('header', header);
}
}
}
_emitHeaderSet() {
if (this.headerSet) {
if (this.headerSetTimestamp) {
this.headerSet.timestamp = this.headerSetTimestamp;
}
this.emit('headerSet', this.headerSet);
this.headerSet = null;
}
}
_processRecordForTopologyScan(buffer) {
const type = buffer [1] & 0x0F;
let destinationAddress = 0, sourceAddress = 0, protocolVersion = 0, command = 0, hasHeader = false;
if (((type === 3) || (type === 6)) && (buffer.length >= 20)) {
destinationAddress = buffer.readUInt16LE(14);
sourceAddress = buffer.readUInt16LE(16);
protocolVersion = buffer.readUInt16LE(18);
const majorVersion = protocolVersion >> 4;
if ((majorVersion === 1) && (buffer.length >= 26)) {
command = buffer.readUInt16LE(20);
hasHeader = true;
}
} else if (type === 4) {
this.currentChannel = 0;
} else if ((type === 7) && (buffer.length >= 16)) {
this.currentChannel = buffer [14];
} else if ((type === 8) && (buffer.length >= 22)) {
const startTimestamp = moreints.readUInt64LE(buffer, 6);
const endTimestamp = moreints.readUInt64LE(buffer, 14);
const rawBuffer = Buffer.alloc(buffer.length - 22);
buffer.copy(rawBuffer, 0, 22, buffer.length);
this.emit('rawData', {
startTimestamp: new Date(startTimestamp),
endTimestamp: new Date(endTimestamp),
channel: this.currentChannel,
buffer: rawBuffer,
});
}
if (hasHeader) {
const headerIdBuffer = Buffer.alloc(8);
headerIdBuffer [0] = this.currentChannel;
headerIdBuffer.writeUInt16LE(destinationAddress, 1);
headerIdBuffer.writeUInt16LE(sourceAddress, 3);
headerIdBuffer [5] = protocolVersion;
headerIdBuffer.writeUInt16LE(command, 6);
const headerId = headerIdBuffer.toString('hex');
this.knownHeaderIds [headerId] = true;
}
}
_constructTopologyHeaderSet() {
const headerSet = new HeaderSet();
const timestamp = new Date(0);
for (const headerId of Object.getOwnPropertyNames(this.knownHeaderIds)) {
const headerIdBuffer = Buffer.from(headerId, 'hex');
const channel = headerIdBuffer [0];
const destinationAddress = headerIdBuffer.readUInt16LE(1);
const sourceAddress = headerIdBuffer.readUInt16LE(3);
const protocolVersion = headerIdBuffer [5];
const command = headerIdBuffer.readUInt16LE(6);
const majorVersion = (protocolVersion >> 4);
/* istanbul ignore else */
if (majorVersion === 1) {
const packet = new Packet({
timestamp,
channel,
destinationAddress,
sourceAddress,
command,
frameCount: 0,
});
headerSet.addHeader(packet);
} else {
throw new Error('Unsupported major version');
}
}
headerSet.timestamp = timestamp;
this.headerSet = headerSet;
}
}
Object.assign(VBusRecordingConverter.prototype, /** @lends VBusRecordingConverter.prototype */ {
topologyScanOnly: false,
rxBuffer: null,
headerSet: null,
headerSetTimestamp: null,
currentChannel: 0,
knownHeaderIds: null,
});
module.exports = VBusRecordingConverter;