dbmedialab/reader-critics

View on GitHub
src/app/queue/index.ts

Summary

Maintainability
A
0 mins
Test Coverage
//
// LESERKRITIKK v2 (aka Reader Critics)
// Copyright (C) 2017 DB Medialab/Aller Media AS, Oslo, Norway
// https://github.com/dbmedialab/reader-critics/
//
// This program is free software: you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.
//

import * as colors from 'ansicolors';
import * as kue from 'kue';
import * as Promise from 'bluebird';

import {
    createRedisConnection,
    dbMessageQueue,
} from 'app/db';

import {
    onCheckAwaitFeedback,
    onCheckEscalationToEditor,
    onCollectArticlesForPolling,
    onCompileUnrevisedDigest,
    onNewFeedback,
    onPollArticleUpdate,
    onSendEditorEscalation,
} from './handlers';

import * as app from 'app/util/applib';

// Define which job handlers are available to what worker type

// Handler table for the job worker. initJobWorkerQueue() looks entries up by
// the string 'on' + <MessageType key>, so the property names are spelled out
// here to make clear that they are part of that lookup contract.
const jobWorkerHandlers = Object.freeze({
    onCheckAwaitFeedback: onCheckAwaitFeedback,
    onCheckEscalationToEditor: onCheckEscalationToEditor,
    onCollectArticlesForPolling: onCollectArticlesForPolling,
    onCompileUnrevisedDigest: onCompileUnrevisedDigest,
    onNewFeedback: onNewFeedback,
    onPollArticleUpdate: onPollArticleUpdate,
    onSendEditorEscalation: onSendEditorEscalation,
});

// The web worker does not have any message handlers yet:
// const webWorkerHandlers = Object.freeze({
// });

// Internal resources

// Logger for this module, obtained from the application library
const log = app.createLog();

// The kue queue of the current process. It stays undefined until one of the
// init*Queue() functions below has assigned it; sendMessage() relies on it.
let queue : kue.Queue;

// All message types

/**
 * All message types that can be pushed into the queue. The string values are
 * used as the kue job type in sendMessage() and queue.process().
 *
 * initJobWorkerQueue() iterates over the keys of this enum and looks up a
 * handler named 'on' + <key> in jobWorkerHandlers, so every key needs a handler
 * of that exact name to be processed by the job worker.
 */
export enum MessageType {
    CheckAwaitFeedback = 'check-await-feedback',
    CheckEscalationToEditor = 'check-escalation-to-editor',
    CollectArticlesForPolling = 'collect-articles-for-polling',
    CompileUnrevisedDigest = 'compile-unrevised-digest',
    NewFeedback = 'new-feedback',
    PollArticleUpdate = 'poll-article-update',
    SendEditorEscalation = 'send-editor-escalation',
    // No 'onSendSuggestionDigest' entry exists in jobWorkerHandlers, so
    // initJobWorkerQueue() only logs "No handler found" for this type.
    SendSuggestionDigest = 'send-suggestion-digest',
}

// Queue initialization for the different worker types

/**
 * Creates the queue for the master process and immediately runs a cleanup
 * pass over the existing jobs.
 *
 * @returns The promise returned by maintenance()
 */
export function initMasterQueue() : Promise <void> {
    log('Initialising %s worker queue', colors.brightRed('master'));

    const redis = {
        createClientFactory: () => createRedisConnection(dbMessageQueue),
    };
    queue = kue.createQueue({ redis });

    // On startup a bigger batch than the maintenance() default is cleaned up
    const startupBatchSize = 5000;
    return maintenance(startupBatchSize);
}

/**
 * Creates the queue for a job worker process and registers a handler for
 * every message type that has one in jobWorkerHandlers.
 *
 * Handlers are looked up by name: the MessageType key prefixed with 'on'.
 * Message types without a matching handler are only logged.
 *
 * @returns An already resolved promise
 */
export function initJobWorkerQueue() : Promise <void> {
    log('Initialising %s worker queue', colors.brightYellow('job'));

    const redis = {
        createClientFactory: () => createRedisConnection(dbMessageQueue),
    };
    queue = kue.createQueue({ redis });

    for (const typeName of Object.keys(MessageType)) {
        const handler = jobWorkerHandlers['on' + typeName];

        if (!handler) {
            log('No handler found for "%s" messages', colors.brightRed(typeName));
            continue;
        }

        queue.process(MessageType[typeName], handler);
        log('Installed handler for "%s" messages', colors.brightGreen(typeName));
    }

    return Promise.resolve();
}

