lib/JobService.js
const assignIn = require('lodash.assignin');
const { sorter, AdapterService } = require('@feathersjs/adapter-commons');
const errors = require('@feathersjs/errors');
const sift = require('sift').default;
const { configureJob, transformError, transformJob } = require('./utils');
class JobService extends AdapterService {
constructor (options = {}) {
super(assignIn({
id: 'id',
matcher: sift,
sorter,
paginate: {
max: 25
},
events: ['succeeded', 'retrying', 'failed', 'progress']
}, options));
}
setup (app, path) {
if (this.options.events.indexOf('succeeded') > -1) {
this.queue.on('job succeeded',
(id, result) => this.emit('succeeded', { id, result })
);
}
if (this.options.events.indexOf('retrying') > -1) {
this.queue.on('job retrying',
(id, error) => this.emit('retrying', { id, error })
);
}
if (this.options.events.indexOf('failed') > -1) {
this.queue.on('job failed',
(id, error) => this.emit('failed', { id, error })
);
}
if (this.options.events.indexOf('progress') > -1) {
this.queue.on('job progress',
(id, progress) => this.emit('progress', { id, progress })
);
}
}
get queue () {
return this.options.queue;
}
async _find (params = {}) {
const { query, filters, paginate } = this.filterQuery(params);
const page = {};
const status = query.status || 'waiting';
switch (status) {
case 'waiting':
case 'active':
case 'delayed':
page.start = 0;
page.end = this.options.paginate.max;
break;
case 'failed':
case 'succeeded':
page.size = this.options.paginate.max;
break;
default:
throw new errors.BadRequest('status is missing or invalid');
}
let jobs = [];
try {
jobs = await this.queue.getJobs(status, page);
} catch (e) {
throw transformError(e);
}
const result = {
total: jobs.length,
limit: filters.$limit,
skip: filters.$skip || 0,
data: jobs.map(job => transformJob(job, params, this.id))
.filter(this.options.matcher(query))
};
if (filters.$sort !== undefined) {
result.data = result.data.sort(this.options.sorter(filters.$sort));
}
if (filters.$skip !== undefined) {
result.data = result.data.slice(filters.$skip);
}
if (filters.$limit !== undefined) {
result.data = result.data.slice(0, filters.$limit);
}
return paginate && paginate.default
? result
: result.data;
}
async _get (id, params = {}) {
const { query } = this.filterQuery(params);
try {
const job = await this.queue.getJob(id);
if (!job) {
throw new errors.NotFound();
}
const value = transformJob(job, params, this.id);
if (this.options.matcher(query)(value)) {
return value;
} else {
throw new errors.NotFound();
}
} catch (e) {
throw transformError(e);
}
}
async _create (data, params = {}) {
if (Array.isArray(data)) {
return Promise.all(data.map(current => this._create(current, params)));
}
try {
const job = this.queue.createJob(data);
configureJob(job, params);
await job.save();
return transformJob(job, params, this.id);
} catch (e) {
throw transformError(e);
}
}
_update (id, data, params = {}) {
throw new errors.NotImplemented('Update operations are not supported');
}
_patch (id, data, params = {}) {
throw new errors.NotImplemented('Patch operations are not supported');
}
async _remove (id, params = {}) {
if (id === null) {
const { query } = this.filterQuery(params);
const jobs = await this._find({
...params,
paginate: false,
query
});
return Promise.all(jobs.map(
current => this._remove(current[this.id], params))
);
}
try {
const job = await this._get(id, params);
await this.queue.removeJob(id);
return job;
} catch (e) {
throw transformError(e);
}
}
}
module.exports = JobService;