resource-watch/doc-orchestrator

View on GitHub
app/src/services/task.service.js

Summary

Maintainability
D
3 days
Test Coverage
A
91%
const logger = require('logger');
const Task = require('models/task.model');
const TaskNotFound = require('errors/task-not-found.error');
const { TASK_STATUS } = require('app.constants');
const elasticService = require('services/elastic.service');

class TaskService {

    static getFilteredQuery(query) {
        const allowedSearchFields = {
            type: 'type',
            status: 'status',
            datasetId: 'datasetId',
            createdAt: 'createdAt',
            updatedAt: 'updatedAt',
            createdBefore: 'createdAt',
            createdAfter: 'createdAt',
            updatedBefore: 'updatedAt',
            updatedAfter: 'updatedAt'
        };
        const filteredQuery = {};

        logger.debug('Object.keys(query)', Object.keys(query));
        Object.keys(query).filter((param) => Object.prototype.hasOwnProperty.call(allowedSearchFields, param)).forEach((param) => {
            switch (Task.schema.paths[allowedSearchFields[param]].instance) {

                case 'String':
                    filteredQuery[param] = {
                        $regex: query[param],
                        $options: 'i'
                    };
                    break;
                case 'Array':
                    if (query[param].indexOf('@') >= 0) {
                        filteredQuery[param] = {
                            $all: query[param].split('@').map((elem) => elem.trim())
                        };
                    } else {
                        filteredQuery[param] = {
                            $in: query[param].split(',').map((elem) => elem.trim())
                        };
                    }
                    break;
                case 'Mixed':
                    filteredQuery[param] = { $ne: null };
                    break;
                case 'Date':
                    filteredQuery[allowedSearchFields[param]] = filteredQuery[allowedSearchFields[param]] || {};
                    switch (param) {

                        case 'createdBefore':
                        case 'updatedBefore':
                            filteredQuery[allowedSearchFields[param]].$lte = new Date(query[param]);
                            break;
                        case 'createdAfter':
                        case 'updatedAfter':
                            filteredQuery[allowedSearchFields[param]].$gte = new Date(query[param]);
                            break;
                        default:
                            filteredQuery[allowedSearchFields[param]].$eq = new Date(query[param]);
                            break;

                    }
                    break;
                default:

            }
        });
        logger.debug(filteredQuery);
        return filteredQuery;
    }

    static async get(id) {
        logger.debug(`[TaskService]: Getting task with id: ${id}`);
        logger.debug(`[DBACCESS-FIND]: task.id: ${id}`);
        const task = await Task.findById(id).exec();
        if (!task) {
            logger.error(`[TaskService]: Task with id ${id} not found`);
            throw new TaskNotFound(`Task with id '${id}' not found`);
        }

        await Promise.all(task.logs.map(async (log, index) => {
            if (!log.elasticTaskId) {
                return Promise.resolve();
            }
            logger.debug(`[TaskRouter] Getting Elasticsearch task data for elasticsearchTaskIds: ${log.elasticTaskId}`);

            try {
                task.logs[index].elasticTaskStatus = await elasticService.getTaskStatus(log.elasticTaskId);
            } catch (err) {
                task.logs[index].elasticTaskStatus = err;
            }

            return Promise.resolve();
        }));

        return task;
    }

    static async create(taskData) {
        logger.debug(`[TaskService]: Creating new task of type ${taskData.type}`);
        const task = await new Task({
            _id: taskData.id,
            type: taskData.type,
            message: taskData,
            filesProcessed: 0,
            reads: 0,
            writes: 0,
            datasetId: taskData.datasetId
        }).save();
        return task;
    }

