jbielick/faktory_worker_node

View on GitHub
src/client.ts

Summary

Maintainability
A
0 mins
Test Coverage
import { default as makeDebug } from "debug";
import { URL } from "url";
import { unescape } from "querystring";
import { hostname } from "os";
import { createPool, Pool } from "generic-pool";

import { encode, hash, toJobPayloadWithDefaults } from "./utils";
import { Job, JobPayload, JobType, PartialJobPayload } from "./job";
import { Mutation, RETRIES, DEAD, SCHEDULED } from "./mutation";
import { Connection, Greeting, Command } from "./connection";
import { ConnectionFactory } from "./connection-factory";

const debug = makeDebug("faktory-worker:client");
const heartDebug = makeDebug("faktory-worker:client:heart");

const FAKTORY_PROTOCOL_VERSION = 2;
const FAKTORY_PROVIDER = process.env.FAKTORY_PROVIDER || "FAKTORY_URL";
const FAKTORY_URL = process.env[FAKTORY_PROVIDER] || "tcp://127.0.0.1:7419";
const BULK_SIZE_WARN_THRESHOLD = 5001;

export type ClientOptions = {
  host?: string;
  port?: string | number;
  password?: string;
  url?: string;
  wid?: string;
  labels?: string[];
  poolSize?: number;
};

export type RejectedJobFromPushBulk = {
  reason: string;
  payload: JobPayload;
};

export type RejectedJobsFromPushBulk = Record<string, RejectedJobFromPushBulk>;

export type Hello = {
  hostname: string;
  v: number;
  wid?: string;
  labels?: string[];
  pid?: number;
  pwdhash?: string;
};

export type ServerInfo = {
  server_utc_time: string;
  faktory: {
    queues: {
      [name: string]: number;
    };
    tasks: {
      Retries: {
        size: number;
      };
      Dead: {
        size: number;
      };
      Scheduled: {
        size: number;
      };
    };
  };
};

/**
 * A client connection handle for interacting with the faktory server. Holds a pool of 1 or more
 * underlying connections. Safe for concurrent use and tolerant of unexpected
 * connection terminations. Use this object for all interactions with the factory server.
 *
 * @example
 * const client = new Client();
 *
 * const job = await client.fetch('default');
 *
 */
export class Client {
  password?: string;
  labels: string[];
  wid?: string;
  connectionFactory: ConnectionFactory;
  pool: Pool<Connection>;

  /**
   * Creates a Client with a connection pool
   *
   * @param {object} [options]
   * @param {string} [options.url=tcp://127.0.0.1:7419] connection string for the faktory server
   *                                                    (checks for FAKTORY_PROVIDER and
   *                                                    FAKTORY_URL)
   * @param {string} [options.host=127.0.0.1] host string to connect to
   * @param {number|string} [options.port=7419] port to connect to faktory server on
   * @param {string} [options.password] faktory server password to use during HELLO
   * @param {string} [options.wid] optional wid that should be provided to the server
   *                               (only necessary for a worker process consuming jobs)
   * @param {string[]} [options.labels=[]] optional labels to provide the faktory server
   *                                       for this client
   * @param {number} [options.poolSize=10] the maxmimum size of the connection pool
   */
  constructor(options: ClientOptions = {}) {
    const url = new URL(options.url || FAKTORY_URL);

    this.password = options.password || unescape(url.password);
    this.labels = options.labels || [];
    this.wid = options.wid;
    this.connectionFactory = new ConnectionFactory({
      host: options.host || url.hostname,
      port: options.port || url.port,
      handshake: this.handshake.bind(this),
    });
    this.pool = createPool(this.connectionFactory, {
      testOnBorrow: true,
      acquireTimeoutMillis: 5000,
      idleTimeoutMillis: 10000,
      evictionRunIntervalMillis: 11000,
      min: 1,
      max: options.poolSize || 20,
      autostart: false,
    }).on("factoryCreateError", (e) => console.error(e));
  }

  static assertVersion(version: number): void {
    if (version !== FAKTORY_PROTOCOL_VERSION) {
      throw new Error(`
  Client / server version mismatch
  Client: ${FAKTORY_PROTOCOL_VERSION} Server: ${version}
`);
    }
  }

  /**
   * Explicitly opens a connection and then closes it to test connectivity.
   * Under normal circumstances you don't need to call this method as all of the
   * communication methods will check out a connection before executing. If a connection is
   * not available, one will be created. This method exists to ensure connection is possible
   * if you need to do so. You can think of this like {@link https://godoc.org/github.com/jmoiron/sqlx#MustConnect|sqlx#MustConnect}
   *
   * @return {Promise.<Client>} resolves when a connection is opened
   */
  async connect(): Promise<Client> {
    const conn = await this.connectionFactory.create();
    await this.connectionFactory.destroy(conn);
    return this;
  }

  /**
   * Closes the connection to the server
   * @return {Promise.<undefined>}
   */
  async close(): Promise<void> {
    await this.pool.drain();
    return this.pool.clear();
  }

  /**
   * Creates a new Job object to build a job payload
   * @param  {String}    jobtype name of the job function
   * @param  {...*} args    arguments to the job function
   * @return {Job}            a job builder with attached Client for PUSHing
   * @see  Job
   */
  job(jobtype: JobType, ...args: unknown[]): Job {
    const job = new Job(jobtype, this);
    job.args = args;
    return job;
  }

  handshake(conn: Connection, greeting: Greeting): Promise<string> {
    debug("handshake");

    Client.assertVersion(greeting.v);

    return conn.sendWithAssert(
      ["HELLO", encode(this.buildHello(greeting))],
      "OK"
    );
  }

