jbielick/faktory_worker_node

View on GitHub
src/worker.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import makeDebug from "debug";
import { v4 as uuid } from "uuid";
import { strict as assert } from "assert";
import { ComposedMiddleware, Middleware as KoaMiddleware } from "koa-compose";
import { EventEmitter } from "events";

import { JobPayload, JobType } from "./job";
import { Client, ClientOptions } from "./client";
import { wrapNonErrors } from "./utils";
import { sleep } from "./utils";
import createExecutionChain from "./create-execution-chain";
import { strictlyOrdered, weightedRandom } from "./queues";

const debug = makeDebug("faktory-worker:worker");

export type Registry = {
  [jobtype: string]: JobFunction;
};

export type JobFunctionContextWrapper = {
  (...args: unknown[]): ContextProvider;
};

export type UnWrappedJobFunction = {
  (...args: unknown[]): unknown;
};

export type JobFunction = JobFunctionContextWrapper | UnWrappedJobFunction;

export type ContextProvider = (ctx: MiddlewareContext) => unknown;

export interface MiddlewareContext {
  job: JobPayload;
  fn?: JobFunction;
}

export type Middleware = KoaMiddleware<MiddlewareContext>;

export type WorkerOptions = {
  wid?: string;
  concurrency?: number;
  timeout?: number;
  beatInterval?: number;
  queues?: string[] | { [name: string]: number } | (() => string[]);
  middleware?: Middleware[];
  registry?: Registry;
  poolSize?: number;
} & ClientOptions;

/**
 * Representation of a worker process with many concurrent job processors. Works at the
 * concurrency set in options during construction. Will hold at most `concurrency` jobs
 * in-memory while processing at any one time. Listens for signals to quiet or shutdown.
 * Should not be started more than once per-process, nor should more than one worker be
 * started per-process.
 *
 * @example
 * const worker = new Worker({
 *   queues: ['critical', 'default', 'low'],
 * });
 *
 * worker.work();
 */
export class Worker extends EventEmitter {
  readonly wid: string;
  private concurrency: number;
  private shutdownTimeout: number;
  private beatInterval: number;
  private readonly queueFn: () => string[];
  readonly middleware: Middleware[];
  private readonly registry: Registry;
  private quieted: boolean | undefined;
  private working: Map<string, Promise<string>>;
  private execute: ComposedMiddleware<MiddlewareContext>;
  private pulse: NodeJS.Timer;
  readonly client: Client;

  /**
   * @param {object} [options]
   * @param  {String} [options.wid=uuid().slice(0, 8)]: the wid the worker will use
   * @param  {Number} [options.concurrency=20]: how many jobs this worker can process at once
   * @param  {Number} [options.shutdownTimeout=8]: the amount of time in seconds that the worker
   *                                             may take to finish a job before exiting
   *                                             ungracefully
   * @param  {Number} [options.beatInterval=15]: the amount of time in seconds between each
   *                                             heartbeat
   * @param  {string[]} [options.queues=['default']]: the queues this worker will fetch jobs from
   * @param  {function[]} [options.middleware=[]]: a set of middleware to run before performing
   *                                               each job
   *                                       in koa.js-style middleware execution signature
   * @param  {Registry} [options.registry=Registry]: the job registry to use when working
   * @param {Number} [options.poolSize=concurrency+2] the client connection pool size for
   *                                                  this worker
   */
  constructor(options: WorkerOptions = {}) {
    super();
    this.wid = options.wid || uuid().slice(0, 8);
    this.concurrency = options.concurrency || 20;
    this.shutdownTimeout = (options.timeout || 8) * 1000;
    this.beatInterval = (options.beatInterval || 15) * 1000;
    const queues = options.queues || [];
    if (typeof queues === "function") {
      this.queueFn = queues;
    } else if (Array.isArray(queues)) {
      this.queueFn = strictlyOrdered(queues.length ? queues : ["default"]);
    } else {
      this.queueFn = weightedRandom(queues);
    }
    this.middleware = options.middleware || [];
    this.registry = options.registry || {};
    this.working = new Map();
    this.client = new Client({
      wid: this.wid,
      url: options.url,
      host: options.host,
      port: options.port,
      password: options.password,
      poolSize: options.poolSize || this.concurrency + 2,
      labels: options.labels || [],
    });
    this.on("error", this.onerror);
  }

  private async tick(): Promise<void> {
    if (this.quieted) return;
    try {
      if (this.working.size >= this.concurrency) {
        await Promise.race(this.working.values());
      } else {
        const job = await this.fetch();
        if (job) {
          const { jid } = job;
          this.working.set(jid, this.handle(job));
        }
      }
    } catch (e) {
      this.emit("error", e);
      await sleep(1000);
    } finally {
      this.tick();
    }
  }

