IBM/node-celery-ts

View on GitHub
src/task.ts

Summary

Maintainability
B
5 hrs
Test Coverage
// BSD 3-Clause License
//
// Copyright (c) 2018, IBM Corporation
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice, this
//   list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
//   this list of conditions and the following disclaimer in the documentation
//   and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
//   contributors may be used to endorse or promote products derived from
//   this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

import { FailoverStrategy } from "./config";
import { MessageBroker } from "./message_broker";
import {
    CompressionMime,
    ContentEncodingMime,
    ContentTypeMime,
    TaskHeaders,
    TaskProperties,
} from "./messages";
import { NullBackend } from "./null_backend";
import * as Packer from "./packer";
import { Result } from "./result";
import { ResultBackend } from "./result_backend";

import { isNullOrUndefined } from "./utility";

import * as Os from "os";
import * as Process from "process";

import * as Uuid from "uuid";

/**
 * `Task` encapsulates functionality relating to Celery Tasks. The generic
 * parameter T is the return type of the task to be called.
 *
 * A `Task` holds one active message broker, chosen by the failover strategy
 * from the list of brokers it was constructed with. When publishing fails,
 * the failover strategy is consulted again to pick a replacement.
 *
 * @see Client#createTask
 */
export class Task<T> {
    private readonly appId: string;
    private readonly backend: ResultBackend;
    // not readonly: replaced whenever a publish attempt fails over
    private broker: MessageBroker;
    private readonly brokers: Array<MessageBroker>;
    private readonly deliveryMode: "persistent" | "transient" = "persistent";
    private readonly failoverStrategy: FailoverStrategy;
    private readonly name: string;
    private readonly queue: string = "celery";
    // [soft, hard] tuple, see Task.getTimeLimit
    private readonly timeLimit: [number | null, number | null] = [null, null];

    /**
     * @param appId The UUID of the parent app.
     * @param backend The result backend to use.
     * @param brokers The message brokers to use.
     * @param deliveryMode The persistence of the task queue. Defaults to
     *                     `"persistent"`.
     * @param failoverStrategy A function to be called to select a new message
     *                         broker in the case of a failover. It is also
     *                         used to select the initial broker.
     * @param hardTimeLimit Maximum number of seconds that a worker can spend
     *                      executing and cleaning up after this task. If
     *                      exceeded, the worker will be killed and replaced.
     * @param name The name of the task to call.
     * @param queue The queue to communicate with workers on. Defaults to
     *              `"celery"`.
     * @param softTimeLimit Maximum number of seconds that a worker can spend
     *                      executing this task. If exceeded, the worker will
     *                      raise an exception in the executing task, allowing
     *                      the task to clean up before returning.
     * @returns A `Task` that is ready to be applied.
     */
    public constructor({
        appId,
        backend,
        brokers,
        deliveryMode = "persistent",
        failoverStrategy,
        hardTimeLimit,
        name,
        queue = "celery",
        softTimeLimit,
    }: TaskOptions) {
        this.appId = appId;
        this.backend = backend;
        this.broker = failoverStrategy(brokers);
        this.brokers = brokers;
        this.deliveryMode = deliveryMode;
        this.failoverStrategy = failoverStrategy;
        this.name = name;
        this.queue = queue;
        this.timeLimit = Task.getTimeLimit({
            hard: hardTimeLimit,
            soft: softTimeLimit,
        });
    }