  /**
   * builds a hello object for the server handshake
   * @param  {string} options.s: salt          the salt string from the server
   * @param  {number} options.i: iterations    the number of hash iterations to perform
   * @return {object}            the hello object to send back to the server
   * @private
   */
  buildHello({ s: salt, i: iterations }: Greeting): Hello {
    const hello: Hello = {
      hostname: hostname(),
      v: FAKTORY_PROTOCOL_VERSION,
    };

    if (this.wid) {
      hello.labels = this.labels;
      hello.pid = process.pid;
      hello.wid = this.wid;
    }

    if (salt && this.password) {
      hello.pwdhash = hash(this.password, salt, iterations);
    }

    return hello;
  }

  /**
   * Borrows a connection from the connection pool, forwards all arguments to
   * {@link Connection.send}, and checks the connection back into the pool when
   * the promise returned by the wrapped function is resolved or rejected.
   *
   * @param {...*} args arguments to {@link Connection.send}
   * @see Connection.send
   */
  send(command: Command): PromiseLike<string> {
    return this.pool.use((conn: Connection) => conn.send(command));
  }

  sendWithAssert(command: Command, assertion: string): PromiseLike<string> {
    return this.pool.use((conn: Connection) =>
      conn.sendWithAssert(command, assertion)
    );
  }

  /**
   * Fetches a job payload from the server from one of ...queues
   * @param  {...String} queues list of queues to pull a job from
   * @return {Promise.<object|null>}           a job payload if one is available, otherwise null
   */
  async fetch(...queues: string[]): Promise<JobPayload | null> {
    const response = await this.send(["FETCH", ...queues]);
    return JSON.parse(response);
  }

  /**
   * Sends a heartbeat for this.wid to the server
   * @return {Promise.<string>} string 'OK' when the heartbeat is accepted, otherwise
   *                           may return a state string when the server has a signal
   *                           to send this client (`quiet`, `terminate`)
   */
  async beat(): Promise<string> {
    heartDebug("BEAT");
    const response = await this.send(["BEAT", encode({ wid: this.wid })]);
    if (response[0] === "{") {
      return JSON.parse(response).state;
    }
    return response;
  }

  /**
   * Pushes a job payload to the server
   * @param  {Job|Object} job job payload to push
   * @return {Promise.<string>}         the jid for the pushed job
   */
  async push(job: Job | PartialJobPayload): Promise<string> {
    const payload = toJobPayloadWithDefaults(job);
    await this.sendWithAssert(["PUSH", encode(payload)], "OK");
    return payload.jid;
  }

  /**
   * Pushes multiple jobs to the server and return map containing failed job submissions if any
   * @param  {Array<Job>|Array<Object>} jobs jobs payload to push
   * @return {Promise<RejectedJobsFromPushBulk>}  response from the faktory server
   */
  async pushBulk(
    jobs: Array<Job | PartialJobPayload>
  ): Promise<RejectedJobsFromPushBulk> {
    if (jobs.length > BULK_SIZE_WARN_THRESHOLD) {
      console.warn(`[WARN] The maximum recommended pushBulk array size is ~1000.
For the best performance, consider pushing ~1000 jobs at a time to the server.
`);
    }
    const index: Record<string, JobPayload> = {};
    jobs.forEach((job) => {
      const payload = toJobPayloadWithDefaults(job);
      index[payload.jid] = payload;
    });
    const response: Record<string, string> = JSON.parse(
      await this.send(["PUSHB", encode(Object.values(index))])
    );
    const rejected: RejectedJobsFromPushBulk = {};
    Object.keys(response).forEach((jid) => {
      rejected[jid] = {
        reason: response[jid],
        payload: index[jid],
      };
    });
    return rejected;
  }

  /**
   * Sends a FLUSH to the server
   * @return {Promise.<string>} resolves with the server's response text
   */
  async flush(): Promise<string> {
    return this.send(["FLUSH"]);
  }

  /**
   * Sends an INFO command to the server
   * @return {Promise.<object>} the server's INFO response object
   */
  async info(): Promise<ServerInfo> {
    return JSON.parse(await this.send(["INFO"]));
  }

  /**
   * Sends an ACK to the server for a particular job ID
   * @param  {String} jid the jid of the job to acknowledge
   * @return {Promise.<string>}     the server's response text
   */
  async ack(jid: string): Promise<string> {
    return this.sendWithAssert(["ACK", encode({ jid })], "OK");
  }

  /**
   * Sends a FAIL command to the server for a particular job ID with error information
   * @param  {String} jid the jid of the job to FAIL
   * @param  {Error} e   an error object that caused the job to fail
   * @return {Promise.<string>}     the server's response text
   */
  fail(jid: string, e: Error): PromiseLike<string> {
    return this.sendWithAssert(
      [
        "FAIL",
        encode({
          message: e.message,
          errtype: `${(e as NodeJS.ErrnoException).code}`,
          backtrace: (e.stack || "").split("\n").slice(0, 100),
          jid,
        }),
      ],
      "OK"
    );
  }

  get [RETRIES](): Mutation {
    const mutation = new Mutation(this);
    mutation.target = RETRIES;
    return mutation;
  }

  get [SCHEDULED](): Mutation {
    const mutation = new Mutation(this);
    mutation.target = SCHEDULED;
    return mutation;
  }

  get [DEAD](): Mutation {
    const mutation = new Mutation(this);
    mutation.target = DEAD;
    return mutation;
  }
}