kristok/node-pgq

View on GitHub
lib/eventbatch.js

Summary

Maintainability
A
1 hr
Test Coverage
var _ = require('lodash'),
    Event = require('./event');

/**
 * One batch of queue events together with its per-event bookkeeping.
 *
 * The constructor only copies references from the consumer and prepares
 * empty containers; `batchId` and `unchangedIds` are filled in later by
 * `load`.
 *
 * @param {Object} consumer - Owner of the batch. Must expose
 *     `config.source.queue`, `dbapi` and `log`.
 */
var EventBatch = function(consumer) {
    var source = consumer.config.source;

    this.consumer = consumer;
    this.queueName = source.queue;

    // Event id -> state lookup, and the built events in load order.
    this.ids = {};
    this.events = [];

    // Shortcuts to the collaborators provided by the consumer.
    this.dbapi = consumer.dbapi;
    this.log = consumer.log;

    return this;
};

/**
 * Asks the database to finish this batch and reports the outcome.
 *
 * Outcomes, all signalled on the consumer:
 *  - database error: emits 'error' with that error
 *  - response without a `finish_batch` key: emits 'error' with a new Error
 *  - otherwise: emits 'batchProcessed'
 *
 * @param {Function} [callback] - Called as `callback(err)` after the event
 *     has been emitted; `err` is the same error object that was emitted
 *     (or the falsy value from the database on success).
 */
EventBatch.prototype.allEventsProcessed = function(callback) {
    var self = this;

    this.dbapi.finishBatch(this.batchId, function(err, data) {
        if (err) {
            self.consumer.emit('error', err);
        } else if (!_.has(data, 'finish_batch')) {
            // Emit the very error object that is handed to the callback, so
            // 'error' listeners never receive undefined.
            err = new Error('database did not respond with finish_batch');
            self.consumer.emit('error', err);
        } else {
            self.consumer.emit('batchProcessed');
        }
        if (callback && _.isFunction(callback)) callback(err);
    });
};

/**
 * Records the outcome of one event and finishes the batch once the counter
 * of untouched events reaches zero.
 *
 * NOTE(review): `state` is used as a property name on `Event`; the callers
 * in this file pass `Event.DONE` / `Event.RETRY`, so the stored value is
 * `Event[Event.DONE]` etc. - confirm that lookup is what is intended.
 *
 * @param {*} eventId - Key under which the state is stored in `this.ids`.
 * @param {*} state - Property name looked up on `Event`.
 * @param {Function} [callback] - Called with no arguments while events are
 *     still pending; otherwise handed on to `allEventsProcessed`.
 */
EventBatch.prototype.markEventProcessed = function(eventId, state, callback) {
    this.ids[eventId] = Event[state];

    var stillPending = --this.unchangedIds;

    if (stillPending !== 0) {
        // More events outstanding: just acknowledge this one.
        if (_.isFunction(callback)) {
            callback();
        }
        return;
    }

    // That was the last one; close the batch.
    this.allEventsProcessed(callback);
};

/**
 * Marks a single event as processed with the `Event.DONE` state.
 *
 * @param {*} eventId - Id of the event to mark.
 * @param {Function} [callback] - Passed through to `markEventProcessed`.
 */
EventBatch.prototype.tagDone = function(eventId, callback) {
    var doneState = Event.DONE;

    this.markEventProcessed(eventId, doneState, callback);
};

/**
 * Schedules an event for retry using the dbapi's `eventTagRetrySeconds`.
 *
 * The dbapi method is handed to `tagRetry` as a plain function reference
 * (it is not bound to `dbapi` here).
 *
 * @param {*} eventId - Id of the event to retry.
 * @param {*} delay - Forwarded unchanged as the delay argument.
 * @param {Function} [callback] - Forwarded unchanged to `tagRetry`.
 */
EventBatch.prototype.tagRetrySeconds = function(eventId, delay, callback) {
    var retryBySeconds = this.dbapi.eventTagRetrySeconds;

    this.tagRetry(retryBySeconds, eventId, delay, callback);
};