    /**
     * Builds a task message and publishes it to the current message broker.
     * Publishing is started but not awaited, so the `Result` is returned
     * before the message has necessarily been sent. If publishing fails,
     * a new broker is selected with the failover strategy and publishing is
     * attempted again.
     *
     * @param args The positional arguments to invoke the task with.
     * @param compression The method of compression to apply to the task body.
     * @param eta The earliest time that this task should be invoked.
     * @param expires The latest time that this task should be invoked.
     * @param ignoreResult If true, no result backend will be used to store the
     *                     result of invoking this task.
     * @param kwargs The keyword arguments to invoke the task with.
     * @param priority The priority of this task. Must be in the range [0, 255].
     *                 For RabbitMQ message brokers, 255 is the highest
     *                 priority. For Redis message brokers, 0 is the highest
     *                 priority. Other message brokers do not support priority.
     * @param queue The name of the direct exchange to send this task to.
     *              Defaults to the queue this `Task` was created with.
     * @param serializer The serializer to transform the task body into
     *                   a UTF-8 encoded string.
     * @returns A `Result` object. If `ignoreResult` is true or the client was
     *          created with a `NullBackend`, `Result#get` cannot be invoked.
     */
    public applyAsync({
        args,
        compression = Packer.Compressor.Identity,
        eta,
        expires,
        ignoreResult = false,
        kwargs,
        priority = 0,
        queue = this.queue,
        serializer = Packer.Serializer.Json,
    }: TaskApplyOptions): Result<T> {
        const backend = (() => {
            if (ignoreResult) {
                return new NullBackend();
            }

            return this.backend;
        })();

        const id = Uuid.v4();
        const result = new Result<T>(id, backend);

        const [packer, encoding] = Task.createPacker(serializer, compression);
        const body = Task.packBody({ args, kwargs, packer });

        const etaStr = Task.dateOrNull(eta);
        const expiresStr = Task.dateOrNull(expires);

        const publishOptions = {
            body,
            "content-encoding": ContentEncodingMime.Utf8,
            "content-type": Task.getContentTypeMime(serializer),
            headers: this.createHeaders({
                args,
                compression,
                eta: etaStr,
                expires: expiresStr,
                id,
                kwargs,
            }),
            properties: this.createProperties({
                deliveryMode: this.getDeliveryMode(),
                encoding,
                id,
                priority,
                queue,
            }),
        };

        const tryPublish = async (): Promise<string> => {
            try {
                // `await` is required here: returning the bare promise would
                // leave the `try` block before the promise settles, so a
                // rejected publish would bypass the `catch` below and no
                // failover would ever take place.
                return await this.broker.publish(publishOptions);
            } catch {
                // select a replacement broker and retry; there is no upper
                // bound on the number of attempts
                this.broker = this.failoverStrategy(this.brokers);

                return tryPublish();
            }
        };

        // fire-and-forget: the caller observes the outcome through `result`
        tryPublish();

        return result;
    }

    /**
     * @param date If `undefined`, will return `null`.
     * @returns An ISO 8601 date string or `null`.
     */
    private static dateOrNull(date?: Date): string | null {
        if (isNullOrUndefined(date)) {
            return null;
        }

        return date.toISOString();
    }

    /**
     * @param soft If `undefined`, will return `null` in place of the soft time
     *             limit.
     * @param hard If `undefined`, will return `null` in place of the hard time
     *             limit.
     * @returns A [soft, hard] tuple.
     */
    private static getTimeLimit({ soft, hard }: {
        soft?: number;
        hard?: number;
    }): [number | null, number | null] {
        if (!isNullOrUndefined(soft) && !isNullOrUndefined(hard)) {
            return [soft, hard];
        } else if (!isNullOrUndefined(soft)) {
            return [soft, null];
        } else if (!isNullOrUndefined(hard)) {
            return [null, hard];
        }

        return [null, null];
    }

    /**
     * @returns 1 if transient, 2 otherwise (if persistent).
     */
    private getDeliveryMode(): 1 | 2 {
        if (this.deliveryMode === "transient") {
            return 1;
        }

        return 2;
    }