/**
 * Creates the queue for a web worker process. No message handlers are
 * registered here; the queue is only set up so that sendMessage() can use it.
 *
 * @returns An already resolved promise
 */
export function initWebWorkerQueue() : Promise <void> {
    log('Initialising %s worker queue', colors.brightGreen('web'));

    const redis = {
        createClientFactory: () => createRedisConnection(dbMessageQueue),
    };
    queue = kue.createQueue({ redis });

    return Promise.resolve();
}

// Push messages into the queue

/**
 * Creates a job of the given message type and saves it to the queue.
 *
 * Every job is created with normal priority, a single attempt and a TTL of ten
 * minutes, and is removed from the queue once it has completed.
 *
 * @param type     Message type; its string value becomes the job type
 * @param payload  Job data, defaults to an empty object when omitted
 * @param options  Currently unused, kept so that existing callers keep working
 * @returns A promise that resolves after the job was saved and rejects with
 *          the error reported by the save callback. It also rejects when the
 *          queue has not been initialised yet.
 */
export function sendMessage(type : MessageType, payload? : {}, options? : {}) : Promise <void> {
    const jobData = payload === undefined ? {} : payload;
    log(type, app.inspect(jobData, 1, false));

    // Saving is asynchronous: only settle once the save callback has reported
    // the outcome, so that a failed save is not silently lost.
    return new Promise <void> ((resolve, reject) => {
        queue.create(type, jobData)
            .priority('normal')
            .attempts(1)
            .ttl(10 * 60 * 1000)  // Ten minutes, in milliseconds
            .removeOnComplete(true)
            .save((error? : Error) => {
                if (error) {
                    reject(error);
                }
                else {
                    resolve();
                }
            });
    });
}

// Regularly clean up finished and stuck jobs

/**
 * Cleans up jobs in the 'complete', 'inactive' and 'failed' states. The three
 * cleanups are started together and awaited as a group.
 *
 * @param maxJobs  Upper limit of jobs to clean up per state, 500 by default
 * @returns A promise that resolves once all three cleanups have finished
 */
export function maintenance(maxJobs = 500) : Promise <void> {
    const states = [ 'complete', 'inactive', 'failed' ];
    const cleanups = states.map((state) => cleanThatUp(state, maxJobs));

    return Promise.all(cleanups).then(() => undefined);
}

/**
 * Removes up to maxJobs jobs that are in the given state, oldest range first
 * ('asc' order as returned by kue).
 *
 * Removal is best effort: a job that cannot be removed is logged and skipped,
 * it does not fail the whole cleanup. Failing to fetch the job list does.
 *
 * @param jobStatus  The kue job state to clean up, e.g. 'complete' or 'failed'
 * @param maxJobs    Maximum number of jobs to remove; nothing is done if <= 0
 * @returns A promise that resolves when all fetched jobs have been handled and
 *          rejects if the job list could not be fetched
 */
function cleanThatUp(jobStatus : string, maxJobs : number) : Promise <void> {
    // The range below ends at index maxJobs - 1. For maxJobs = 0 this would be
    // -1, which Redis range queries interpret as "up to the last element".
    // NOTE(review): assumes kue passes these indices straight to ZRANGE - confirm.
    if (maxJobs <= 0) {
        return Promise.resolve();
    }

    return new Promise <kue.Job[]> ((resolve, reject) => {
        kue.Job.rangeByState(jobStatus, 0, maxJobs - 1, 'asc', (error, jobs) => {
            if (error) {
                reject(error);
            }
            else {
                resolve(jobs);
            }
        });
    })
    .then((jobs) => {
        if (jobs.length > 0) {
            log('Cleaning up %d %s queue jobs', jobs.length, jobStatus);
        }

        // Part of the returned chain, so that an unexpected failure in here
        // rejects the result instead of leaving it pending forever
        return Promise.map(jobs, (job : kue.Job) => new Promise <void> ((removed) => {
            job.remove((error? : Error) => {
                if (error) {
                    log('Could not remove %s job %d: %s', jobStatus, job.id, error);
                }
                removed();
            });
        }));
    })
    .then(() => undefined);
}