  /**
   * starts the worker fetch loop and job processing
   *
   * @return self, when working has been stopped by a signal or concurrent
   *                        call to stop or quiet
   * @see  Worker.quiet
   * @see  Worker.stop
   */
  async work(): Promise<Worker> {
    debug("work concurrency=%i", this.concurrency);
    this.execute = createExecutionChain(this.middleware, this.registry);
    await this.beat();
    this.pulse = setInterval(async () => {
      try {
        await this.beat();
      } catch (error) {
        this.emit(
          "error",
          new Error(`Worker failed heartbeat: ${error.message}\n${error.stack}`)
        );
      }
    }, this.beatInterval);
    this.trapSignals();
    this.tick();
    return this;
  }

  /**
   * Signals to the worker to discontinue fetching new jobs and allows the worker
   * to continue processing any currently-running jobs
   */
  quiet(): void {
    debug("quiet");
    this.quieted = true;
  }

  /**
   * stops the worker
   *
   * @return {promise} resolved when worker stops
   */
  async stop(): Promise<void> {
    Worker.removeSignalHandlers();
    debug("stop");
    this.quiet();
    clearInterval(this.pulse);
    let forced = false;

    return new Promise(async (resolve) => {
      const timeout = setTimeout(async () => {
        debug("shutdown timeout exceeded");
        forced = true;
        // @TODO fail in progress jobs so they retry faster
        this.client.close();
        resolve();
        process.exit(1);
      }, this.shutdownTimeout);

      process.nextTick(async () => {
        try {
          debug("awaiting in progress");
          await Promise.all(this.working.values());
          debug("all clear");
          if (forced) return;
          await this.client.close();
          clearTimeout(timeout);
          resolve();
        } catch (e) {
          console.warn("error during graceful shutdown:", e);
        }
      });
    });
  }

  /**
   * Sends a heartbeat for this server and interprets the response state (if present)
   * to quiet or terminate the worker
   */
  async beat(): Promise<void> {
    const response = await this.client.beat();
    switch (response) {
      case "quiet":
        this.quiet();
        break;
      case "terminate":
        this.stop();
        break;
      default:
        break;
    }
  }

  get queues(): string[] {
    return this.queueFn();
  }

  /**
   * Fetches a job from the defined queues.
   *
   * @private
   * @return {JobPayload|null} a job payload from the server or null when there are
   *                             no jobs
   */
  private fetch(): Promise<JobPayload | null> {
    return this.client.fetch(...this.queues);
  }

  /**
   * Handles a job from the server by executing it and either acknowledging
   * or failing the job when done
   *
   * @private
   * @param  {JobPayload} job the job payload from the server
   * @return {Promise<string>} 'ack' or 'fail' depending on job handling resu
   */
  private async handle(job: JobPayload): Promise<string> {
    const { jid } = job;
    let error;
    try {
      debug(`executing ${jid}`);
      await this.execute({ job });
    } catch (e) {
      error = wrapNonErrors(e);
    }
    try {
      if (!error) {
        await this.client.ack(jid);
        debug(`ACK ${jid}`);
        return "done";
      } else {
        await this.client.fail(jid, error);
        debug(`FAIL ${jid}`);
        this.emit("fail", { job, error });
        return "fail";
      }
    } catch (e) {
      this.emit("error", e);
      return "error";
    } finally {
      this.working.delete(jid);
    }
  }

  /**
   * Adds a middleware function to the stack
   *
   * @param  {Function} fn koa-compose-style middleware function
   * @return {FaktoryControl}      this
   * @instance
   * @see  {@link https://github.com/koajs/koa/blob/master/docs/guide.md#writing-middleware|koa middleware}
   * @example
   * faktory.use(async (ctx, next) => {
   *   // a pool you created to hold database connections
   *   pool.use(async (conn) => {
   *     ctx.db = conn;
   *     await next();
   *   });
   * });
   */
  use(fn: Middleware): Worker {
    assert(typeof fn === "function");
    debug("use %s", fn.name || "-");
    this.middleware.push(fn);
    return this;
  }

  onerror(error: Error): void {
    if (this.listenerCount("error") === 1) console.error(error);
  }

  /**
   * Adds a {@link JobFunction|JobFunction} to the {@link Registry}
   *
   * @param  {Jobtype}   name string descriptor for the jobtype
   * @param  {JobFunction} fn
   * @return {FaktoryControl}        this
   * @instance
   * @example
   * faktory.register('MyJob', (...args) => {
   *   // some work
   * });
   */
  register(name: JobType, fn: JobFunction): Worker {
    assert(typeof fn === "function", "a registered job must be a function");
    debug("registered %s", name);
    this.registry[name] = fn;
    return this;
  }

  /**
   * @private
   */
  private trapSignals(): void {
    // istanbul ignore next
    process
      .once("SIGTERM", () => this.stop())
      .once("SIGTSTP", () => this.quiet())
      .once("SIGINT", () => this.stop());
  }

  private static removeSignalHandlers(): void {
    process
      .removeAllListeners("SIGTERM")
      .removeAllListeners("SIGTSTP")
      .removeAllListeners("SIGINT");
  }
}