jeresig/pharos-images

View on GitHub
schemas/Import.js

Summary

Maintainability
A
1 hr
Test Coverage
"use strict";

const async = require("async");

const models = require("../lib/models");
const db = require("../lib/db");
const config = require("../lib/config");

// Schema describing a single batch import and the state it is currently in
const Import = new db.schema({
    // Identifier of the import. When left unset it is generated from the
    // source and the current time by the "validate" hook below.
    _id: String,

    // When this batch was created
    created: {type: Date, default: Date.now},

    // When this batch was last saved (refreshed by the "save" hook below)
    modified: {type: Date},

    // The source that this import belongs to
    source: {type: String, required: true},

    // The current state of the batch; new batches begin as "started"
    state: {type: String, required: true, default: "started"},

    // The message of the error that occurred, for batches whose state
    // has been set to "error"
    error: "String",

    // The results produced by the import
    results: [{}],
});

// Instance methods shared by all imports. These rely on `getStates()` and
// `getError()`, which are not defined in this file and must be supplied
// elsewhere (each state is expected to have an `id`, a `name(req)` function
// and, optionally, an `advance(batch, callback)` function).
Import.methods = {
    // Looks up the source that this import is associated with
    getSource() {
        return models("Source").getSource(this.source);
    },

    // Sets the state of the import and saves it; `callback` is handed
    // directly to `save()`
    saveState(state, callback) {
        this.state = state;
        this.save(callback);
    },

    // Returns the state object whose id matches the current state, or
    // undefined if no state matches
    getCurState() {
        return this.getStates().find((state) => state.id === this.state);
    },

    // Returns the state that follows the current one, or undefined if the
    // current state is the last one or is not a known state
    getNextState() {
        const curState = this.getCurState();

        // Without this check an unknown state would give an index of -1
        // and the first state would incorrectly be reported as the next.
        if (!curState) {
            return undefined;
        }

        const states = this.getStates();
        return states[states.indexOf(curState) + 1];
    },

    // Returns the display name of the current state. If the current state
    // is not a known state then a formatted error message is returned.
    getStateName(req) {
        const curState = this.getCurState();
        return curState ? curState.name(req) :
            req.format(req.gettext("Error: %(error)s"),
                {error: this.getError(req)});
    },

    // Returns true only if the current state is known and has an
    // `advance` function that can be run
    canAdvance() {
        const curState = this.getCurState();
        if (!curState) {
            return false;
        }
        return !!curState.advance;
    },

    // Runs the `advance` step of the current state.
    //
    // The import is first saved in the next state, then the step is run,
    // and on success the import is saved in the state after that. If the
    // step fails the error message is stored and the state becomes "error".
    // If the import cannot be advanced the callback is called, with no
    // arguments, on the next tick.
    advance(callback) {
        if (!this.canAdvance()) {
            return process.nextTick(callback);
        }

        const state = this.getCurState();
        const nextState = this.getNextState();

        this.saveState(nextState.id, (saveErr) => {
            /* istanbul ignore if */
            if (saveErr) {
                return callback(saveErr);
            }

            state.advance(this, (advanceErr) => {
                // If there was an error then we save the error message
                // and set the state of the batch to "error" to avoid
                // retries.
                if (advanceErr) {
                    this.error = advanceErr.message;
                    return this.saveState("error", callback);
                }

                // The state was already moved forward above, so this is
                // the state that follows the one saved before the step ran
                const followingState = this.getNextState();
                if (followingState) {
                    // Make sure changes made to the results are saved
                    this.markModified("results");
                    this.saveState(followingState.id, callback);
                } else {
                    callback();
                }
            });
        });
    },
};

Import.statics = {
    // Advances every import that is not "completed" or in "error" and
    // whose current state can be advanced.
    //
    // Imports are grouped by their current state: the groups are run in
    // parallel while the imports inside each group are run one at a time.
    // `callback` is called with an error if finding, loading, or advancing
    // an import fails, and with no error otherwise.
    advance(callback) {
        this.find({
            state: {
                $nin: ["completed", "error"],
            },
        }, "_id state", {}, (err, batches) => {
            if (err || !batches || batches.length === 0) {
                return callback(err);
            }

            const queues = {};

            batches
                .filter((batch) => batch.canAdvance())
                .forEach((batch) => {
                    if (!queues[batch.state]) {
                        queues[batch.state] = [];
                    }

                    queues[batch.state].push(batch);
                });

            // Run all the queues in parallel
            async.each(Object.keys(queues), (queueName, queueDone) => {
                const queue = queues[queueName];

                // But do each queue in series
                async.eachLimit(queue, 1, (partialBatch, batchDone) => {
                    // We now load the complete batch with all fields intact
                    this.findById(partialBatch._id, (findErr, batch) => {
                        // Report a failed lookup, and skip over a batch
                        // that can no longer be found, rather than
                        // throwing when using the missing batch below.
                        if (findErr || !batch) {
                            return batchDone(findErr);
                        }

                        /* istanbul ignore if */
                        if (config.NODE_ENV !== "test") {
                            console.log(`Advancing ${batch._id} to ` +
                                `${batch.getNextState().id}...`);
                        }
                        batch.advance(batchDone);
                    });
                }, queueDone);
            }, callback);
        });
    },
};

Import.pre("validate", function(next) {
    // An ID only needs to be generated when one has not been set already
    const needsId = !this._id;

    if (needsId) {
        // The ID is made up of the source and the current time
        const timestamp = Date.now();
        this._id = `${this.source}/${timestamp}`;
    }

    next();
});

/* istanbul ignore next */
Import.pre("save", function(next) {
    // Refresh the modified time on every save
    const now = new Date();
    this.modified = now;

    next();
});

// Expose the Import schema so that other modules can require it
module.exports = Import;