IBM/node-celery-ts

View on GitHub
src/containers/resource_pool.ts

Summary

Maintainability
A
0 mins
Test Coverage
// BSD 3-Clause License
//
// Copyright (c) 2018, IBM Corporation
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice, this
//   list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
//   this list of conditions and the following disclaimer in the documentation
//   and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
//   contributors may be used to endorse or promote products derived from
//   this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

import { List } from "./list";
import { PromiseQueue } from "./promise_queue";

import { promisifyEvent } from "../utility";

import * as Events from "events";

/**
 * Represents a generic resource pool. If no resources are available, will
 * create and register a new one. Uses a doubly-linked list as a FIFO queue for
 * unused resources and a Set for in-use resources.
 */
export class ResourcePool<T> {
    private readonly create: () => T | PromiseLike<T>;
    private readonly destroy: (resource: T) => string | PromiseLike<string>;
    private readonly emptyEmitter: Events.EventEmitter;
    private inUse: Set<T>;
    private readonly maxResources: number;
    private unused: List<T>;
    private resourceCount: number = 0;
    private waiting: PromiseQueue<T>;

    /**
     * @param create Function to be called when a new resource must be created.
     * @param destroy Function to be called when a resource must be destroyed.
     * @returns An empty ResourcePool.
     */
    public constructor(
        create: () => T | PromiseLike<T>,
        destroy: (resource: T) => string | PromiseLike<string>,
        maxResources: number,
    ) {
        this.create = create;
        this.destroy = destroy;
        this.maxResources = maxResources;

        this.emptyEmitter = new Events.EventEmitter();

        this.inUse = new Set<T>();
        this.unused = new List<T>();
        this.waiting = new PromiseQueue<T>();
    }

    /**
     * @returns The number of resources owned by this pool.
     */
    public numOwned(): number {
        return this.resourceCount;
    }

    /**
     * Resources that are pending are counted here.
     *
     * @returns The number of in-use resources owned by this pool.
     */
    public numInUse(): number {
        return this.resourceCount - this.unused.length;
    }

    /**
     * @returns The number of unused resources owned by this pool.
     */
    public numUnused(): number {
        return this.unused.length;
    }

    /**
     * Gets a resource, invokes `f` with it, then returns it to the pool.
     *
     * @param f A function that accepts a resource and returns some value.
     * @returns A `Promise` that follows the result of invoking `f`.
     */
    public async use<U>(f: (resource: T) => U | PromiseLike<U>): Promise<U> {
        const resource = await this.get();

        try {
            const result = f(resource);

            return this.returnAfter(result, resource);
        } catch (error) {
            return this.returnAfter(Promise.reject(error), resource);
        }
    }

    /**
     * @returns An unused resource from the pool. If none are available, one
     *          will be created.
     */
    public async get(): Promise<T> {
        const resource = await this.getResource();
        this.inUse.add(resource);

        return resource;
    }

    /**
     * @param promise The `Promise` to settle before returning the resource.
     * @param resource The resource to return after `promise` settles.
     */
    public async returnAfter<U>(
        promise: U | PromiseLike<U>,
        resource: T
    ): Promise<U> {
        // unsure why this doesn't work with async/await syntax

        return Promise.resolve(promise)
            .then((value) => {
                this.return(resource);

                return value;
            }).catch((reason) => {
                this.return(resource);

                return Promise.reject(reason);
            });
    }

    /**
     * @param resource The resource to return to the pool.
     * @throws Error If the resource does not belong to this pool.
     */
    public return(resource: T): void {
        if (!this.inUse.delete(resource)) {
            throw new Error("resource does not belong to this pool");
        }

        if (this.waiting.resolveOne(resource)) {
            return;
        }

        if (this.inUse.size === 0) {
            this.emptyEmitter.emit("empty");
        }

        this.unused.push(resource);
    }

    /**
     * @returns A Promise that resolves to the response of each call to
     *          destroy() after all resources have been returned.
     */
    public async destroyAll(): Promise<Array<string>> {
        if (this.inUse.size !== 0) {
            await promisifyEvent(this.emptyEmitter, "empty");
        }

        const connections = Array.from(this.unused);
        this.unused = new List<T>();

        const replies = connections.map((resource) =>
            this.destroy(resource)
        );

        return Promise.all(replies);
    }

    /**
     * @returns A resource. Will be created if all owned resources are in use.
     */
    private async getResource(): Promise<T> {
        const maybeUnused = this.unused.shift();

        if (typeof maybeUnused !== "undefined") {
            return maybeUnused;
        } else if (this.resourceCount < this.maxResources) {
            ++this.resourceCount;

            return Promise.resolve(this.create());
        }

        return this.waiting.push();
    }
}