workers/uploader.js
const { Constants } = require('../core/constants');
const { Context } = require('../core/context');
const fs = require('fs');
const path = require('path');
const { OperationResult } = require('../core/operation_result');
const { PluginManager } = require('../plugins/plugin_manager');
const { StorageService } = require('../core/storage_service');
const { Util } = require('../core/util');
const { Worker } = require('./worker');
// This implementation is convoluted and messy...
/**
* The Uploader performs the upload operations for a job.
* It catches events from the underlying network plugin,
* wraps them in {@link JobStatus} objects, and writes them
* to STDOUT and STDERR to communicate with the parent process.
*
* param {Job} - The job to run. The job object must contain at
* at least one {@link UploadOperation}, or the upload will do
* nothing.
*
* @param {Job}
*
*/
class Uploader extends Worker {
constructor(job) {
super('upload')
this.job = job;
}
/**
* This runs all of the job's upload operations.
* It returns an array of promises. This runs all uploads
* in parallel.
*
* @returns {Array<Promise>}
*/
run() {
let uploader = this;
let errors = this.validateParams();
if (errors.length > 0) {
return Promise.all([new Promise(function(resolve, reject) {
reject(uploader.validationError(errors));
})]);
}
let promises = [];
for (let op of this.job.uploadOps) {
promises = promises.concat(this.doUpload(op));
}
return Promise.all(promises);
}
/**
* doUpload initiates a single upload operation and returns
* an array of promises. If a single operation includes multiple
* source files, it will upload them in parallel. This returns
* one promise per source file.
*
* Because each upload resets the exitCode, the caller should
* ignore the final exitCode and check the results of all of the
* promises instead. [This needs a more elegant fix.]
*
* @param {UploadOperation} - An upload operation containing one
* or more source files.
*
* @returns {Array<Promise>}
*/
doUpload(uploadOp) {
let uploader = this;
// Clear prior upload results. They may cause the UI to
// report successful upload as failed, or vice-versa.
uploadOp.results = [];
let ss = StorageService.find(uploadOp.storageServiceId);
if (!ss) {
uploadOp.results[0] = new OperationResult('upload', 'none');
uploadOp.results[0].start();
uploadOp.results[0].finish(Context.y18n.__('Cannot find StorageService record'));
return new Promise(function(resolve, reject) {
reject(uploader.validationError(uploadOp.results[0].errors));
});
}
let providerClass = this.getProvider(ss.protocol);
let promises = [];
for (let filepath of uploadOp.sourceFiles) {
let provider = new providerClass(ss);
var promise = new Promise(function(resolve, reject) {
let lastPercentComplete = 0;
provider.on('start', function(result) {
// Note: percentComplete is -1 because we don't
// yet have a way of getting that info.
uploader.info('start',
Constants.OP_IN_PROGRESS,
ss.name,
-1,
false);
});
provider.on('finish', function(result) {
uploadOp.results.push(result);
if (result.errors.length > 0) {
uploader.completedWithError(result.errors);
} else {
uploader.completedSuccess(ss.name, false);
}
resolve(result);
});
provider.on('error', function(result) {
// Reject causes the entire Promise.all chain
// to fail. We want to let other pending promises
// complete instead of stopping the chain. We will
// handle retries elsewhere.
uploadOp.results.push(result);
uploader.runtimeError('completed', result.errors);
resolve(result);
provider = null;
});
provider.on('warning', function(result) {
// Note: percentComplete is -1 because we don't
// yet have a way of getting that info.
uploader.info('upload', Constants.OP_IN_PROGRESS,
result.warning, -1, false);
});
provider.on('status', function(xfer) {
// Uploader reads faster than it writes, so fudge this.
// Only write for changes >= 1%, otherwise, on large
// uploads, this can write thousands of lines into the
// logs.
let pctComplete = xfer.percentComplete() * 0.985;
if (pctComplete - lastPercentComplete > 1) {
uploader.info('status', Constants.OP_IN_PROGRESS,
`${ss.name } - ${pctComplete.toFixed(2)}%`,
pctComplete, false);
lastPercentComplete = pctComplete;
}
});
});
promises.push(promise);
provider.upload(filepath, path.basename(filepath));
}
return promises;
}
/**
* This checks to ensure that the {@link UploadOperation} includes a
* target and at least one source file. It also ensures that the
* specified source files exist. It returns an array of strings decribing
* the validation errors.
*
* @return {Array<string>}
*/
validateParams() {
let errors = [];
for (let op of this.job.uploadOps) {
let opErrors = []
if (Util.isEmpty(op.storageServiceId)) {
opErrors.push(Context.y18n.__('Specify where you want to upload the file.'));
}
if (!op.sourceFiles || Util.isEmptyStringArray(op.sourceFiles)) {
opErrors.push(Context.y18n.__('Specify at least one file to upload.'));
}
for (let f of op.sourceFiles) {
if (!fs.existsSync(f)) {
opErrors.push(Context.y18n.__(`File to be uploaded does not exist: %s.`, f));
}
}
if (opErrors.length > 0) {
let providerDesc = 'Unknown upload provider';
try {
let ss = StorageService.find(op.storageServiceId);
let providerClass = this.getProvider(ss.protocol)
providerDesc = providerClass.description().name;
} catch (err) {
providerDesc += ` ${err.message}`;
}
let result = new OperationResult('upload', providerDesc);
result.start();
result.finish(opErrors.join(' '));
op.results.push(result);
errors = errors.concat(opErrors);
}
}
return errors;
}
/**
* This returns the first network provider plugin that implements the
* S3 protocol. If it can't find a plugin for the S3 protocol, it throws
* an exception.
*
* @returns {Plugin}
*/
getProvider(protocol) {
let providers = PluginManager.implementsProtocol(protocol);
if (providers.length == 0) {
throw `Cannot find a plugin that implements the ${protocol} protocol.`
}
return providers[0];
}
}
module.exports.Uploader = Uploader;