index.js
'use strict';
if (process.env.COVERAGE) require('blanket');
const events = require('events');
const fs = require('fs');
const path = require('path');
const zlib = require('zlib');
const EOL = require('os').EOL;
const logger = require('./lib/logger');
const Bookmark = require('./lib/bookmark');
const Splitter = require('./lib/line-splitter');
class Reader extends events.EventEmitter {
constructor (fileOrPath, options) {
super()
// the file we're reading lines from
this.filePath = path.resolve(fileOrPath);
this.isArchive = false;
this.sawEndOfFile = false;
this.canUseBookmarkBytes = false;
this.startBytes = 0;
this.watchDelay = process.env.NODE_ENV === 'test' ? 100 : 2000;
this.applyOptions(options)
this.bookmark = new Bookmark(this.bookmarkDir)
this.resetPosition()
this.startReader()
}
applyOptions (options) {
if (!options) options = { bookmark: { } };
this.watchOpts = { persistent: true, recursive: false };
this.encoding = options.encoding || 'utf8';
this.noBookmark = options.noBookmark || false;
this.bookmarkDir = options.bookmark.dir || path.resolve('./', '.bookmark');
this.batch = { count: 0, limit: 0, delay: 0 };
if (options.batchLimit) this.batch.limit = options.batchLimit;
if (options.batchDelay) this.batch.delay = options.batchDelay;
if (options.watchDelay) this.watchDelay = options.watchDelay * 1000;
}
resetPosition () {
this.lines = { start: 0, position: 0, skip: 0 };
this.bytesOffset = 0;
}
startReader () {
// does the log file exist?
fs.stat(this.filePath, (err, stat) => {
if (err) {
if (err.code === 'ENOENT') {
logger.info(`watching for ${this.filePath} to appear`);
return this.watch(this.filePath);
}
logger.error(err);
return;
}
this.createStream(); // a Transform stream
})
}
notifyAndWatch () {
this.emit('drain', (err, delay) => { this.batchSaveDone(err, delay); })
this.emit('end');
this.watch(this.filePath);
}
endStream () {
logger.info(`end of ${this.filePath}`);
if (this.sawEndOfFile) {
logger.debug('endStream: dampening extra EOF');
return;
}
this.sawEndOfFile = true;
this.canUseBookmarkBytes = true;
this.linesAtEndOfFile = this.lines.position;
this.notifyAndWatch();
}
readLine () {
this.canUseBookmarkBytes = false;
if (this.alreadyRead()) return;
if (this.batchIsFull()) return;
const line = this.liner.read();
// console.log(`\treadLine from ${path.basename(this.filePath)}: ${line}`)
if (line === null) return; // EOF
this.batch.count++;
this.lines.position++;
if (line) this.bytesOffset += (line.length + EOL.length);
this.emit('read', line, this.lines.position);
}
alreadyRead () {
if (this.lines.start && this.lines.position < this.lines.start) {
this.lines.skip++;
const line = this.liner.read();
if (line) this.bytesOffset += (line.length + EOL.length);
this.lines.position++;
return true;
}
if (this.lines.skip) {
logger.info(`\tskipped ${this.lines.skip} lines`);
this.lines.skip = 0;
}
return false;
}
batchIsFull () {
if (!this.batch.limit) return;
if (this.batch.count < this.batch.limit) return;
logger.debug(`batchIsFull, limit: ${this.batch.limit} count: ${this.batch.count}`);
process.nextTick(() => {
this.emit('drain', (err, delay) => {
this.batchSaveDone(err, delay);
})
})
return true;
}
batchSaveDone (err, delay) {
const saveArgs = {
file: this.filePath,
lines: this.lines.position,
bytes: this.bytesOffset,
}
this.bookmark.save(saveArgs, (err) => {
if (err) {
logger.error(err);
logger.error('bookmark save failed, halting');
return;
}
this.batch.count = 0;
if (!this.liner.readable) return;
// the log shipper can ask us to wait 'delay' seconds before
// emitting the next batch. This is useful as a backoff mechanism.
if (isNaN(delay)) delay = this.batch.delay;
if (delay) console.log(`\t\tpause ${delay} seconds`);
setTimeout(() => {
for (let i=0; i<=this.batch.limit; i++) {
if (this.liner.readable) this.readLine();
}
}, delay * 1000);
})
}
createStream () {
// entered when:
// new startup
// after EOF, when fs.watch saw a change
//
// with transform streams, files/archives are closed automatically
// at EOF. Reset the line position upon (re)open.
this.resetPosition();
// splitters are gone after EOF. Start a new one
this.lineSplitter();
this.bookmark.read(this.filePath, (err, mark) => {
if (err && err.code !== 'ENOENT') {
logger.error(`Error reading bookmark: ${err.message}`);
return;
}
if (/\.gz$/.test(this.filePath)) {
if (mark && mark.lines && !this.noBookmark) {
logger.debug('\tlines.start: ' + mark.lines);
this.lines.start = mark.lines;
}
return this.createStreamGzip();
}
// if (/\.bz2$/.test(this.filePath)) return this.createStreamBz2();
// options used only by plain text log files
const fileOpts = {
autoClose: true,
encoding: this.encoding,
}
if (mark && !this.noBookmark) {
// the only time byte position is safe is when we've read to EOF.
// Otherwise, the byte position contains buffered data that hasn't
// been emitted as lines.
// This is now saved in a separate variable
// the alternative to 'start' here, is splitting the entire file
// into lines (again) and counting lines. Avoid if possible.
if (this.canUseBookmarkBytes && mark.size) {
if (this.linesAtEndOfFile !== mark.lines) {
logger.error(`lines@EOF: ${this.linesAtEndOfFile}`);
logger.error(`mark.lines: ${mark.lines}`);
}
logger.info(`\tbytes.start: ${mark.size} (lines: ${mark.lines} )`);
fileOpts.start = mark.size;
this.lines.position = mark.lines;
this.bytesOffset = mark.size;
}
else if (mark.lines) {
logger.debug(`\tlines.start: ${mark.lines}`);
this.lines.start = mark.lines;
}
}
// we need to start fresh
this.sawEndOfFile = false;
logger.debug(`opening for read: ${this.filePath}`);
fs.createReadStream(this.filePath, fileOpts).pipe(this.liner);
})
}
createStreamGzip () {
this.isArchive = true;
fs.createReadStream(this.filePath).pipe(zlib.createGunzip()).pipe(this.liner);
}
createStreamBz2 () {
this.isArchive = true;
// ick. to use in pipe, compressjs has a node-gyp dep. I think I'd
// rather spawn a child process using CLI bunzip2. TODO
throw('no bzip2 support yet');
}
lineSplitter () {
this.liner = new Splitter({
encoding: this.encoding, // for archives
})
.on('readable', () => {
if (process.env.WANTS_SHUTDOWN) return; // cease reading
this.emit('testSetup')
this.readLine();
})
.on('end', () => {
this.endStream();
})
}
resolveAncestor (filePath, done) {
// fs apex, break recursion
if (filePath === '/') return done(null, filePath);
// walk up a directory tree until an existing one is found
fs.stat(filePath, (err, stat) => {
if (err) {
// logger.info('resolveAncestor: ' + err.code);
if (err.code === 'ENOENT') {
return this.resolveAncestor(path.dirname(filePath), done);
}
return done(err);
}
logger.debug(`\tresolveAncestor: ${filePath}`);
done(null, filePath);
})
}
watch (fileOrDir) {
// archives don't get appended, don't watch
if (this.isArchive) return;
this.resolveAncestor(fileOrDir, (err, existentPath) => {
if (err) return logger.error(err);
logger.info(`watching ${existentPath}`);
this.isWatchingParent = existentPath === fileOrDir ? false : true;
this.watcher = fs.watch(
existentPath,
this.watchOpts,
this.watchEvent.bind(this)
);
})
}
watchEvent (event, filename) {
logger.debug(`watcher saw ${event} on ${filename}`);
if (this.isWatchingParent) {
// make sure event filename matches
if (filename !== path.basename(this.filePath)) {
this.emit('irrelevantFile', filename);
return;
}
}
switch (event) {
case 'change':
this.watchChange(filename);
break;
case 'rename':
this.watchRename(filename);
break;
}
}
watchStop (filename) {
logger.debug(`stopping watching ${filename}`)
if (!this.watcher) return;
this.watcher.close();
this.watcher = null;
}
watchChange (filename) {
// Depending on underlying OS semantics, we can get multiple of these
// events in rapid succession. ignore subsequent.
if (!this.watcher) return;
this.watchStop(this.filePath);
// give the events a chance to settle
setTimeout(() => { this.createStream(); }, this.watchDelay);
}
watchRename (filename) {
// logger.info('\trename: ' + filename);
if (this.watcher) this.watchStop('');
switch (process.platform) {
case 'darwin':
this.renameMacOS(filename);
break;
case 'freebsd':
case 'linux':
this.renameLinux(filename);
break;
case 'win32':
this.renameLinux(filename);
break;
default:
// falls through
logger.error(`report this as GitHub Issue:\n\trename: ${filename} on ${process.platform}`)
}
}
renameLinux (filename) {
// we only get the source filename (foo.log), not dest
// what happened? (create, delete, move)
fs.stat(this.filePath, (err, stats) => {
if (err) {
if (err.code === 'ENOENT') { // mv or rm
this.lines.start = 0;
// watch for file to reappear
this.watch(this.filePath);
return;
}
logger.error(err);
}
logger.debug(stats);
setTimeout(() => {
this.createStream();
}, this.watchDelay);
})
}
renameMacOS (filename) {
this.lines.start = 0;
// log file just (re)appeared
if (filename === path.basename(this.filePath)) {
setTimeout(() => {
this.createStream();
}, this.watchDelay);
return;
}
// this.watch handles case when file does not exist
this.watch(this.filePath);
}
}
module.exports = {
createReader: function (filePath, options) {
return new Reader(filePath, options);
}
}