/**
 * Signals a batch failure by emitting an 'error' event on the consumer
 * with a fixed-message Error.
 */
EventBatch.prototype.failBatch = function() {
    this.consumer.emit(
        'error',
        new Error('failing batch due to event.failBatch')
    );
};

/**
 * Schedules an event for retry using the dbapi's `eventTagRetryTimestamp`.
 *
 * The timestamp travels in the argument slot that `tagRetry` names `delay`.
 *
 * @param {*} eventId - Id of the event to retry.
 * @param {*} reinsertTimestamp - Forwarded unchanged to the dbapi method.
 * @param {Function} [callback] - Forwarded unchanged to `tagRetry`.
 */
EventBatch.prototype.tagRetryTimestamp = function(eventId, reinsertTimestamp, callback) {
    var retryAtTimestamp = this.dbapi.eventTagRetryTimestamp;

    this.tagRetry(retryAtTimestamp, eventId, reinsertTimestamp, callback);
};

/**
 * Tags an event for retry through the given dbapi method and, on success,
 * records it as processed with the `Event.RETRY` state.
 *
 * Result reporting: if a callback function was supplied it receives
 * `callback(err)` (with `err` undefined on success). Without a callback,
 * an 'error' event is emitted on the consumer, but only when there
 * actually is an error.
 *
 * @param {Function} APIMethod - dbapi function invoked as
 *     `(batchId, eventId, delay, done)`; it is called with `this.dbapi`
 *     as its receiver.
 * @param {*} eventId - Id of the event to retry.
 * @param {*} delay - Retry argument forwarded unchanged to `APIMethod`.
 * @param {Function} [callback] - Optional completion callback.
 */
EventBatch.prototype.tagRetry = function(APIMethod, eventId, delay, callback) {
    var self = this;

    var report = function(err) {
        if (callback && _.isFunction(callback)) {
            callback(err);
        } else if (err) {
            // Without a callback only real failures are surfaced; emitting
            // 'error' with no error would signal a failure on success.
            self.consumer.emit('error', err);
        }
    };

    // APIMethod arrives detached from dbapi, so restore dbapi as its
    // receiver in case the implementation relies on `this`.
    APIMethod.call(self.dbapi, self.batchId, eventId, delay, function(err) {
        if (err) {
            report(err);
        } else {
            self.markEventProcessed(eventId, Event.RETRY, function(err) {
                report(err);
            });
        }
    });
};

/**
 * Loads the events of a batch from the database and wraps each row in an
 * `Event`.
 *
 * On success `unchangedIds` is set to the number of rows, every built
 * event is appended to `this.events`, and its `state` is stored in
 * `this.ids` under its `id`.
 *
 * NOTE(review): `consumerName` is read from this batch, but nothing in the
 * visible code assigns it - confirm where it is expected to be set.
 *
 * @param {*} batchId - Batch to load; also stored as `this.batchId`.
 * @param {Function} callback - Called as `callback(err)` on failure or
 *     `callback(null, batchId, events)` on success.
 */
EventBatch.prototype.load = function(batchId, callback) {
    var batch = this;

    batch.batchId = batchId;

    var query = {
        queueName: batch.queueName,
        consumerName: batch.consumerName,
        batchId: batchId
    };

    var onRowsLoaded = function(err, rows) {
        if (err) {
            return callback(err);
        }

        // Every loaded row still awaits a done/retry decision.
        batch.unchangedIds = rows.length;

        _.each(rows, function(row) {
            var built = new Event(batch, batch.queueName, batch.batchId, row);

            batch.ids[built.id] = built.state;
            batch.events.push(built);
        });

        callback(null, batchId, batch.events);
    };

    batch.dbapi.loadBatchEvents(query, onRowsLoaded);
};

module.exports = function(consumer) {
    return new EventBatch(consumer);
};