    static async update(id, taskData) {
        logger.debug(`[TaskService]: Updating task with id:  ${id}`);
        let task = await TaskService.get(id);
        task.status = taskData.status || task.status;
        task.index = taskData.index || task.index;
        task.elasticTaskId = taskData.elasticTaskId || task.elasticTaskId;
        task.error = taskData.error || task.error;
        task.filesProcessed = taskData.filesProcessed || task.filesProcessed;
        if (taskData.log && taskData.log instanceof Object) {
            const strippedLog = { ...taskData.log };
            delete strippedLog.id;
            delete strippedLog.hash;
            delete strippedLog.taskId;
            // if (strippedLog.withErrors !== true) {
            //     delete strippedLog.withErrors;
            //     delete strippedLog.detail;
            // }
            task.logs.push(strippedLog);
        }
        task.updatedAt = new Date();
        logger.debug(`[DBACCESS-SAVE]: update task.id ${id}`);
        task = await task.save();
        return task;
    }

    static async resetCounters(id) {
        logger.debug(`[TaskService]: ResetCounters of task with id:  ${id}`);
        let task = await TaskService.get(id);
        task.reads = 0;
        task.writes = 0;
        logger.debug(`[DBACCESS-SAVE]: update task.id ${id}`);
        task = await task.save();
        return task;
    }

    static async delete(id) {
        logger.debug(`[TaskService]: Deleting task with id:  ${id}`);
        let task = await TaskService.get(id);
        logger.debug(`[DBACCESS-REMOVE]: task.id ${id}`);
        task = await task.remove();
        return task;
    }

    static async getAll(query = {}) {
        logger.debug(`[TaskService]: Getting all tasks`);

        const page = query['page[number]'] ? parseInt(query['page[number]'], 10) : 1;
        const limit = query['page[size]'] ? parseInt(query['page[size]'], 10) : 10;
        const sort = { createdAt: 'asc' };

        const paginationOptions = {
            page,
            limit,
            sort
        };

        const filteredQuery = TaskService.getFilteredQuery({ ...query });
        const pages = await Task.paginate(filteredQuery, paginationOptions);

        await Promise.all(pages.docs.map(async (task) => {
            await Promise.all(task.logs.map(async (log, logIndex) => {
                if (!log.elasticTaskId) {
                    return Promise.resolve();
                }
                logger.debug(`[TaskRouter] Getting Elasticsearch task data for elasticsearchTaskIds: ${log.elasticTaskId}`);

                try {
                    task.logs[logIndex].elasticTaskStatus = await elasticService.getTaskStatus(log.elasticTaskId);
                } catch (err) {
                    task.logs[logIndex].elasticTaskStatus = err;
                }

                return Promise.resolve();
            }));
        }));

        return pages;
    }

    static async addWrite(id) {
        logger.debug(`[TaskService]: addWrite to task with id:  ${id}`);
        logger.debug(`[DBACCESS-FIND]: task.id: ${id}`);
        let task = await TaskService.get(id);
        task.writes += 1;
        logger.debug(`[DBACCESS-SAVE]: update task.id ${id}`);
        task = await task.save();
        return task;
    }

    static async addRead(id) {
        logger.debug(`[TaskService]: addRead to task with id:  ${id}`);
        logger.debug(`[DBACCESS-FIND]: task.id: ${id}`);
        let task = await TaskService.get(id);
        task.reads += 1;
        logger.debug(`[DBACCESS-SAVE]: update task.id ${id}`);
        task = await task.save();
        return task;
    }

    static async checkCounter(task) {
        logger.debug(`[TaskService]: checking counter of task with id: ${task.id}`);

        if (task.filesProcessed < task.message.fileUrl.length) {
            return false;
        }

        if ((task.writes - task.reads === 0) && (task.status === TASK_STATUS.READ)) {
            return true;
        }
        return false;
    }

    static async getRunningTasks(datasetId) {
        logger.debug(`[TaskService]: checking running task for datasetId:  ${datasetId}`);
        logger.debug(`[DBACCESS-FIND]: task.datasetId: ${datasetId}`);
        return Task.find({ datasetId, status: { $nin: [TASK_STATUS.SAVED, TASK_STATUS.ERROR] } }).exec();
    }

}

module.exports = TaskService;