resource-watch/doc-executor

View on GitHub
app/src/services/executor.service.js

Summary

Maintainability
B
5 hrs
Test Coverage
B
85%
const logger = require('logger');
const statusQueueService = require('services/status-queue.service');
const { execution } = require('rw-doc-importer-messages');
const ImporterService = require('services/importer.service');
const elasticService = require('services/elastic.service');
const ReindexingInProgress = require('errors/reindexingInProgress');
const UrlNotFound = require('errors/urlNotFound');
const docImporterMessages = require('rw-doc-importer-messages');

// Shorthand for the execution message-type constants exposed by rw-doc-importer-messages;
// used below to dispatch incoming messages by their `type`.
const ExecutionMessages = execution.MESSAGE_TYPES;

/**
 * Creates a fresh, timestamped Elasticsearch index for the dataset in `msg`
 * and then calls `elasticService.deactivateIndex` on it.
 *
 * @param {Object} msg - Execution message; reads `datasetId` and `legend`.
 * @returns {Promise<string>} Name of the index that was created.
 */
async function createDeactivatedIndex(msg) {
    // Dashes are stripped from the dataset id; the timestamp keeps names unique per run.
    const index = `index_${msg.datasetId.replace(/-/g, '')}_${Date.now()}`;
    await elasticService.createIndex(index, msg.legend);
    await elasticService.deactivateIndex(index);
    return index;
}

/**
 * Sends one EXECUTION_READ_FILE message per file URL and waits until every
 * send has settled successfully. A failed send rejects the returned promise
 * so the caller (and ultimately the message handler) sees the failure instead
 * of it becoming an unhandled rejection.
 *
 * @param {Object} msg - Execution message copied into each queued message.
 * @param {Array} fileUrls - File URLs; each one overrides `fileUrl` in its message.
 * @param {Object} executorQueueService - Must expose `sendMessage(message)`.
 * @returns {Promise<void>}
 */
async function queueFilesForReading(msg, fileUrls, executorQueueService) {
    logger.debug('Queueing files for reading');
    // map() starts every send right away (same ordering as before); Promise.all awaits them.
    await Promise.all(fileUrls.map((fileUrl) => executorQueueService.sendMessage(
        execution.createMessage(ExecutionMessages.EXECUTION_READ_FILE, { ...msg, fileUrl })
    )));
}

/**
 * Tells whether an error thrown by `elasticService.deleteIndex` carries a
 * response body whose `error.type` is `index_not_found_exception`.
 * Never throws: an unparsable or unexpected body simply yields `false`.
 *
 * @param {*} error - The caught error.
 * @returns {boolean}
 */
function isIndexNotFoundError(error) {
    if (!error || !error.response) {
        return false;
    }

    let body;
    try {
        body = JSON.parse(error.response);
    } catch (parseError) {
        // Not JSON text. If the response is already an object, inspect it directly.
        body = typeof error.response === 'object' ? error.response : null;
    }

    return Boolean(body && body.error && body.error.type === 'index_not_found_exception');
}

/**
 * Static handlers for the execution messages consumed by the executor.
 * Each handler performs its Elasticsearch / import step and then reports the
 * outcome through `statusQueueService`.
 */
class ExecutorService {

    /**
     * Dispatches an execution message to the handler matching `msg.type`.
     * Unknown types are logged with `logger.error` and otherwise ignored.
     *
     * @param {Object} msg - Execution message; `type` selects the handler.
     * @param {Object} executorQueueService - Queue used by handlers that fan out
     *   EXECUTION_READ_FILE messages (create, concat, append).
     * @returns {Promise<void>} Rejects with whatever the selected handler throws.
     */
    static async processMessage(msg, executorQueueService) {
        switch (msg.type) {

            case ExecutionMessages.EXECUTION_CREATE:
                await ExecutorService.create(msg, executorQueueService);
                break;

            case ExecutionMessages.EXECUTION_CONCAT:
                await ExecutorService.concat(msg, executorQueueService);
                break;

            case ExecutionMessages.EXECUTION_APPEND:
                await ExecutorService.append(msg, executorQueueService);
                break;

            case ExecutionMessages.EXECUTION_CREATE_INDEX:
                await ExecutorService.createIndex(msg);
                break;

            case ExecutionMessages.EXECUTION_DELETE:
                await ExecutorService.deleteQuery(msg);
                break;

            case ExecutionMessages.EXECUTION_CONFIRM_DELETE:
                await ExecutorService.confirmDelete(msg);
                break;

            case ExecutionMessages.EXECUTION_DELETE_INDEX:
                await ExecutorService.deleteIndex(msg);
                break;

            case ExecutionMessages.EXECUTION_CONFIRM_IMPORT:
                await ExecutorService.confirmImport(msg);
                break;

            case ExecutionMessages.EXECUTION_CONFIRM_REINDEX:
                await ExecutorService.confirmReIndex(msg);
                break;

            case ExecutionMessages.EXECUTION_REINDEX:
                await ExecutorService.reindex(msg);
                break;

            case ExecutionMessages.EXECUTION_READ_FILE:
                await ExecutorService.readFile(msg);
                break;

            default:
                logger.error('Message not supported');

        }
    }

