huridocs/uwazi

View on GitHub
app/api/queue.v2/infrastructure/specs/QueueWorker.spec.ts

Summary

Maintainability
C
7 hrs
Test Coverage
/* eslint-disable max-statements */
/* eslint-disable no-void */
/* eslint-disable max-classes-per-file */
import { Dispatchable, HeartbeatCallback } from 'api/queue.v2/application/contracts/Dispatchable';
import { testingEnvironment } from 'api/utils/testingEnvironment';
import { DefaultTestingQueueAdapter } from 'api/queue.v2/configuration/factories';
import { NamespacedDispatcher } from '../NamespacedDispatcher';
import { QueueWorker } from '../QueueWorker';
import { createSignals } from './Signals';

async function sleep(ms: number) {
  return new Promise(resolve => {
    setTimeout(resolve, ms);
  });
}

class TestJob implements Dispatchable {
  private signal: (index: string) => void;

  private logger: (message: string, index: number) => void;

  constructor(signal: (index: string) => void, logger: (message: string, index: number) => void) {
    this.signal = signal;
    this.logger = logger;
  }

  async handleDispatch(
    _heartbeat: HeartbeatCallback,
    params: { data: { pieceOfData: string[] }; aNumber: number }
  ): Promise<void> {
    this.signal(`starting-${params.aNumber}`);
    await sleep(50);
    this.logger(`${params.aNumber}, ${params.data.pieceOfData.join('|')}`, params.aNumber);
    this.signal(`ending-${params.aNumber}`);
  }
}

beforeEach(async () => {
  await testingEnvironment.setUp({});
});

afterAll(async () => {
  await testingEnvironment.tearDown();
});

it('should process all the jobs', async () => {
  const output: string[] = [];
  const adapter = DefaultTestingQueueAdapter();
  const dispatcher1 = new NamespacedDispatcher('namespace1', 'name', adapter);
  const dispatcher2 = new NamespacedDispatcher('namespace2', 'name', adapter);

  const worker = new QueueWorker('name', adapter, () => {});

  const signals = createSignals();

  worker.register(
    TestJob,
    async namespace =>
      new TestJob(signals.signal, message => output.push(`${namespace} ${message}`))
  );

  const dispatch = async (params: any, i: number) => {
    await sleep(5);
    return (i % 2 ? dispatcher2 : dispatcher1).dispatch(TestJob, params);
  };

  await dispatch({ data: { pieceOfData: ['.'] }, aNumber: 1 }, 0);
  await dispatch({ data: { pieceOfData: ['.', '.'] }, aNumber: 2 }, 1);
  await dispatch({ data: { pieceOfData: ['.', '.', '.'] }, aNumber: 3 }, 2);
  output.push('finished enqueueing jobs pre worker.start');
  await sleep(5);
  await Promise.all([
    worker.start(),
    dispatch({ data: { pieceOfData: ['.', '.', '.', '.'] }, aNumber: 4 }, 0)
      .then(
        void dispatch({ data: { pieceOfData: ['.', '.', '.', '.', '.'] }, aNumber: 5 }, 1).then(
          void dispatch({ data: { pieceOfData: ['.', '.', '.', '.', '.', '.'] }, aNumber: 6 }, 2)
        )
      )
      .then(async () => {
        output.push('finished enqueueing jobs post worker.start');
        await signals.signaled('ending-6');
        await worker.stop();
        output.push('worker stopped');
      }),
  ]);

  expect(output).toEqual([
    'finished enqueueing jobs pre worker.start',
    'finished enqueueing jobs post worker.start',
    'namespace1 1, .',
    'namespace2 2, .|.',
    'namespace1 3, .|.|.',
    'namespace1 4, .|.|.|.',
    'namespace2 5, .|.|.|.|.',
    'namespace1 6, .|.|.|.|.|.',
    'worker stopped',
  ]);
}, 10000);

it('should finish the in-progress job before stopping', async () => {
  const output: string[] = [];
  const adapter = DefaultTestingQueueAdapter();
  const dispatcher1 = new NamespacedDispatcher('namespace1', 'name', adapter);
  const dispatcher2 = new NamespacedDispatcher('namespace2', 'name', adapter);

  const worker = new QueueWorker('name', adapter, () => {});

  const signals = createSignals();

  const buildLogger = (namespace: string) => (message: string) => {
    output.push(`${namespace} ${message}`);
  };

  worker.register(TestJob, async namespace => new TestJob(signals.signal, buildLogger(namespace)));

  const dispatch = async (params: any, i: number) => {
    await sleep(5);
    return (i % 2 ? dispatcher2 : dispatcher1).dispatch(TestJob, params);
  };

  await dispatch({ data: { pieceOfData: ['.'] }, aNumber: 1 }, 0);
  await dispatch({ data: { pieceOfData: ['.', '.'] }, aNumber: 2 }, 1);
  await dispatch({ data: { pieceOfData: ['.', '.', '.'] }, aNumber: 3 }, 2);

  output.push('finished enqueueing jobs pre worker.start');

  await Promise.all([
    worker.start(),
    dispatch({ data: { pieceOfData: ['.', '.', '.', '.'] }, aNumber: 4 }, 0)
      .then(
        void dispatch({ data: { pieceOfData: ['.', '.', '.', '.', '.'] }, aNumber: 5 }, 1).then(
          void dispatch({ data: { pieceOfData: ['.', '.', '.', '.', '.', '.'] }, aNumber: 6 }, 2)
        )
      )
      .then(async () => {
        output.push('finished enqueueing jobs post worker.start');
        await signals.signaled('starting-4');
        output.push('stopping worker');
        await worker.stop();
        output.push('worker stopped');
      }),
  ]);

  expect(output).toEqual([
    'finished enqueueing jobs pre worker.start',
    'finished enqueueing jobs post worker.start',
    'namespace1 1, .',
    'namespace2 2, .|.',
    'namespace1 3, .|.|.',
    'stopping worker',
    'namespace1 4, .|.|.|.',
    'worker stopped',
  ]);
}, 10000);

it('should log and continue if a job fails', async () => {
  class FailOnceJob implements Dispatchable {
    static failed = false;

    // eslint-disable-next-line class-methods-use-this
    async handleDispatch(): Promise<void> {
      if (FailOnceJob.failed) {
        return;
      }

      FailOnceJob.failed = true;
      throw new Error('failing');
    }
  }

  const logMock = jest.fn();

  const adapter = DefaultTestingQueueAdapter();
  const dispatcher = new NamespacedDispatcher('namespace', 'name', adapter);
  const queueWorker = new QueueWorker('name', adapter, logMock);

  queueWorker.register(FailOnceJob, async () => new FailOnceJob());

  await dispatcher.dispatch(FailOnceJob, undefined);

  await Promise.all([queueWorker.start(), sleep(200).then(async () => queueWorker.stop())]);

  expect(logMock).toHaveBeenCalledWith(
    'error',
    expect.objectContaining({ job: expect.objectContaining({ name: FailOnceJob.name }) })
  );
  expect(logMock).toHaveBeenCalledWith(
    'info',
    expect.objectContaining({ message: expect.stringContaining('Sleeping') })
  );
});