    /**
     * If gzip compression is selected, will use zlib compression. This is a
     * quirk of Celery. https://github.com/celery/celery/issues/4917
     *
     * @param serializer The serialization method to use.
     * @param compressor The compression method to use. If not "identity",
     *                   will use Base64 encoding. Will use plaintext (UTF-8)
     *                   encoding otherwise.
     * @returns A `Packer` with the requested serializer and compressor and the
     *          autodetected encoding.
     */
    private static createPacker(
        serializer: Packer.Serializer,
        compressor: Packer.Compressor
    ): [Packer.Packer, Packer.Encoder] {
        // the encoder is chosen from the requested compressor, before gzip
        // is remapped to zlib below
        const encoder = Task.selectEncoder(compressor);

        const realCompressor = (() => {
            // this is the behavior of the Python client
            if (compressor === Packer.Compressor.Gzip) {
                return Packer.Compressor.Zlib;
            }

            return compressor;
        })();

        return [
            Packer.createPacker({
                compressor: realCompressor,
                encoder,
                serializer,
            }),
            encoder
        ];
    }

    /**
     * @param args The positional arguments to pack into the body.
     * @param kwargs The keyword arguments to pack into the body.
     * @param packer The packing strategy to use to create a task message body.
     * @returns The serialized, compressed, and encoded body of a task message.
     */
    private static packBody({ args, kwargs, packer }: {
        args: Args;
        kwargs: KeywordArgs;
        packer: Packer.Packer;
    }): string {
        // the body is an [args, kwargs, embed] triple; the embed fields are
        // always sent as null
        return packer.pack([
            args,
            kwargs,
            { callbacks: null, chain: null, chord: null, errbacks: null },
        ]);
    }

    /**
     * Behavior is undefined if a serializer that isn't JSON or YAML is passed.
     *
     * @param serializer The string to convert into a MIME type.
     * @returns The matching MIME type.
     */
    private static getContentTypeMime(
        serializer: Packer.Serializer
    ): ContentTypeMime {
        switch (serializer) {
            case Packer.Serializer.Json: return ContentTypeMime.Json;
            case Packer.Serializer.Yaml: return ContentTypeMime.Yaml;
        }
    }

    /**
     * @returns The origin of this task - pid@hostname.
     */
    private static getOrigin(): string {
        return `${Process.pid}@${Os.hostname()}`;
    }

    /**
     * If `compression` is `"identity"`, there will not be a corresponding
     * field in the headers returned.
     *
     * @param args The positional arguments to pack into the headers.
     * @param compression The compression type to indicate.
     * @param eta The earliest time this task should be executed as an ISO 8601
     *            date string.
     * @param expires The latest time this task should be executed as an ISO
     *                8601 date string.
     * @param kwargs The keyword arguments to pack into the headers.
     * @param id The UUID of this task. Also used as the root ID.
     * @returns The headers of a task message to be queued.
     */
    private createHeaders({ args, compression, eta, expires, kwargs, id }: {
        args: Args;
        compression: Packer.Compressor;
        eta: string | null;
        expires: string | null;
        id: string;
        kwargs: KeywordArgs;
    }): TaskHeaders {
        const base: TaskHeaders = {
            argsrepr: JSON.stringify(args),
            eta,
            expires,
            group: null,
            id,
            kwargsrepr: JSON.stringify(kwargs),
            lang: "py",
            origin: Task.getOrigin(),
            parent_id: null,
            retries: 0,
            root_id: id,
            shadow: null,
            task: this.name,
            timelimit: this.timeLimit,
        };

        if (compression === Packer.Compressor.Identity) {
            return base;
        }

        // both zlib and gzip map to using zlib with application/x-gzip MIME
        return {
            ...base,
            compression: CompressionMime.Zlib,
        };
    }

