src/Storage/WritableStorage.js
const fs = require('fs');
const path = require('path');
const WritablePartition = require('../Partition/WritablePartition');
const WritableIndex = require('../Index/WritableIndex');
const ReadableStorage = require('./ReadableStorage');
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util');
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
const LOCK_RECLAIM = 0x1;
const LOCK_THROW = 0x2;
class StorageLockedError extends Error {}
/**
* @typedef {object|function(object):boolean} Matcher
*/
/**
* An append-only storage with highly performant positional range scans.
* It's highly optimized for an event-store and hence does not support compaction or data-rewrite, nor any querying
*/
class WritableStorage extends ReadableStorage {
/**
* @param {string} [storageName] The name of the storage.
* @param {object} [config] An object with storage parameters.
* @param {object} [config.serializer] A serializer object with methods serialize(document) and deserialize(data).
* @param {function(object): string} config.serializer.serialize Default is JSON.stringify.
* @param {function(string): object} config.serializer.deserialize Default is JSON.parse.
* @param {string} [config.dataDirectory] The path where the storage data should reside. Default '.'.
* @param {string} [config.indexDirectory] The path where the indexes should be stored. Defaults to dataDirectory.
* @param {string} [config.indexFile] The name of the primary index. Default '{storageName}.index'.
* @param {number} [config.readBufferSize] Size of the read buffer in bytes. Default 4096.
* @param {number} [config.writeBufferSize] Size of the write buffer in bytes. Default 16384.
* @param {number} [config.maxWriteBufferDocuments] How many documents to have in the write buffer at max. 0 means as much as possible. Default 0.
* @param {boolean} [config.syncOnFlush] If fsync should be called on write buffer flush. Set this if you need strict durability. Defaults to false.
* @param {boolean} [config.dirtyReads] If dirty reads should be allowed. This means that writes that are in write buffer but not yet flushed can be read. Defaults to true.
* @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition.
* @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
* @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
* @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled.
*/
constructor(storageName = 'storage', config = {}) {
if (typeof storageName !== 'string') {
config = storageName;
storageName = undefined;
}
const defaults = {
partitioner: (document, number) => '',
writeBufferSize: DEFAULT_WRITE_BUFFER_SIZE,
maxWriteBufferDocuments: 0,
syncOnFlush: false,
dirtyReads: true,
dataDirectory: '.'
};
config = Object.assign(defaults, config);
config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions);
ensureDirectory(config.dataDirectory);
super(storageName, config);
this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
if (config.lock === LOCK_RECLAIM) {
this.unlock();
}
this.partitioner = config.partitioner;
}
/**
* @inheritDoc
* @returns {boolean}
* @throws {StorageLockedError} If this storage is locked by another process.
*/
open() {
if (!this.lock()) {
return true;
}
return super.open();
}
/**
* Check all partitions torn writes and truncate the storage to the position before the first torn write.
* This might delete correctly written events in partitions, if their sequence number is higher than the
* torn write in another partition.
*/
checkTornWrites() {
let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER;
this.forEachPartition(partition => {
partition.open();
const tornSequenceNumber = partition.checkTornWrite();
if (tornSequenceNumber >= 0) {
lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber);
}
});
if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) {
this.truncate(lastValidSequenceNumber);
}
this.forEachPartition(partition => partition.close());
}
/**
* Attempt to lock this storage by means of a lock directory.
* @returns {boolean} True if the lock was created or false if the lock is already in place.
* @throws {StorageLockedError} If this storage is already locked by another process.
* @throws {Error} If the lock could not be created.
*/
lock() {
if (this.locked) {
return false;
}
try {
fs.mkdirSync(this.lockFile);
this.locked = true;
} catch (e) {
/* istanbul ignore if */
if (e.code !== 'EEXIST') {
throw new Error(`Error creating lock for storage ${this.storageFile}: ` + e.message);
}
throw new StorageLockedError(`Storage ${this.storageFile} is locked by another process`);
}
return true;
}
/**
* Unlock this storage, no matter if it was previously locked by this writer.
* Only use this if you are sure there is no other process still having a writer open.
* Current implementation just deletes a lock file that is named like the storage.
*/
unlock() {
if (fs.existsSync(this.lockFile)) {
if (!this.locked) {
this.checkTornWrites();
}
fs.rmdirSync(this.lockFile);
}
this.locked = false;
}
/**
* @inheritDoc
*/
close() {
if (this.locked) {
this.unlock();
}
super.close();
}
/**
* Add an index entry for the given document at the position and size.
*
* @private
* @param {number} partitionId The partition where the document is stored.
* @param {number} position The file offset where the document is stored.
* @param {number} size The size of the stored document.
* @param {object} document The document to add to the index.
* @param {function} [callback] The callback to call when the index is written to disk.
* @returns {EntryInterface} The index entry item.
*/
addIndex(partitionId, position, size, document, callback) {
if (!this.index.isOpen()) {
this.index.open();
}
/*if (this.index.lastEntry.position + this.index.lastEntry.size !== position) {
this.emit('index-corrupted');
throw new Error('Corrupted index, needs to be rebuilt!');
}*/
const entry = new WritableIndex.Entry(this.index.length + 1, position, size, partitionId);
this.index.add(entry, (indexPosition) => {
this.emit('wrote', document, entry, indexPosition);
/* istanbul ignore if */
if (typeof callback === 'function') {
return callback(indexPosition);
}
});
return entry;
}
/**
* Get a partition either by name or by id.
* If a partition with the given name does not exist, a new one will be created.
* If a partition with the given id does not exist, an error is thrown.
*
* @protected
* @param {string|number} partitionIdentifier The partition name or the partition Id
* @returns {ReadablePartition}
* @throws {Error} If an id is given and no such partition exists.
*/
getPartition(partitionIdentifier) {
if (typeof partitionIdentifier === 'string') {
const partitionName = this.storageFile + (partitionIdentifier.length ? '.' + partitionIdentifier : '');
partitionIdentifier = WritablePartition.idFor(partitionName);
if (!this.partitions[partitionIdentifier]) {
this.partitions[partitionIdentifier] = this.createPartition(partitionName, this.partitionConfig);
this.emit('partition-created', partitionIdentifier);
}
this.partitions[partitionIdentifier].open();
return this.partitions[partitionIdentifier];
}
return super.getPartition(partitionIdentifier);
}
/**
* @api
* @param {object} document The document to write to storage.
* @param {function} [callback] A function that will be called when the document is written to disk.
* @returns {number} The 1-based document sequence number in the storage.
*/
write(document, callback) {
const data = this.serializer.serialize(document).toString();
const dataSize = Buffer.byteLength(data, 'utf8');
const partitionName = this.partitioner(document, this.index.length + 1);
const partition = this.getPartition(partitionName);
const position = partition.write(data, this.length, callback);
assert(position !== false, 'Error writing document.');
const indexEntry = this.addIndex(partition.id, position, dataSize, document);
this.forEachSecondaryIndex((index, name) => {
if (!index.isOpen()) {
index.open();
}
index.add(indexEntry);
this.emit('index-add', name, index.length, document);
}, document);
return this.index.length;
}
/**
* Ensure that an index with the given name and document matcher exists.
* Will create the index if it doesn't exist, otherwise return the existing index.
*
* @api
* @param {string} name The index name.
* @param {Matcher} [matcher] An object that describes the document properties that need to match to add it this index or a function that receives a document and returns true if the document should be indexed.
* @returns {ReadableIndex} The index containing all documents that match the query.
* @throws {Error} if the index doesn't exist yet and no matcher was specified.
*/
ensureIndex(name, matcher) {
if (name === '_all') {
return this.index;
}
if (name in this.secondaryIndexes) {
return this.secondaryIndexes[name].index;
}
const indexName = this.storageFile + '.' + name + '.index';
if (fs.existsSync(path.join(this.indexDirectory, indexName))) {
return this.openIndex(name, matcher);
}
assert((typeof matcher === 'object' || typeof matcher === 'function') && matcher !== null, 'Need to specify a matcher.');
const metadata = buildMetadataForMatcher(matcher, this.hmac);
const { index } = this.createIndex(indexName, Object.assign({}, this.indexOptions, { metadata }));
try {
this.forEachDocument((document, indexEntry) => {
if (matches(document, matcher)) {
index.add(indexEntry);
}
});
} catch (e) {
index.destroy();
throw e;
}
this.secondaryIndexes[name] = { index, matcher };
this.emit('index-created', name);
return index;
}
/**
* Flush all write buffers to disk.
* This is a sync method and will invoke all previously registered flush callbacks.
*
* @api
* @returns {boolean} Returns true if a flush on any partition or the main index was executed.
*/
flush() {
let result = this.index.flush();
this.forEachPartition(partition => result = result | partition.flush());
this.forEachSecondaryIndex(index => index.flush());
return result;
}
/**
* Iterate all distinct partitions in which the given iterable list of entries are stored.
* @param {Iterable<Index.Entry>} entries
* @param {function(Index.Entry)} iterationHandler
*/
forEachDistinctPartitionOf(entries, iterationHandler) {
const partitions = [];
const numPartitions = Object.keys(this.partitions).length;
for (let entry of entries) {
if (partitions.indexOf(entry.partition) >= 0) {
continue;
}
partitions.push(entry.partition);
iterationHandler(entry);
if (partitions.length === numPartitions) {
break;
}
}
}
/**
* Truncate all partitions after the given (global) sequence number.
*
* @private
* @param {number} after The document sequence number to truncate after.
*/
truncatePartitions(after) {
if (after === 0) {
this.forEachPartition(partition => partition.truncate(0));
return;
}
const entries = this.index.range(after + 1); // We need the first entry that is cut off
if (entries === false || entries.length === 0) {
return;
}
this.forEachDistinctPartitionOf(entries, entry => this.getPartition(entry.partition).truncate(entry.position));
}
/**
* Truncate the storage after the given sequence number.
*
* @param {number} after The document sequence number to truncate after.
*/
truncate(after) {
/*
To truncate the store following steps need to be done:
1) find all partition positions after which their files should be truncated
2) truncate all partitions accordingly
3) truncate/rewrite all indexes
*/
if (!this.index.isOpen()) {
this.index.open();
}
this.truncatePartitions(after);
this.index.truncate(after);
this.forEachSecondaryIndex(index => {
/* istanbul ignore if */
if (!(index instanceof WritableIndex)) {
return;
}
let closeIndex = false;
if (!index.isOpen()) {
index.open();
closeIndex = true;
}
index.truncate(index.find(after));
if (closeIndex) {
index.close();
}
});
}
/**
* @protected
* @param {string} name
* @param {object} [options]
* @returns {{ index: WritableIndex, matcher: Matcher }}
*/
createIndex(name, options = {}) {
const index = new WritableIndex(name, options);
let matcher;
// If the index contains a matcher (possibly a serialized function) we check HMAC
// to prevent evaluating unknown code.
if (index.metadata.matcher) {
try {
matcher = buildMatcherFromMetadata(index.metadata, this.hmac);
} catch (e) {
index.destroy();
throw e;
}
}
return { index, matcher };
}
/**
* @protected
* @param {string} name
* @param {object} [config]
* @returns {WritablePartition}
*/
createPartition(name, config = {}) {
return new WritablePartition(name, config);
}
}
module.exports = WritableStorage;
module.exports.StorageLockedError = StorageLockedError;
module.exports.LOCK_THROW = LOCK_THROW;
module.exports.LOCK_RECLAIM = LOCK_RECLAIM;