    /**
     * Creates a new index for the dataset, reports it to the status queue and
     * queues one read-file message per file URL.
     *
     * @param {Object} msg - Reads `datasetId`, `legend`, `taskId`, `fileUrl`
     *   (array or single value); `index` and `fileUrl` are written back.
     * @param {Object} executorQueueService - Must expose `sendMessage(message)`.
     * @returns {Promise<void>} Resolves once every read-file message was sent.
     */
    static async create(msg, executorQueueService) {
        logger.debug('Create task');
        logger.debug('Creating index');
        const index = await createDeactivatedIndex(msg);
        msg.index = index;

        // Accept a single URL as well as a list, like concat and append do.
        if (!Array.isArray(msg.fileUrl)) {
            msg.fileUrl = [msg.fileUrl];
        }

        // Now send a STATUS_INDEX_CREATED to StatusQueue
        await statusQueueService.sendIndexCreated(msg.taskId, index);

        await queueFilesForReading(msg, msg.fileUrl, executorQueueService);
    }

    /**
     * Concat workflow: creates a new index for the dataset, reports it to the
     * status queue and queues one read-file message per file URL.
     *
     * @param {Object} msg - Reads `datasetId`, `legend`, `taskId`, `fileUrl`
     *   (array or single value); `index`, `indexType` and `fileUrl` are written back.
     * @param {Object} executorQueueService - Must expose `sendMessage(message)`.
     * @returns {Promise<void>} Resolves once every read-file message was sent.
     */
    static async concat(msg, executorQueueService) {
        logger.debug('Starting importing service');
        logger.debug('Creating index');
        const index = await createDeactivatedIndex(msg);
        msg.indexType = 'type';
        msg.index = index;

        if (!Array.isArray(msg.fileUrl)) {
            msg.fileUrl = [msg.fileUrl];
        }

        // Now send a STATUS_INDEX_CREATED to StatusQueue
        await statusQueueService.sendIndexCreated(msg.taskId, index);

        await queueFilesForReading(msg, msg.fileUrl, executorQueueService);
    }

    /**
     * Creates a new index for the dataset and reports it to the status queue.
     * Logged as the first step of the reindex process.
     *
     * @param {Object} msg - Reads `datasetId`, `legend` and `taskId`.
     * @returns {Promise<void>}
     */
    static async createIndex(msg) {
        logger.debug('Starting reindex process');
        logger.debug('Creating new index...');
        const index = await createDeactivatedIndex(msg);

        // Now send a STATUS_INDEX_CREATED to StatusQueue
        await statusQueueService.sendIndexCreated(msg.taskId, index);
    }

    /**
     * Append workflow: works on the index already named in `msg.index`.
     * Deactivates it, reports that to the status queue and queues one
     * read-file message per file URL.
     *
     * @param {Object} msg - Reads `index`, `taskId`, `fileUrl` (array or single
     *   value); `indexType` and `fileUrl` are written back.
     * @param {Object} executorQueueService - Must expose `sendMessage(message)`.
     * @returns {Promise<void>} Resolves once every read-file message was sent.
     */
    static async append(msg, executorQueueService) {
        logger.debug('Starting append workflow');

        const { index } = msg;

        logger.debug(`Deactivating index ${index}`);

        await elasticService.deactivateIndex(index);
        msg.indexType = 'type';

        if (!Array.isArray(msg.fileUrl)) {
            msg.fileUrl = [msg.fileUrl];
        }

        // Now send a STATUS_INDEX_DEACTIVATED to StatusQueue
        await statusQueueService.sendIndexDeactivated(msg.taskId, index);

        await queueFilesForReading(msg, msg.fileUrl, executorQueueService);
    }

    /**
     * Runs the importer for a single file and reports the file as read.
     * A `UrlNotFound` failure is reported to the status queue as an error
     * message; any other failure is rethrown.
     *
     * @param {Object} msg - Passed to `ImporterService`; reads `taskId` and
     *   `fileUrl`, sets `indexType`.
     * @returns {Promise<void>}
     */
    static async readFile(msg) {
        logger.debug('Starting readFile');
        msg.indexType = 'type';

        try {
            const importerService = new ImporterService(msg);
            await importerService.start();
            logger.debug('Sending read file message');
            await statusQueueService.sendReadFile(msg.taskId, msg.fileUrl);
        } catch (err) {
            if (err instanceof UrlNotFound) {
                await statusQueueService.sendErrorMessage(msg.taskId, err.message);
                return;
            }
            throw err;
        }
    }

