huridocs/uwazi

View on GitHub
scripts/checkServicesTasks.js

Summary

Maintainability
A
1 hr
Test Coverage
const RedisSMQ = require('rsmq');
const Redis = require('redis');

// Command-line interface of the script. Only `service` is mandatory; `tenant`
// and `deleteOne` stay undefined unless passed, the remaining flags fall back
// to the defaults declared below.
const { tenant, redisUrl, read, deleteAll, service, deleteOne } = require('yargs')
  .options({
    // Restricts the listing (and `deleteAll`) to messages of a single tenant.
    tenant: { alias: 't', type: 'string', describe: 'Tenant to check' },
    // Connection string handed to the redis client.
    redisUrl: {
      alias: 'u',
      type: 'string',
      describe: 'Redis url',
      default: 'redis://localhost:6379',
    },
    // Name of the service; the inspected queue is `<service>_tasks`.
    service: {
      alias: 's',
      type: 'string',
      describe: 'Service queue',
      choices: ['information_extraction', 'segmentation'],
    },
    // Print the pending messages as a table (on by default).
    read: { alias: 'r', type: 'boolean', describe: 'Read the queue', default: true },
    // Remove every listed message from the queue.
    deleteAll: {
      alias: 'D',
      type: 'boolean',
      describe: 'Delete all messages from the queue',
      default: false,
    },
    // Remove a single message, identified by its queue id.
    deleteOne: { alias: 'd', type: 'string', describe: 'ID of the message to be deleted' },
  })
  .demandOption(['service'], '\n\n').argv;

/**
 * Inspects, and optionally purges, the `<service>_tasks` message queue.
 *
 * Steps, in order:
 *  1. If `deleteOne` is set, delete that single message (a failure is reported
 *     on stderr and does not abort the run).
 *  2. Print how many messages are currently hidden in the queue.
 *  3. Receive every visible message, decode it and, when `tenant` is set, keep
 *     only the messages of that tenant.
 *  4. If `deleteAll` is set, delete every message kept in step 3.
 *  5. If `read` is set, print the remaining messages with `console.table`.
 *
 * The redis client is always closed, even when one of the steps throws.
 *
 * Reads the module-level CLI options `service`, `redisUrl`, `tenant`,
 * `deleteOne`, `deleteAll` and `read`.
 *
 * @returns {Promise<void>} Rejects with the underlying error if talking to the
 *   queue fails or a message body is not valid JSON.
 */
const run = async () => {
  const qname = `${service}_tasks`;
  const redisClient = Redis.createClient(redisUrl);
  const redisSMQ = new RedisSMQ({ client: redisClient });

  // Receives the next visible message; a result without an `id` is treated as
  // "nothing left to read".
  const readFirstTaskMessage = async () => {
    const message = await redisSMQ.receiveMessageAsync({ qname, vt: 1 });
    return message.id ? message : undefined;
  };

  const deleteMessage = async id => {
    await redisSMQ.deleteMessageAsync({ qname, id });
  };

  // Drains the queue one message at a time. Messages are received with a
  // visibility timeout of 1, so on a slow or large queue the first messages can
  // become visible again before the scan ends; stopping on the first repeated
  // id keeps the scan finite and free of duplicates.
  const readAllTaskMessages = async () => {
    const messages = [];
    const seenIds = new Set();
    let message = await readFirstTaskMessage();

    while (message && !seenIds.has(message.id)) {
      seenIds.add(message.id);
      messages.push(message);
      // eslint-disable-next-line no-await-in-loop
      message = await readFirstTaskMessage();
    }

    return messages;
  };

  // Flattens a raw queue message into the row shown to the operator. The
  // message body is expected to be a JSON document with task/tenant/params.
  const formatMessage = message => {
    const messageData = JSON.parse(message.message);
    return {
      id: message.id,
      task: messageData.task,
      tenant: messageData.tenant,
      params: messageData.params,
      sent: new Date(message.sent).toLocaleString(),
    };
  };

  try {
    if (deleteOne) {
      process.stdout.write(`💣 \u001b[31m DELETING message \u001b[32m${deleteOne}\u001b[37m\n`);
      try {
        await deleteMessage(deleteOne);
      } catch (e) {
        // Best effort: report the failure and carry on with the listing.
        process.stderr.write(`${(e && e.stack) || e}\n`);
      }
    }

    const queue = await redisSMQ.getQueueAttributesAsync({ qname });
    process.stdout.write(
      `⚠️ \u001b[33m ${queue.hiddenmsgs} hidden\u001b[37m tasks (been read by others)\n`
    );

    let messages = (await readAllTaskMessages()).map(formatMessage);

    if (tenant) {
      messages = messages.filter(message => message.tenant === tenant);
    }

    if (deleteAll) {
      process.stdout.write(`💣 \u001b[31m DELETING ${messages.length} messages\u001b[37m\n`);
      await Promise.all(messages.map(message => deleteMessage(message.id)));
      messages = [];
    }

    if (read) {
      // eslint-disable-next-line no-console
      console.table(messages);
    }
  } finally {
    // Always release the connection so the process can exit, even on failure.
    await redisClient.end(true);
  }
};
// Script entry point: report any failure on stderr and signal it through the
// exit code instead of leaving the rejection unhandled.
run().catch(error => {
  process.stderr.write(`${(error && error.stack) || error}\n`);
  process.exitCode = 1;
});