app/src/services/queueSvc.js
/**
* @module QueueService
*
* Puts data on the queue for asynchronous processing.
* Uses a DataService to read and write data (including getting the email).
* Uses an EmailService to deliver the email.
*
* QueueListener will call the QueueService as events are fired in the queue
*
* @see DataService
* @see EmailService
* @see QueueConnection
* @see QueueListener
*
* @see Bull
*
* @exports QueueService
*/
const config = require('config');
const log = require('../components/log')(module.filename);
const { queueState } = require('../components/state');
const DataService = require('./dataSvc');
const EmailService = require('./emailSvc');
const QueueConnection = require('./queueConn');
// Attempt limit read from the `server.maxAttempts` config key and coerced to a Number.
// QueueService.updateContent compares it against job.attemptsMade to decide when a
// failed job's email content is dropped.
const maxAttempts = Number(config.get('server.maxAttempts'));
/**
 * Error thrown by QueueService.removeJob / promoteJob when the job found in the
 * queue is owned by a different client than the one making the request.
 */
class ClientMismatchError extends Error {
  /**
   * @param {...*} args - forwarded unchanged to the Error constructor (message, options)
   */
  constructor(...args) {
    super(...args);
    // Without this, err.name and String(err) report the generic "Error".
    // Assigned before the capture so the name is already in place at that point.
    this.name = 'ClientMismatchError';
    // Second argument asks V8 to leave this constructor's frame out of the trace.
    Error.captureStackTrace(this, ClientMismatchError);
  }
}
/**
 * Error thrown by QueueService.removeJob / promoteJob when the messageId stored
 * in the job's data does not equal the job id that was requested.
 */
class DataIntegrityError extends Error {
  /**
   * @param {...*} args - forwarded unchanged to the Error constructor (message, options)
   */
  constructor(...args) {
    super(...args);
    // Without this, err.name and String(err) report the generic "Error".
    // Assigned before the capture so the name is already in place at that point.
    this.name = 'DataIntegrityError';
    // Second argument asks V8 to leave this constructor's frame out of the trace.
    Error.captureStackTrace(this, DataIntegrityError);
  }
}
/**
 * Error thrown by QueueService.removeJob when the job's state is anything other
 * than 'delayed', i.e. the message can no longer be cancelled.
 */
class UncancellableError extends Error {
  /**
   * @param {...*} args - forwarded unchanged to the Error constructor (message, options)
   */
  constructor(...args) {
    super(...args);
    // Without this, err.name and String(err) report the generic "Error".
    // Assigned before the capture so the name is already in place at that point.
    this.name = 'UncancellableError';
    // Second argument asks V8 to leave this constructor's frame out of the trace.
    Error.captureStackTrace(this, UncancellableError);
  }
}
/**
 * Error thrown by QueueService.promoteJob when the job's state is anything other
 * than 'delayed', i.e. the message cannot be promoted.
 */
class UnpromotableError extends Error {
  /**
   * @param {...*} args - forwarded unchanged to the Error constructor (message, options)
   */
  constructor(...args) {
    super(...args);
    // Without this, err.name and String(err) report the generic "Error".
    // Assigned before the capture so the name is already in place at that point.
    this.name = 'UnpromotableError';
    // Second argument asks V8 to leave this constructor's frame out of the trace.
    Error.captureStackTrace(this, UnpromotableError);
  }
}
/**
 * Service that places message jobs on the queue, reacts to queue events by
 * updating persisted message data, and sends the email for a job through the
 * configured EmailService.
 */
class QueueService {
  /**
   * Creates a new QueueService with default connection, dataService and emailService.
   * @class
   */
  constructor() {
    this.connection = new QueueConnection();
    this.dataService = new DataService();
    this.emailService = new EmailService();
  }

  /**
   * @function connection
   * Gets the current QueueConnection
   */
  get connection() {
    return this._connection;
  }

  /**
   * @function connection
   * Sets the current QueueConnection
   * also sets the internal queue (taken from the connection's `queue` property)
   * @param {object} v - the QueueConnection
   */
  set connection(v) {
    this._connection = v;
    this._queue = this._connection.queue;
  }

  /**
   * @function dataService
   * Gets the current DataService
   */
  get dataService() {
    return this._dataService;
  }

  /**
   * @function dataService
   * Sets the current DataService
   * @param {object} v - the DataService
   */
  set dataService(v) {
    this._dataService = v;
  }

  /**
   * @function emailService
   * Gets the current EmailService
   */
  get emailService() {
    return this._emailService;
  }

  /**
   * @function emailService
   * Sets the current EmailService
   * @param {object} v - the EmailService
   */
  set emailService(v) {
    this._emailService = v;
  }

  /**
   * @function queue
   * Gets the current QueueConnection's queue
   */
  get queue() {
    return this._queue;
  }

  /**
   * @function enqueue
   * Adds a new job to the queue, then marks the message as enqueued.
   * Job contains enough information for the service to read and update data required for the job.
   * The message id is used as the job id.
   *
   * @param {string} client - the client that owns the message
   * @param {object} message - a Message object
   * @param {object} opts - Bull opts, including setting delay time (never mutated)
   * @throws the underlying error, with its message prefixed by 'Queue Error: ', if adding the job fails
   */
  async enqueue(client, message, opts = {}) {
    try {
      // Copy into a fresh object: the caller's `opts` must not pick up our jobId,
      // and may legitimately be frozen or reused across several messages.
      const jobOpts = Object.assign({}, opts, { jobId: message.messageId });
      const job = await this.queue.add({
        client: client,
        messageId: message.messageId
      }, jobOpts);
      log.info(`Job ${job.id} enqueued`, { function: 'enqueue' });
    } catch (e) {
      e.message = 'Queue Error: ' + e.message;
      throw e;
    }
    // Only reached when the job was added successfully.
    await this.dataService.updateStatus(client, message.messageId, queueState.ENQUEUED);
  }

