// schemas/Import.js
"use strict";
const async = require("async");
const models = require("../lib/models");
const db = require("../lib/db");
const config = require("../lib/config");
/**
 * Schema describing a single import batch.
 *
 * The `_id` is filled in by the "validate" hook registered on this schema
 * when it has not been set, and `modified` is refreshed by the "save" hook.
 */
const Import = new db.schema({
    // An ID for the import, based on the source and time
    _id: String,

    // The date that this batch was created
    created: {
        type: Date,
        default: Date.now,
    },

    // The date that this batch was last updated
    modified: {
        type: Date,
    },

    // The ID of the source that this import is associated with
    // (looked up through the "Source" model by getSource())
    source: {
        type: String,
        required: true,
    },

    // The state of the batch upload. The static advance() helper treats
    // "completed" and "error" as states that need no further work.
    state: {
        type: String,
        required: true,
        default: "started",
    },

    // An error message, if the state is set to "error"
    error: String,

    // The results of the import (free-form objects)
    results: [{}],
});
/**
 * Instance methods available on every import batch.
 *
 * Several of these call `this.getStates()` and `this.getError(req)`, which
 * are not defined in this object; presumably they are supplied by whatever
 * builds on top of it (verify against the concrete import schemas).
 * `getStates()` is expected to return an ordered array of state objects with
 * an `id`, a `name(req)` function and an optional `advance(batch, callback)`
 * function.
 */
Import.methods = {
    /**
     * Looks up the source that this import belongs to.
     *
     * @returns {*} The result of `models("Source").getSource(this.source)`.
     */
    getSource() {
        return models("Source").getSource(this.source);
    },

    /**
     * Sets the state of the import and saves the document.
     *
     * @param {string} state - ID of the state to switch to.
     * @param {Function} callback - Passed straight through to `save()`.
     */
    saveState(state, callback) {
        this.state = state;
        this.save(callback);
    },

    /**
     * Finds the state object whose `id` matches the current `state` field.
     *
     * @returns {Object|undefined} The matching state, or `undefined` when
     *     the current state (e.g. "error") is not one of `getStates()`.
     */
    getCurState() {
        return this.getStates().find((state) => state.id === this.state);
    },

    /**
     * Finds the state that follows the current one.
     *
     * @returns {Object|undefined} The following state, or `undefined` when
     *     the import is in the last state or in a state that is not part
     *     of `getStates()`.
     */
    getNextState() {
        const states = this.getStates();
        const curIndex = states.indexOf(this.getCurState());

        // An unknown state has no successor. Without this guard the
        // "-1 + 1" arithmetic would wrongly yield the very first state.
        if (curIndex === -1) {
            return undefined;
        }

        return states[curIndex + 1];
    },

    /**
     * Produces a display name for the current state.
     *
     * @param {Object} req - Request-like object; passed to the state's
     *     `name()` and used for `gettext()`/`format()` in the error case.
     * @returns {*} The state's name, or a formatted error message when the
     *     current state is not one of `getStates()`.
     */
    getStateName(req) {
        const curState = this.getCurState();

        if (curState) {
            return curState.name(req);
        }

        return req.format(req.gettext("Error: %(error)s"),
            {error: this.getError(req)});
    },

    /**
     * Reports whether the current state has work that can be run.
     *
     * @returns {boolean} `true` only when the current state is known and
     *     defines an `advance` function.
     */
    canAdvance() {
        const curState = this.getCurState();
        return curState ? !!curState.advance : false;
    },

    /**
     * Runs the current state's `advance()` step and moves the import
     * forward.
     *
     * The import is first saved in the next state, then the step of the
     * state it started in is run. When the step fails, the error message
     * is stored in `error` and the state is saved as "error"; `callback`
     * is handed to that save, so it receives the outcome of the save rather
     * than the step's error. When the step succeeds and another state
     * follows, `results` is marked as modified and that state is saved.
     *
     * @param {Function} callback - Called when done. Invoked on the next
     *     tick, without arguments, when the import cannot advance.
     */
    advance(callback) {
        if (!this.canAdvance()) {
            return process.nextTick(callback);
        }

        const startState = this.getCurState();
        const inProgressState = this.getNextState();

        this.saveState(inProgressState.id, (saveErr) => {
            /* istanbul ignore if */
            if (saveErr) {
                return callback(saveErr);
            }

            startState.advance(this, (advanceErr) => {
                // If there was an error then we save the error message
                // and set the state of the batch to "error" to avoid
                // retries.
                if (advanceErr) {
                    this.error = advanceErr.message;
                    return this.saveState("error", callback);
                }

                // The state was changed above, so this is the state that
                // follows the in-progress one.
                const followingState = this.getNextState();

                if (!followingState) {
                    return callback();
                }

                this.markModified("results");
                this.saveState(followingState.id, callback);
            });
        });
    },
};
/**
 * Static (model-level) helpers for import batches.
 */
Import.statics = {
    /**
     * Advances every import whose state is neither "completed" nor "error".
     *
     * Only `_id` and `state` are loaded up front. Imports that can advance
     * are grouped by state; the groups are processed in parallel while the
     * imports inside a group are processed one at a time. Each import is
     * re-loaded in full right before its `advance()` is called.
     *
     * @param {Function} callback - Called with an error when the initial
     *     query, a re-load, or an import's `advance()` reports one;
     *     otherwise called once all groups have finished (or immediately
     *     when the query finds nothing).
     */
    advance(callback) {
        const unfinished = {
            state: {
                $nin: ["completed", "error"],
            },
        };

        this.find(unfinished, "_id state", {}, (findErr, batches) => {
            if (findErr || !batches || batches.length === 0) {
                return callback(findErr);
            }

            // Group the advanceable batches by state. A Map keeps arbitrary
            // state strings from colliding with Object.prototype members.
            const queues = new Map();

            for (const stub of batches) {
                if (!stub.canAdvance()) {
                    continue;
                }

                if (!queues.has(stub.state)) {
                    queues.set(stub.state, []);
                }

                queues.get(stub.state).push(stub);
            }

            // Run all the queues in parallel
            async.each(Array.from(queues.values()), (queue, queueDone) => {
                // But do each queue in series
                async.eachLimit(queue, 1, (stub, batchDone) => {
                    // We now load the complete batch with all fields intact
                    this.findById(stub._id, (loadErr, batch) => {
                        if (loadErr) {
                            return batchDone(loadErr);
                        }

                        // The batch disappeared between the listing and
                        // the re-load: there is nothing left to advance.
                        if (!batch) {
                            return batchDone();
                        }

                        /* istanbul ignore if */
                        if (config.NODE_ENV !== "test") {
                            console.log(`Advancing ${batch._id} to ` +
                                `${batch.getNextState().id}...`);
                        }

                        batch.advance(batchDone);
                    });
                }, queueDone);
            }, callback);
        });
    },
};
// Before validation: give the import an ID when it does not have one yet.
Import.pre("validate", function(done) {
    const hasId = Boolean(this._id);

    // The generated ID combines the source with the current timestamp
    if (!hasId) {
        this._id = `${this.source}/${Date.now()}`;
    }

    done();
});
/* istanbul ignore next */
Import.pre("save", function(done) {
    // Refresh the modification timestamp each time the import is saved
    const now = new Date();
    this.modified = now;
    done();
});
module.exports = Import;