ezpaarse-project/ezpaarse

View on GitHub
lib/lineprocessor.js

Summary

Maintainability
F
3 days
Test Coverage
'use strict';

const EventEmitter = require('events').EventEmitter;
const util         = require('util');
const co           = require('co');
const crypto       = require('crypto');
const Organizer    = require('./organizer.js');

/**
 * Create a Line Processor
 * Handle the process chain, from line parsing up to the final EC.
 *
 * Events emitted:
 *  - 'ec'     (ec)  an EC released by the organizer, or an EC rejected with ECLEAN
 *  - 'denied' (ec)  an EC whose `_meta.granted` is false once the middlewares ran
 *  - 'end'    ()    emitted once, when the organizer drains or `end()` is called
 *
 * @param {Object} job  the job providing the log parser, middlewares, report,
 *                      log streams and logger; when falsy, the processor is
 *                      ended right away and 'end' is emitted on the next tick
 */
const LinesProcessor = function (job) {
  EventEmitter.call(this);

  // Stable reference to the instance, for code whose own `this` is not the
  // processor (the generators run by co, detached method calls).
  const processor = this;
  const emit = this.emit.bind(this);

  let ecNumber   = 0;
  let nbBadLines = 0;
  let firstLine  = true;

  this.ended = false;

  /**
   * End the processor: flag it as ended and emit 'end'.
   * Subsequent calls are no-ops.
   */
  this.end = function () {
    if (!processor.ended) {
      processor.ended = true;
      processor.emit('end');
    }
  };

  if (!job) {
    // Nothing to process. 'end' is deferred because emitting it synchronously
    // from the constructor would fire before the caller could attach a listener.
    this.ended = true;
    process.nextTick(() => this.emit('end'));
    return;
  }

  const logParser   = job.logParser;
  const ecOrganizer = new Organizer();

  // EC fields stamped from the 'general' section of the report when the EC
  // does not already carry a value, listed in the order they are added.
  const reportedFields = [
    ['ezpaarse_version', 'git-tag'],
    ['ezpaarse_date', 'git-date'],
    ['middlewares_version', 'middlewares-version'],
    ['middlewares_date', 'middlewares-date'],
    ['platforms_version', 'platforms-version'],
    ['platforms_date', 'platforms-date'],
    ['middlewares', 'middlewares'],
  ];

  ecOrganizer.on('ec', ec => {
    emit('ec', ec);
    // Reporting counters: per platform / rtype / mime, plus 'platforms',
    // which is incremented the first time a given platform is seen.
    if (!job.report.get('stats', `platform-${ec.platform}`)) {
      job.report.inc('stats', 'platforms');
    }
    job.report.inc('stats', `platform-${ec.platform}`);
    if (ec.rtype) { job.report.inc('stats', `rtype-${ec.rtype}`); }
    if (ec.mime)  { job.report.inc('stats', `mime-${ec.mime}`); }
  });

  ecOrganizer.on('drain', () => {
    job.logger?.verbose('EC organizer drained');
    processor.end();
  });

  /**
   * Parse a line and push the resulting EC into the enhancement process (if valid)
   * @param {String|Object} line  a raw log line, or an already-built EC object
   *                              (CSV source)
   */
  this.push = function processLine(line) {
    const isObjectSource = (typeof line === 'object');
    let ec;

    if (isObjectSource) {
      ec = line;
      line = JSON.stringify(line);
    }

    if (firstLine) {
      firstLine = false;
      job.report.set('general', 'input-first-line', line);
    }
    // Too many leading lines could not be parsed (the job has been stopped):
    // ignore everything else.
    if (!job.parsedLines && nbBadLines >= job.maxParseAttempts) { return; }

    job.report.inc('general', 'nb-lines-input');

    line = line.replace(/\r$/, '');

    if (!line) { return; }
    if (!ec) {
      // No EC yet: this is a raw log line that must be parsed
      // (ECs coming from a CSV source are already defined)
      ec = logParser.parse(line, {
        ignoreUrl: true,
        ignoreDate: true
      });
    }

    if (!ec) {
      job.logStreams.write('unknown-formats', line + '\n');
      job.report.inc('rejets', 'nb-lines-unknown-formats');

      if (!job.parsedLines) {
        nbBadLines++;
        job.logger?.warn(`Line #${nbBadLines} not recognized`);

        if (nbBadLines >= job.maxParseAttempts) {
          job.logger?.warn(`The first ${job.maxParseAttempts} line(s) couldn't be parsed`);
          job._stop(job.error(4003, 400));
        }
      }
      return;
    }

    if (!job.parsedLines) {
      if (!isObjectSource) {
        job.report.set('general', 'input-format-proxy',
          logParser.getProxy() || 'none, auto-recognition failed');
        job.report.set('general', 'input-format-literal',
          logParser.getFormat() || 'none, auto-recognition failed');
        job.report.set('general', 'input-format-regex',
          logParser.getRegexp(true) || 'none, bad format given or auto-recognition failed');

        // Add or remove user fields from those extracted by the log parser
        // We can't do it before because we need to process one line to autodetect the format
        job.addOutputFields(logParser.getFields());
      } else {
        job.addOutputFields(job.csvColumns);
      }
    }

    job.parsedLines = true;

    if (!ec['log_id']) {
      ec['log_id'] = crypto.createHash('sha1').update(line).digest('hex');
    }

    // Versions
    for (const [field, reportKey] of reportedFields) {
      if (!ec[field]) {
        ec[field] = job.report.get('general', reportKey);
      }
    }

    logParser.parseUrl(ec);
    logParser.parseDate(ec, { ignoreFormat: isObjectSource });

    Object.defineProperty(ec, '_meta', {
      value: {
        originalLine: line,
        lineNumber: ++ecNumber,
      },
      writable: true,
      enumerable: false,
      configurable: false
    });

    co(function* () {
      for (const mw of job.middlewares) {
        // Stop enriching once the processor has been ended. `processor` is
        // used because this generator is invoked by co: its `this` is not
        // the processor instance.
        if (processor.ended) { break; }

        yield new Promise((resolve, reject) => {
          mw.process(ec, err => {
            if (err) { reject(err); }
            else { resolve(); }
          });
        });
      }

      if (ec._meta.granted === false) {
        emit('denied', ec);
        job.logger?.silly(`Organizer: skipping EC #${ec._meta.lineNumber}`);
        ecOrganizer.skip(ec._meta.lineNumber);
      } else {
        job.logger?.silly(`Organizer: pushing EC #${ec._meta.lineNumber}`);
        ecOrganizer.push(ec);
      }
    }).catch(err => handleError(err, ec));
  };

  /**
   * Tell the organizer which EC is the last one, then send the termination
   * signal (a null EC) to every middleware, one after the other.
   */
  this.drain = function () {
    ecOrganizer.setLast(ecNumber);
    job.logger?.verbose(`Organizer: last EC number is ${ecNumber}`);

    job.logger?.info('Terminating middlewares...');

    co(function* () {
      for (const mw of job.middlewares) {
        job.logger?.verbose(`Sending termination signal to ${mw.name}`);

        yield new Promise((resolve, reject) => {
          mw.process(null, err => {
            if (err) {
              job.logger?.error(`${mw.name} failed to terminate`);
              reject(err);
            } else {
              job.logger?.verbose(`${mw.name} terminated`);
              resolve();
            }
          });
        });
      }

      job.logger?.info('All middlewares terminated');
    }).catch(err => {
      // Without this handler a failed termination would surface as an
      // unhandled promise rejection (fatal by default on recent Node versions).
      job.logger?.error(`Middleware termination aborted: ${err?.stack || err}`);
    });
  };

  /**
   * Release the slot of a rejected EC in the organizer, then account for the
   * rejection (report counter + dedicated log stream) according to `err.type`.
   * @param {Error|*} err  the rejection reason of the middleware chain
   * @param {Object}  ec   the EC being processed (its `_meta` is defined)
   */
  function handleError(err, ec) {
    ecOrganizer.skip(ec._meta.lineNumber);
    job.logger?.silly(`Organizer: skipping EC #${ec._meta.lineNumber}`);

    switch (err?.type) {
    case 'ECLEAN':
      emit('ec', ec);
      break;
    case 'ENOPARSER':
      job.notifiers['unknown-domains'].increment(ec.domain);
      job.logStreams.write('unknown-domains', ec._meta.originalLine + '\n');
      job.report.inc('rejets', 'nb-lines-unknown-domains');
      break;
    case 'ECHRONO':
      job.logger?.verbose('A log line is not chronological : ' + ec._meta.originalLine);
      job.report.inc('rejets', 'nb-lines-unordered-ecs');
      job.logStreams.write('unordered-ecs', ec._meta.originalLine + '\n');
      break;
    case 'EDUPLICATE':
      job.report.inc('rejets', 'nb-lines-duplicate-ecs');
      job.logStreams.write('duplicate-ecs', ec._meta.originalLine + '\n');
      break;
    case 'EIRRELEVANT':
      job.report.inc('rejets', 'nb-lines-ignored');
      job.logStreams.write('filtered-ecs', ec._meta.originalLine + '\n');
      break;
    case 'EIGNOREDDOMAIN':
      job.report.inc('rejets', 'nb-lines-ignored-domains');
      job.logStreams.write('ignored-domains', ec._meta.originalLine + '\n');
      break;
    case 'EIGNOREDHOST':
      job.report.inc('rejets', 'nb-lines-ignored-hosts');
      job.logStreams.write('ignored-hosts', ec._meta.originalLine + '\n');
      break;
    case 'EROBOT':
      job.report.inc('rejets', 'nb-lines-robots-ecs');
      job.logStreams.write('robots-ecs', ec._meta.originalLine + '\n');
      break;
    case 'ENOTQUALIFIED':
      if (!ec._meta.denied) {
        job.report.inc('rejets', 'nb-lines-unqualified-ecs');
        job.logStreams.write('unqualified-ecs', ec._meta.originalLine + '\n');
      }
      break;
    default:
      // Non-Error rejection values have no stack: log the value itself.
      job.logger?.error(`[Error][EC#${ec._meta.lineNumber}] ${err?.stack || err}`);
      job.report.inc('rejets', 'nb-lines-unknown-errors');
      job.logStreams.write('unknown-errors', ec._meta.originalLine + '\n');
      break;
    }
  }
};

module.exports = LinesProcessor;

// Give LinesProcessor instances the EventEmitter API (on/emit/...) they use
// for their 'ec', 'denied' and 'end' events.
util.inherits(LinesProcessor, EventEmitter);