IBM/node-celery-ts

View on GitHub
src/amqp/broker.ts

Summary

Maintainability
A
0 mins
Test Coverage
// BSD 3-Clause License
//
// Copyright (c) 2018, IBM Corporation
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice, this
//   list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
//   this list of conditions and the following disclaimer in the documentation
//   and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
//   contributors may be used to endorse or promote products derived from
//   this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

import { AmqpOptions, DEFAULT_AMQP_OPTIONS } from "./options";

import { ResourcePool } from "../containers";
import { MessageBroker } from "../message_broker";
import { TaskMessage } from "../messages";
import { isNullOrUndefined, promisifyEvent } from "../utility";

import * as AmqpLib from "amqplib";

/**
 * `AmqpBroker` implements `MessageBroker` using the `RabbitMQ` message broker.
 * Messages are, by default, durable, and will survive a broker restart.
 */
export class AmqpBroker implements MessageBroker {
    private channels: ResourcePool<AmqpLib.Channel>;
    private readonly connection: Promise<AmqpLib.Connection>;
    private readonly options: AmqpOptions;

    /**
     * Constructs an `AmqpBroker` with the given options.
     *
     * @param options The configuration of the connection to make. If
     *                `undefined`, will connect to RabbitMQ at localhost:6379.
     * @returns An `AmqpBroker` connected using the specified configuration.
     */
    public constructor(options?: AmqpOptions) {
        this.options = (() => {
            if (isNullOrUndefined(options)) {
                return DEFAULT_AMQP_OPTIONS;
            }

            return options;
        })();

        this.connection = Promise.resolve(AmqpLib.connect(this.options));

        this.channels = new ResourcePool(
            async () => {
                const connection = await this.connection;

                return connection.createChannel();
            },
            async (channel) => {
                await channel.close();

                return "closed";
            },
            2,
        );
    }

    /**
     * Disconnects from the RabbitMQ node. Only call once.
     * Alias for #end().
     *
     * @returns A `Promise` that resolves once the connection is closed.
     *
     * @see #end
     */
    public async disconnect(): Promise<void> {
        await this.end();
    }

    /**
     * Disconnects from the RabbitMQ node. Only call once.
     *
     * @returns A `Promise` that resolves once the connection is closed.
     */
    public async end(): Promise<void> {
        await this.channels.destroyAll();

        const connection = await this.connection;
        await connection.close();
    }

    /**
     * Queues a message onto the requested exchange.
     *
     * @param message The message to be published. Used to determine the
     *                publishing options.
     * @returns A `Promise` that resolves to `"flushed to write buffer"`.
     */
    public async publish(message: TaskMessage): Promise<string> {
        const exchange = message.properties.delivery_info.exchange;
        const routingKey = message.properties.delivery_info.routing_key;

        const body = AmqpBroker.getBody(message);
        const options = AmqpBroker.getPublishOptions(message);

        return this.channels.use(async (channel) => {
            await AmqpBroker.assert({ channel, exchange, routingKey });

            return AmqpBroker.doPublish({
                body,
                channel,
                exchange,
                options,
                routingKey
            });
        });
    }

    /**
     * Converts a task message's body into a `Buffer`.
     */
    private static getBody(message: TaskMessage): Buffer {
        return Buffer.from(message.body, message.properties.body_encoding);
    }

    /**
     * @param message The message to extract options from.
     * @returns The options that the message should be published with.
     */
    private static getPublishOptions(
        message: TaskMessage
    ): AmqpLib.Options.Publish {
        return {
            contentEncoding: message["content-encoding"],
            contentType: message["content-type"],
            correlationId: message.properties.correlation_id,
            deliveryMode: message.properties.delivery_mode,
            headers: message.headers,
            priority: message.properties.priority,
            replyTo: message.properties.reply_to,
        };
    }

    /**
     * Uses `AmqpLib.Channel#assertQueue`.
     *
     * @param channel The channel to make assertions with.
     * @param exchange The exchange to assert.
     * @param routingKey The queue to assert.
     * @returns A `Promise` that resolves when the assertions are complete.
     */
    private static async assert({ channel, exchange, routingKey }: {
        channel: AmqpLib.Channel;
        exchange: string;
        routingKey: string;
    }): Promise<void> {
        const assertion = channel.assertQueue(routingKey);

        // cannot assert default exchange
        if (exchange === "") {
            await assertion;
        } else {
            await Promise.all([
                assertion,
                channel.assertExchange(exchange, "direct"),
            ]);
        }
    }

    /**
     * Uses `AmqpLib.Channel#publish`. If the write buffer is full, recursively
     * calls itself after a `"drain"` event is emitted until the write is
     * performed.
     *
     * @param body The body of the message to publish.
     * @param channel The channel to publish with.
     * @param exchange The exchange to publish to.
     * @param options The options to forward to `amqplib`.
     * @param routingKey The key to route by. If using a direct exchange, this
     *                   is the queue to route to.
     * @returns The response from the RabbitMQ node.
     */
    private static async doPublish({
        body,
        channel,
        exchange,
        options,
        routingKey
    }: {
        body: Buffer;
        channel: AmqpLib.Channel;
        exchange: string;
        options: AmqpLib.Options.Publish;
        routingKey: string;
    }): Promise<string> {
        const publish = () => channel.publish(
            exchange,
            routingKey,
            body,
            options
        );

        while (!publish()) {
            await promisifyEvent<void>(channel, "drain");
        }

        return "flushed to write buffer";
    }
}