resource-watch/document-adapter

View on GitHub
app/src/services/taskQueueService.js

Summary

Maintainability
B
4 hrs
Test Coverage
B
86%
const { task } = require('rw-doc-importer-messages');
const config = require('config');
const { IllegalOperationError } = require('amqplib/lib/error');
const logger = require('logger');
const QueueService = require('services/queueService');

/**
 * Publishes document-importer task messages to the tasks queue.
 *
 * The queue name is read from the `queues.tasks` config key and handed to the
 * QueueService parent. This class relies on the parent for `this.q`,
 * `this.channel`, `this.isInit` and `init()`; none of them are defined here.
 */
class TaskQueueService extends QueueService {

    constructor() {
        super(config.get('queues.tasks'));
    }

    /**
     * Serializes `msg` as JSON and sends it to the queue named by `this.q`.
     *
     * If the send fails with amqplib's "Channel closed" IllegalOperationError
     * and `retryOnChannelClosed` is true, the service is re-initialized and the
     * send is attempted exactly once more. Any other failure of the send is
     * logged and swallowed, so the returned promise still resolves in that case.
     * Errors thrown by `this.init(false)` during the retry are NOT caught here
     * and reject the returned promise.
     *
     * @param {Object} msg - JSON-serializable message to publish.
     * @param {boolean} [retryOnChannelClosed=true] - Whether a closed channel
     *     triggers a re-init followed by a single retry.
     * @returns {Promise<void>}
     */
    async sendMessage(msg, retryOnChannelClosed = true) {
        logger.info(`Sending message to ${this.q}`, msg);
        try {
            // `isInit` comes from the parent class; presumably it settles once
            // the connection/channel is ready - TODO confirm in QueueService.
            await this.isInit;
            // NOTE(review): the value returned by sendToQueue is not inspected.
            this.channel.sendToQueue(this.q, Buffer.from(JSON.stringify(msg)));
        } catch (err) {
            const isChannelClosed = err instanceof IllegalOperationError && err.message === 'Channel closed';
            if (isChannelClosed && retryOnChannelClosed) {
                // Meaning of the `false` argument is defined by QueueService.init.
                await this.init(false);
                // Pass `false` so a second closed-channel error is logged, not retried again.
                await this.sendMessage(msg, false);
                return;
            }
            logger.error(`Error sending message to ${this.q}: ${err}`);
        }
    }

    /**
     * Builds a task message of the given type around `data` and publishes it.
     * Shared implementation behind every task-specific method below.
     *
     * @param {*} messageType - One of the `task.MESSAGE_TYPES` values.
     * @param {Object} data - Payload handed to `task.createMessage`.
     * @returns {Promise<void>}
     */
    async sendTask(messageType, data) {
        const taskMessage = task.createMessage(messageType, data);
        await this.sendMessage(taskMessage);
    }

    /** Publishes a TASK_CREATE message carrying `data`. */
    async import(data) {
        await this.sendTask(task.MESSAGE_TYPES.TASK_CREATE, data);
    }

    /** Publishes a TASK_OVERWRITE message carrying `data`. */
    async overwrite(data) {
        await this.sendTask(task.MESSAGE_TYPES.TASK_OVERWRITE, data);
    }

    /** Publishes a TASK_CONCAT message carrying `data`. */
    async concat(data) {
        await this.sendTask(task.MESSAGE_TYPES.TASK_CONCAT, data);
    }

    /** Publishes a TASK_APPEND message carrying `data`. */
    async append(data) {
        await this.sendTask(task.MESSAGE_TYPES.TASK_APPEND, data);
    }

    /** Publishes a TASK_REINDEX message carrying `data`. */
    async reindex(data) {
        await this.sendTask(task.MESSAGE_TYPES.TASK_REINDEX, data);
    }

    /** Publishes a TASK_DELETE message carrying `data`. */
    async delete(data) {
        await this.sendTask(task.MESSAGE_TYPES.TASK_DELETE, data);
    }

    /** Publishes a TASK_DELETE_INDEX message carrying `data`. */
    async deleteIndex(data) {
        await this.sendTask(task.MESSAGE_TYPES.TASK_DELETE_INDEX, data);
    }

}

// Export one shared instance, created when this module is first loaded.
const taskQueueService = new TaskQueueService();

module.exports = taskQueueService;