    /**
     * Starts a delete-by-query on `msg.index` and reports the resulting
     * Elasticsearch task id. Errors with `statusCode === 500` are reported to
     * the status queue as an error message; anything else is rethrown.
     *
     * @param {Object} msg - Reads `index`, `query` and `taskId`.
     * @returns {Promise<void>}
     */
    static async deleteQuery(msg) {
        logger.debug('Delete data of index with query ', msg.query);
        try {
            const elasticTaskId = await elasticService.deleteQuery(msg.index, msg.query);
            // Generate Performed Delete Query event
            await statusQueueService.sendPerformedDeleteQuery(msg.taskId, elasticTaskId);
        } catch (err) {
            if (err.statusCode === 500) {
                await statusQueueService.sendErrorMessage(msg.taskId, err.message);
                return;
            }
            throw err;
        }

    }

    /**
     * Starts a reindex from `msg.sourceIndex` into `msg.targetIndex` and
     * reports the resulting Elasticsearch task id.
     *
     * @param {Object} msg - Reads `sourceIndex`, `targetIndex` and `taskId`.
     * @returns {Promise<void>}
     */
    static async reindex(msg) {
        logger.debug(`Reindex from index ${msg.sourceIndex} to index ${msg.targetIndex}`);
        const elasticTaskId = await elasticService.reindex(msg.sourceIndex, msg.targetIndex);
        // Generate Performed Reindex event
        await statusQueueService.sendPerformedReindex(msg.taskId, elasticTaskId);
    }

    /**
     * Checks whether the Elasticsearch delete task has finished and, if so,
     * reports the delete query as finished.
     *
     * @param {Object} msg - Reads `elasticTaskId` and `taskId`.
     * @returns {Promise<void>}
     * @throws {Error} When the task has not finished yet. Per the original
     *   author's note, throwing here is expected to get the message "nacked"
     *   so it is retried later - confirm against the queue consumer.
     */
    static async confirmDelete(msg) {
        logger.debug('Confirm Delete data with elasticTaskId ', msg.elasticTaskId);
        const finished = await elasticService.checkFinishTaskId(msg.elasticTaskId);
        if (!finished) {
            throw new Error(`Delete index Elasticsearch task ${msg.elasticTaskId} not finished`);
        }
        await statusQueueService.sendFinishedDeleteQuery(msg.taskId);
    }

    /**
     * Checks whether the Elasticsearch reindex task has finished and, if so,
     * reports the reindex as finished.
     *
     * @param {Object} msg - Reads `elasticTaskId` and `taskId`.
     * @returns {Promise<void>}
     * @throws {ReindexingInProgress} When the task has not finished yet.
     */
    static async confirmReIndex(msg) {
        logger.debug('Confirm Reindex data with elasticTaskId ', msg.elasticTaskId);
        const finished = await elasticService.checkFinishTaskId(msg.elasticTaskId);
        if (!finished) {
            throw new ReindexingInProgress(`Reindex Elasticsearch task ${msg.elasticTaskId} not finished`);
        }

        await statusQueueService.sendFinishedReindex(msg.taskId);
    }

    /**
     * Deletes `msg.index` and reports the deletion. An
     * `index_not_found_exception` from Elasticsearch is treated as success,
     * since the index is gone either way. Every other failure is rethrown
     * unchanged, including failures whose response body is not valid JSON.
     *
     * @param {Object} msg - Reads `index` and `taskId`.
     * @returns {Promise<*>} Result of `statusQueueService.sendIndexDeleted`.
     */
    static async deleteIndex(msg) {
        logger.debug('Deleting index', msg.index);
        try {
            await elasticService.deleteIndex(msg.index);
        } catch (error) {
            if (!isIndexNotFoundError(error)) {
                throw error;
            }
        }

        return statusQueueService.sendIndexDeleted(msg.taskId);

    }

    /**
     * Activates `msg.index` and reports the import as confirmed.
     *
     * @param {Object} msg - Reads `index` and `taskId`.
     * @returns {Promise<void>}
     */
    static async confirmImport(msg) {
        logger.debug('Confirming index', msg.index);
        await elasticService.activateIndex(msg.index);
        await statusQueueService.sendImportConfirmed(msg.taskId);
    }

}

// Export the class itself; all of its handlers are static, so callers never instantiate it.
module.exports = ExecutorService;