    /**
     * @param deliveryMode If 1, transient. If 2, persistent.
     * @param encoding The encoding type to indicate in the task message's
     *                 properties.
     * @param id The UUID of this task.
     * @param priority The priority of this task.
     * @param queue The queue to send this message to. Only direct exchanges
     *              are supported, so this is also the routing key of the
     *              exchange.
     * @returns The properties of a task message.
     */
    private createProperties({
        deliveryMode,
        encoding,
        id,
        priority,
        queue,
    }: {
        deliveryMode: 1 | 2;
        encoding: Packer.Encoder;
        id: string;
        priority: Priority;
        queue: string;
    }): TaskProperties {
        return {
            body_encoding: Task.getEncodingMime(encoding),
            correlation_id: id,
            delivery_info: {
                exchange: "",
                routing_key: queue,
            },
            delivery_mode: deliveryMode,
            delivery_tag: "celery",
            priority,
            reply_to: this.appId,
        };
    }

    /**
     * @param compressor The compression method to infer encoding from.
     * @returns The encoding to be used for creating task messages: plaintext
     *          for identity compression, Base64 otherwise.
     */
    private static selectEncoder(
        compressor: Packer.Compressor
    ): Packer.Encoder {
        if (compressor === Packer.Compressor.Identity) {
            return Packer.Encoder.Plaintext;
        }

        return Packer.Encoder.Base64;
    }

    /**
     * @param encoding The encoding type to be converted into a MIME type.
     * @returns Base64 if base-64, UTF-8 if plaintext.
     */
    private static getEncodingMime(
        encoding: Packer.Encoder
    ): ContentEncodingMime {
        switch (encoding) {
            case Packer.Encoder.Base64: return ContentEncodingMime.Base64;
            case Packer.Encoder.Plaintext: return ContentEncodingMime.Utf8;
        }
    }
}

/**
 * Task creation options. This is the shape of the single object argument
 * accepted by the `Task` constructor.
 */
export interface TaskOptions {
    /** The UUID of the parent app. */
    appId: string;
    /** The result backend to use. */
    backend: ResultBackend;
    /** The message brokers to use. */
    brokers: Array<MessageBroker>;
    /** The persistence of the task queue. Defaults to `"persistent"`. */
    deliveryMode?: "persistent" | "transient";
    /**
     * A function to be called to select a message broker from `brokers`,
     * both initially and in the case of a failover.
     */
    failoverStrategy: FailoverStrategy;
    /**
     * Maximum number of seconds that a worker can spend executing and
     * cleaning up after this task.
     */
    hardTimeLimit?: number;
    /** The name of the task to call. */
    name: string;
    /** The queue to communicate with workers on. Defaults to `"celery"`. */
    queue?: string;
    /**
     * Maximum number of seconds that a worker can spend executing this task
     * before an exception is raised in the task to let it clean up.
     */
    softTimeLimit?: number;
}

/**
 * Task invocation options. This is the shape of the single object argument
 * accepted by `Task#applyAsync`.
 */
export interface TaskApplyOptions {
    /** The positional arguments to invoke the task with. */
    args: Args;
    /**
     * The method of compression to apply to the task body. Defaults to
     * identity (no compression).
     */
    compression?: Packer.Compressor;
    /** The earliest time that the task should be invoked. */
    eta?: Date;
    /** The latest time that the task should be invoked. */
    expires?: Date;
    /**
     * If true, no result backend will be used to store the result of the
     * invocation. Defaults to `false`.
     */
    ignoreResult?: boolean;
    /** The keyword arguments to invoke the task with. */
    kwargs: KeywordArgs;
    /** The priority of the task. Defaults to 0. */
    priority?: Priority;
    /**
     * The queue to send the task to. Defaults to the queue the `Task` was
     * created with.
     */
    queue?: string;
    /** The serializer to apply to the task body. Defaults to JSON. */
    serializer?: Packer.Serializer;
}

/**
 * Positional arguments to invoke a task with. Elements are untyped.
 */
export type Args = Array<any>;

/**
 * Keyword arguments to invoke a task with, as a string-keyed map of untyped
 * values.
 */
export interface KeywordArgs {
    [key: string]: any | undefined;
}

/**
 * The priority of a task message. `Task#applyAsync` documents the accepted
 * range as [0, 255]; the type itself does not enforce it.
 */
export type Priority = number;