  /**
   * @function updateContent
   * Update the persisted content (email message) for a Message.
   * When jobs are completed or failed/errored, we want to remove the email content.
   * Failures are logged and not rethrown.
   *
   * @param {object} job - the queue job
   */
  async updateContent(job) {
    try {
      if (job && job.data && job.data.messageId && job.data.client) {
        // Only drop email content when completed (no failedReason) or when the
        // job has failed the max number of attempts.
        if (!job.failedReason || job.attemptsMade >= maxAttempts) {
          await this.dataService.deleteMessageEmail(job.data.client, job.data.messageId);
        }
      }
    } catch (e) {
      log.error(`Failed to update content for message ${job.id}. ${e.message}`, { function: 'updateContent' });
    }
  }

  /**
   * @function updateStatus
   * Update the persisted status for a Message.
   * Does nothing when the job lacks client or messageId data.
   *
   * @param {object} job - the queue job
   * @param {string} status - the queue related status
   * @param {string} description - optional description for the status, generally an error message
   */
  async updateStatus(job, status, description) {
    if (job && job.data && job.data.messageId && job.data.client) {
      await this.dataService.updateStatus(job.data.client, job.data.messageId, status, description);
    }
  }

  /**
   * @function sendMessage
   * Get the persisted email content, send it through the EmailService and
   * persist the send result (smtp message id and response).
   * Does nothing when the job lacks client or messageId data.
   *
   * @param {object} job - the queue job
   * @throws rethrows any error raised while reading, sending or persisting, after logging it
   */
  async sendMessage(job) {
    if (job && job.data && job.data.messageId && job.data.client) {
      try {
        // Use pooled connection only on first attempt
        const isFirstAttempt = job.attemptsMade < 1;
        const msg = await this.dataService.readMessage(job.data.client, job.data.messageId);
        const smtpResult = await this.emailService.send(msg.email, isFirstAttempt);
        const sendResult = { smtpMsgId: smtpResult.messageId, response: smtpResult.response };
        await this.dataService.updateMessageSendResult(job.data.client, job.data.messageId, sendResult);
      } catch (e) {
        log.error(`Error sending message from queue: client = ${job.data.client}, messageId = ${job.data.messageId}.`, { function: 'sendMessage' });
        throw e;
      }
    }
  }

  /**
   * @function removeJob
   * Attempts to remove the job from the queue.
   *
   * @param {string} client - the authorized party / client
   * @param {object} jobId - the job id of the desired message
   * @throws DataIntegrityError if the job's stored messageId does not equal `jobId`
   * @throws ClientMismatchError if job client does not match `client`
   * @throws UncancellableError if job state is not 'delayed'
   * @returns {boolean} True if successful, false if job was not found or lacks client/messageId data
   */
  async removeJob(client, jobId) {
    const job = await this.queue.getJob(jobId);
    if (!(job && job.data && job.data.client && job.data.messageId)) {
      return false;
    }

    // Job found with proper structure; checks run in this order on purpose.
    if (job.data.messageId !== jobId) {
      throw new DataIntegrityError(`Message ${jobId} data is inconsistent or corrupted.`);
    }
    if (job.data.client !== client) {
      throw new ClientMismatchError(`Message ${jobId} is not owned by client ${client}.`);
    }
    if (await job.getState() !== 'delayed') {
      throw new UncancellableError(`Message ${jobId} is not cancellable.`);
    }

    // Immediately remove from queue
    await job.remove();
    log.info(`Message ${job.data.messageId} removed from queue`, { function: 'removeJob' });
    return true;
  }

  /**
   * @function promoteJob
   * Attempts to promote the job to run as soon as possible in the queue,
   * then records the PROMOTED status for the message.
   *
   * @param {string} client - the authorized party / client
   * @param {object} jobId - the job id of the desired message
   * @throws DataIntegrityError if the job's stored messageId does not equal `jobId`
   * @throws ClientMismatchError if job client does not match `client`
   * @throws UnpromotableError if job state is not 'delayed'
   * @returns {boolean} True if successful, false if job was not found or lacks client/messageId data
   */
  async promoteJob(client, jobId) {
    const job = await this.queue.getJob(jobId);
    if (!(job && job.data && job.data.client && job.data.messageId)) {
      return false;
    }

    // Job found with proper structure; checks run in this order on purpose.
    if (job.data.messageId !== jobId) {
      throw new DataIntegrityError(`Message ${jobId} data is inconsistent or corrupted.`);
    }
    if (job.data.client !== client) {
      throw new ClientMismatchError(`Message ${jobId} is not owned by client ${client}.`);
    }
    if (await job.getState() !== 'delayed') {
      throw new UnpromotableError(`Message ${jobId} is not promotable.`);
    }

    // Immediately promote in queue
    await job.promote();
    await this.updateStatus(job, queueState.PROMOTED, 'Promotion requested');
    log.info(`Message ${job.data.messageId} promoted in queue`, { function: 'promoteJob' });
    return true;
  }
}
// Public surface of this module: the service class plus the error types it throws,
// exported so callers can distinguish them (e.g. with instanceof).
module.exports = { ClientMismatchError, DataIntegrityError, UncancellableError, UnpromotableError, QueueService };