PruvoNet/squiss-ts

View on GitHub
src/Squiss.ts

Summary

Maintainability
D
2 days
Test Coverage
A
100%
import {EventEmitter} from 'events';
import {Message} from './Message';
import {ITimeoutExtenderOptions, TimeoutExtender} from './TimeoutExtender';
import {createMessageAttributes, IMessageAttributes} from './attributeUtils';
import {isString} from 'ts-type-guards';
import {    ReceiveMessageCommandInput,
    BatchResultErrorEntry,
    SendMessageBatchRequestEntry,
    SendMessageBatchResult,
    SendMessageBatchResultEntry,
    SendMessageBatchCommandInput,
    SendMessageBatchCommandOutput,
    SendMessageCommandInput,
    SQSServiceException,
    ReceiveMessageCommandOutput,
    SQS, CreateQueueCommandInput,
    GetQueueUrlCommandInput,
    SendMessageCommandOutput,
    Message as SQSMessage,DeleteMessageBatchCommandOutput
} from '@aws-sdk/client-sqs'
import {S3} from '@aws-sdk/client-s3'
import {GZIP_MARKER, compressMessage} from './gzipUtils';
import {S3_MARKER, uploadBlob} from './s3Utils';
import {getMessageSize} from './messageSizeUtils';
import {
    IMessageToSend, ISendMessageRequest,
    IDeleteQueueItem, IDeleteQueueItemById, optDefaults, SquissEmitter, ISquissOptions
} from './Types';
import {removeEmptyKeys} from './Utils';
import { URL } from 'url';
import {Endpoint, EndpointV2} from '@aws-sdk/types';

const AWS_MAX_SEND_BATCH = 10;

export class Squiss extends (EventEmitter as new() => SquissEmitter) {

    public get inFlight(): number {
        return this._inFlight;
    }

    public get running(): boolean {
        return this._running;
    }

    public sqs: SQS;
    public _timeoutExtender: TimeoutExtender | undefined;
    public _opts: ISquissOptions;
    private _s3?: S3;
    private _running = false;
    private _paused = true;
    private _inFlight = 0;
    private _queueVisibilityTimeout = 0;
    private _queueMaximumMessageSize = 0;
    private _queueUrl: string;
    private _delQueue = new Map<string, IDeleteQueueItem>();
    private _delTimer: any;
    private _activeReq: AbortController | undefined;

    constructor(opts?: ISquissOptions | undefined) {
        super();
        this._opts = Object.assign({}, optDefaults, opts || {});
        this._initOpts();
        this._queueUrl = this._opts.queueUrl || '';
        this.sqs = this._initSqs();
    }

    public changeMessageVisibility(msg: Message | string, timeoutInSeconds: number): Promise<void> {
        let receiptHandle: string;
        if (msg instanceof Message) {
            receiptHandle = msg.raw.ReceiptHandle!;
        } else {
            receiptHandle = msg;
        }
        return this.getQueueUrl()
            .then((queueUrl) => {
                    return this.sqs.changeMessageVisibility({
                        QueueUrl: queueUrl,
                        ReceiptHandle: receiptHandle,
                        VisibilityTimeout: timeoutInSeconds,
                    });
                }
            )
            .then(() => {
                return Promise.resolve();
            });
    }

    public createQueue(): Promise<string> {
        if (!this._opts.queueName) {
            return Promise.reject(new Error('Squiss was not instantiated with a queueName'));
        }
        const params: CreateQueueCommandInput = {
            QueueName: this._opts.queueName,
            Attributes: {
                ReceiveMessageWaitTimeSeconds: this._opts.receiveWaitTimeSecs!.toString(),
                DelaySeconds: this._opts.delaySecs!.toString(),
                MaximumMessageSize: this._opts.maxMessageBytes!.toString(),
                MessageRetentionPeriod: this._opts.messageRetentionSecs!.toString(),
            },
        };
        if (this._opts.visibilityTimeoutSecs) {
            params.Attributes!.VisibilityTimeout = this._opts.visibilityTimeoutSecs.toString();
        }
        if (this._opts.queuePolicy) {
            params.Attributes!.Policy = this._opts.queuePolicy;
        }
        return this.sqs.createQueue(params).then((res) => {
            this._queueUrl = res.QueueUrl!;
            return res.QueueUrl!;
        });
    }

    public async deleteMessage(msg: Message): Promise<void> {
        if (!msg.raw) {
            throw new Error('Squiss.deleteMessage requires a Message object');
        }
        const promise = new Promise<void>((resolve, reject) => {
            this._delQueue.set(msg.raw.MessageId!,
                {msg, Id: msg.raw.MessageId!, ReceiptHandle: msg.raw.ReceiptHandle!, resolve, reject});
        });
        msg.emit('delQueued');
        this.emit('delQueued', msg);
        this.handledMessage(msg);
        if (this._delQueue.size >= this._opts.deleteBatchSize!) {
            if (this._delTimer) {
                clearTimeout(this._delTimer);
                this._delTimer = undefined;
            }
            await this._deleteXMessages(this._opts.deleteBatchSize);
        } else if (!this._delTimer) {
            this._delTimer = setTimeout(async () => {
                this._delTimer = undefined;
                await this._deleteXMessages();
            }, this._opts.deleteWaitMs);
        }
        return promise;
    }

    public deleteQueue(): Promise<void> {
        return this.getQueueUrl()
            .then((queueUrl) => {
                return this.sqs.deleteQueue({QueueUrl: queueUrl});
            })
            .then(() => {
                return Promise.resolve();
            });
    }

    public getQueueUrl(): Promise<string> {
        if (this._queueUrl) {
            return Promise.resolve(this._queueUrl);
        }
        const params: GetQueueUrlCommandInput = {QueueName: this._opts.queueName!};
        if (this._opts.accountNumber) {
            params.QueueOwnerAWSAccountId = this._opts.accountNumber.toString();
        }
        return this.sqs.getQueueUrl(params).then(async (data) => {
            this._queueUrl = data.QueueUrl!;
            if (this._opts.correctQueueUrl) {
                let newUrl: URL | undefined;
                const endpoint = this.sqs.config.endpoint;
                /* istanbul ignore else  */
                if (typeof endpoint === 'string') {
                    newUrl = new URL(endpoint);
                } else if (typeof endpoint === 'function') {
                    const retrievedEndpoint = await endpoint();
                    if (retrievedEndpoint && typeof retrievedEndpoint === 'object') {
                        if ('url' in retrievedEndpoint) {
                            newUrl = new URL((retrievedEndpoint as EndpointV2).url.toString());
                        }
                        if ('hostname' in retrievedEndpoint) {
                            const { protocol, hostname, port, path } = retrievedEndpoint as Endpoint;
                            // query params are ignored in setting endpoint.
                            newUrl = new URL(`${protocol}//${hostname}${port ? ':' + port : ''}${path}`);
                        }
                    }
                }
                /* istanbul ignore if */
                if (!newUrl) {
                    throw new Error('Failed to get configured SQQ endpoint');
                }
                const parsedQueueUrl = new URL(this._queueUrl);
                newUrl.pathname = parsedQueueUrl.pathname;
                this._queueUrl = newUrl.toString();
            }
            return this._queueUrl;
        });
    }

    public getQueueVisibilityTimeout(): Promise<number> {
        if (this._queueVisibilityTimeout) {
            return Promise.resolve(this._queueVisibilityTimeout);
        }
        return this.getQueueUrl().then((queueUrl) => {
            return this.sqs.getQueueAttributes({
                AttributeNames: ['VisibilityTimeout'],
                QueueUrl: queueUrl,
            });
        }).then((res) => {
            if (!res.Attributes || !res.Attributes.VisibilityTimeout) {
                throw new Error('SQS.GetQueueAttributes call did not return expected shape. Response: ' +
                    JSON.stringify(res));
            }
            this._queueVisibilityTimeout = parseInt(res.Attributes.VisibilityTimeout, 10);
            return this._queueVisibilityTimeout;
        });
    }

    public getQueueMaximumMessageSize(): Promise<number> {
        if (this._queueMaximumMessageSize) {
            return Promise.resolve(this._queueMaximumMessageSize);
        }
        return this.getQueueUrl().then((queueUrl) => {
            return this.sqs.getQueueAttributes({
                AttributeNames: ['MaximumMessageSize'],
                QueueUrl: queueUrl,
            });
        }).then((res) => {
            if (!res.Attributes || !res.Attributes.MaximumMessageSize) {
                throw new Error('SQS.GetQueueAttributes call did not return expected shape. Response: ' +
                    JSON.stringify(res));
            }
            this._queueMaximumMessageSize = parseInt(res.Attributes.MaximumMessageSize, 10);
            return this._queueMaximumMessageSize;
        });
    }

    public handledMessage(msg: Message): void {
        this._inFlight--;
        if (this._paused && this._slotsAvailable()) {
            this._paused = false;
            this._startPoller()
                .catch((e: Error) => {
                    this.emit('error', e);
                });
        }
        msg.emit('handled');
        this.emit('handled', msg);
        if (!this._inFlight) {
            this.emit('drained');
        }
    }

    public releaseMessage(msg: Message): Promise<void> {
        this.handledMessage(msg);
        return this.changeMessageVisibility(msg, 0)
            .then((res) => {
                msg.emit('released');
                this.emit('released', msg);
                return res;
            });
    }

    public purgeQueue(): Promise<void> {
        return this.getQueueUrl()
            .then((queueUrl) => {
                return this.sqs.purgeQueue({QueueUrl: queueUrl});
            })
            .then(() => {
                this._inFlight = 0;
                this._delQueue = new Map();
                this._delTimer = undefined;
                return Promise.resolve();
            });
    }

    public sendMessage(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes)
        : Promise<SendMessageCommandOutput> {
        return Promise.all([
            this._prepareMessageRequest(message, delay, attributes),
            this.getQueueUrl(),
        ])
            .then((data) => {
                const rawParams = data[0];
                const queueUrl = data[1];
                const params: SendMessageCommandInput = {
                    QueueUrl: queueUrl,
                    ...rawParams,
                };
                return this.sqs.sendMessage(params);
            });
    }

    public sendMessages(messages: IMessageToSend[] | IMessageToSend, delay?: number,
                        attributes?: IMessageAttributes | IMessageAttributes[])
        : Promise<SendMessageBatchResult> {
        return this.getQueueMaximumMessageSize()
            .then((queueMaximumMessageSize) => {
                return this._prepareMessagesToSend(messages, queueMaximumMessageSize, delay, attributes);
            })
            .then((batches) => {
                return Promise.all(batches.map((batch, idx) => {
                    return this._sendMessageBatch(batch, delay, idx * AWS_MAX_SEND_BATCH);
                }));
            })
            .then((results) => {
                const successful: SendMessageBatchResultEntry[] = [];
                const failed: BatchResultErrorEntry[] = [];
                results.forEach((res) => {
                    res.Successful?.forEach((elem) => successful.push(elem));
                    res.Failed?.forEach((elem) => failed.push(elem));
                });
                return {Successful: successful, Failed: failed};
            });
    }

    public start(): Promise<void> {
        if (this._running) {
            return Promise.resolve();
        }
        this._running = true;
        return this._startPoller();
    }

    public async stop(soft?: boolean, timeout?: number): Promise<boolean> {
        if (!soft && this._activeReq) {
            this._activeReq.abort();
        }
        this._running = this._paused = false;
        if (!this._inFlight) {
            await this._drainDeleteQueue();
            return true;
        }
        let resolved = false;
        let timer: any;
        const result = await new Promise<boolean>(async (resolve) => {
            this.on('drained',() => {
                if (!resolved) {
                    resolved = true;
                    if (timer) {
                        clearTimeout(timer);
                        timer = undefined;
                    }
                    resolve(true);
                }
            });
            timer = timeout ? setTimeout(() => {
                resolved = true;
                resolve(false);
            }, timeout) : undefined;
        });
        await this._drainDeleteQueue();
        return result;
    }

    public getS3(): S3 {
        if (!this._s3) {
            this._s3 = this._initS3();
        }
        return this._s3;
    }

    private async _drainDeleteQueue(): Promise<void> {
        if (this._delTimer) {
            clearTimeout(this._delTimer);
            this._delTimer = undefined;
        }
        await this._deleteXMessages();
    }

    private _initS3() {
        if (this._opts.S3) {
            if (typeof this._opts.S3 === 'function') {
                return new this._opts.S3(this._opts.awsConfig || {});
            } else {
                return this._opts.S3;
            }
        } else {
            /* istanbul ignore next */
            return new S3(this._opts.awsConfig || {});
        }
    }

    private _initOpts() {
        if (!this._opts.queueUrl && !this._opts.queueName) {
            throw new Error('Squiss requires either the "queueUrl", or the "queueName".');
        }
        if (this._opts.s3Fallback && !this._opts.s3Bucket) {
            throw new Error('Squiss requires "s3Bucket" to be defined is using s3 fallback');
        }
        this._opts.deleteBatchSize = Math.min(this._opts.deleteBatchSize!, 10);
        this._opts.receiveBatchSize = Math.min(this._opts.receiveBatchSize!,
            this._opts.maxInFlight! > 0 ? this._opts.maxInFlight! : 10, 10);
        this._opts.minReceiveBatchSize = Math.min(this._opts.minReceiveBatchSize!, this._opts.receiveBatchSize);
    }

    private async _deleteMessages(batch: IDeleteQueueItem[]): Promise<void> {
        if (batch.length === 0) {
            return;
        }
        return this.getQueueUrl().then((queueUrl) => {
            return this.sqs.deleteMessageBatch({
                QueueUrl: queueUrl,
                Entries: batch.map((item) => {
                    return {
                        Id: item.Id,
                        ReceiptHandle: item.ReceiptHandle,
                    };
                }),
            });
        }).then(this._handleBatchDeleteResults(batch))
            .catch((err: Error) => {
                this.emit('error', err);
            });
    }

    private _emitMessages(messages: SQSMessage[]): void {
        messages.forEach((msg) => {
            const message = new Message({
                squiss: this,
                unwrapSns: this._opts.unwrapSns,
                bodyFormat: this._opts.bodyFormat,
                msg,
                s3Retriever: this.getS3.bind(this),
                s3Retain: this._opts.s3Retain || false,
            });
            this._inFlight++;
            message.parse()
                .then(() => {
                    this.emit('message', message);
                })
                .catch((e: Error) => {
                    this.emit('error', e);
                    message.release();
                });
        });
    }

    private _getBatch(queueUrl: string): void {
        if (this._activeReq || !this._running) {
            return;
        }
        const maxMessagesToGet = this._getMaxMessagesToGet();
        if (maxMessagesToGet < this._opts.minReceiveBatchSize!) {
            this._paused = true;
            return;
        }
        const params: ReceiveMessageCommandInput = removeEmptyKeys({
            QueueUrl: queueUrl, MaxNumberOfMessages: maxMessagesToGet,
            WaitTimeSeconds: this._opts.receiveWaitTimeSecs,
            MessageAttributeNames: this._opts.receiveAttributes,
            AttributeNames: this._opts.receiveSqsAttributes,
            VisibilityTimeout: this._opts.visibilityTimeoutSecs,
        });
        const controller = new AbortController();
        this._activeReq = controller;
        this.sqs.receiveMessage(params, {
            abortSignal: controller.signal,
        }).then(this._handleGetBatchResult(queueUrl)).catch((err: SQSServiceException) => {
            this._activeReq = undefined;
            if (err.name === 'AbortError') {
                this.emit('aborted', err);
            } else {
                setTimeout(this._getBatch.bind(this, queueUrl), this._opts.pollRetryMs);
                this.emit('error', err);
            }
        });
    }

    private _initTimeoutExtender(): Promise<void> {
        if (!this._opts.autoExtendTimeout || this._timeoutExtender) {
            return Promise.resolve();
        }
        return Promise.resolve().then(() => {
            if (this._opts.visibilityTimeoutSecs) {
                return this._opts.visibilityTimeoutSecs;
            }
            return this.getQueueVisibilityTimeout();
        }).then((visibilityTimeoutSecs) => {
            const opts: ITimeoutExtenderOptions = {visibilityTimeoutSecs};
            if (this._opts.noExtensionsAfterSecs) {
                opts.noExtensionsAfterSecs = this._opts.noExtensionsAfterSecs;
            }
            if (this._opts.advancedCallMs) {
                opts.advancedCallMs = this._opts.advancedCallMs;
            }
            this._timeoutExtender = new TimeoutExtender(this, opts);
        });
    }

    private _sendMessageBatch(messages: ISendMessageRequest[], delay: number | undefined, startIndex: number):
        Promise<SendMessageBatchCommandOutput> {
        const start = startIndex || 0;
        return this.getQueueUrl().then((queueUrl) => {
            const entries: SendMessageBatchRequestEntry[] = [];
            const params: SendMessageBatchCommandInput = {
                QueueUrl: queueUrl,
                Entries: entries,
            };
            const promises: Promise<void>[] = [];
            messages.forEach((msg, idx) => {
                const entry: SendMessageBatchRequestEntry = {
                    Id: (start + idx).toString(),
                    ...msg,
                };
                entries.push(entry);
            });
            return Promise.all(promises)
                .then(() => {
                    return this.sqs.sendMessageBatch(params);
                });
        });
    }

    private _slotsAvailable(): boolean {
        return !this._opts.maxInFlight || this._inFlight < this._opts.maxInFlight;
    }

    private _startPoller(): Promise<void> {
        return this._initTimeoutExtender()
            .then(() => this.getQueueUrl())
            .then((queueUrl): void => {
                this._getBatch(queueUrl);
            })
    }

    private async _deleteXMessages(x?: number): Promise<void> {
        const delQueue = this._delQueue;
        const iterator = delQueue.entries();
        const delBatch = Array.from({length: x || delQueue.size}, function(this: typeof iterator) {
            const element = this.next().value;
            delQueue.delete(element[0]);
            return element[1];
        }, iterator);
        await this._deleteMessages(delBatch);
    }

    private _isLargeMessage(message: ISendMessageRequest, minSize?: number): Promise<boolean> {
        const messageSize = getMessageSize(message);
        if (minSize) {
            return Promise.resolve(messageSize > minSize);
        }
        return this.getQueueMaximumMessageSize()
            .then((queueMaximumMessageSize) => {
                return messageSize >= queueMaximumMessageSize;
            });
    }

    private _prepareMessageParams(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes) {
        const messageStr = isString(message) ? message : JSON.stringify(message);
        const params: ISendMessageRequest = {MessageBody: messageStr, DelaySeconds: delay};
        attributes = Object.assign({}, attributes);
        params.MessageGroupId = attributes.FIFO_MessageGroupId;
        delete attributes.FIFO_MessageGroupId;
        params.MessageDeduplicationId = attributes.FIFO_MessageDeduplicationId;
        delete attributes.FIFO_MessageDeduplicationId;
        params.MessageAttributes = createMessageAttributes(attributes);
        let getMessagePromise = Promise.resolve(messageStr);
        if (this._opts.gzip && (!this._opts.minGzipSize || getMessageSize(params) >= this._opts.minGzipSize)) {
            getMessagePromise = compressMessage(messageStr);
            params.MessageAttributes = params.MessageAttributes || {};
            params.MessageAttributes[GZIP_MARKER] = {
                StringValue: '1',
                DataType: 'Number',
            };
        }
        return getMessagePromise.then((finalMessage) => {
            params.MessageBody = finalMessage;
            return {finalMessage, params};
        });
    }

    private _handleLargeMessagePrepare({finalMessage, params}: { finalMessage: string, params: ISendMessageRequest }) {
        if (!this._opts.s3Fallback) {
            return Promise.resolve(params);
        }
        return this._isLargeMessage(params, this._opts.minS3Size)
            .then((isLarge) => {
                if (!isLarge) {
                    return Promise.resolve(params);
                }
                return uploadBlob(this.getS3(), this._opts.s3Bucket!, finalMessage, this._opts.s3Prefix || '')
                    .then((uploadData) => {
                        this.emit('s3Upload', uploadData);
                        params.MessageBody = JSON.stringify(uploadData);
                        params.MessageAttributes = params.MessageAttributes || {};
                        params.MessageAttributes[S3_MARKER] = {
                            StringValue: `${uploadData.uploadSize}`,
                            DataType: 'Number',
                        };
                        return Promise.resolve(params);
                    });
            });
    }

    private _prepareMessageRequest(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes)
        : Promise<ISendMessageRequest> {
        if (attributes && attributes[GZIP_MARKER]) {
            return Promise.reject(new Error(`Using of internal attribute ${GZIP_MARKER} is not allowed`));
        }
        if (attributes && attributes[S3_MARKER]) {
            return Promise.reject(new Error(`Using of internal attribute ${S3_MARKER} is not allowed`));
        }
        return this._prepareMessageParams(message, delay, attributes)
            .then(this._handleLargeMessagePrepare.bind(this))
            .then((params) => {
                return removeEmptyKeys(params);
            });
    }

    private _initSqs() {
        if (this._opts.SQS) {
            if (typeof this._opts.SQS === 'function') {
                return new this._opts.SQS(this._opts.awsConfig || {});
            } else {
                return this._opts.SQS;
            }
        } else {
            return new SQS(this._opts.awsConfig || {});
        }
    }

    private _handleBatchDeleteResults(batch: IDeleteQueueItem[]) {
        const itemById: IDeleteQueueItemById = batch.reduce((prevByValue, item) => {
            prevByValue[item.Id] = item;
            return prevByValue;
        }, {} as IDeleteQueueItemById);
        return (data: DeleteMessageBatchCommandOutput) => {
            if (data.Failed?.length) {
                data.Failed.forEach((fail) => {
                    /* istanbul ignore next */
                    const item = itemById[fail.Id ?? ''];
                    this.emit('delError', {error: fail, message: item.msg});
                    item.msg.emit('delError', fail);
                    item.reject(fail);
                });
            }
            if (data.Successful?.length) {
                data.Successful.forEach((success) => {
                    /* istanbul ignore next */
                    const id = success.Id ?? '';
                    const item = itemById[id];
                    const msg = item.msg;
                    this.emit('deleted', {msg, successId: id});
                    msg.emit('deleted', id);
                    item.resolve();
                });
            }
        };
    }

    private _prepareMessagesToSend(messages: IMessageToSend[] | IMessageToSend, queueMaximumMessageSize: number,
                                   delay?: number, attributes?: IMessageAttributes | IMessageAttributes[]) {
        const msgs: IMessageToSend[] = Array.isArray(messages) ? messages : [messages];
        const defaultAttributes = (attributes && !Array.isArray(attributes)) ? attributes : undefined;
        const arrayAttributes = (attributes && Array.isArray(attributes)) ? attributes : [];
        const promises = msgs.map((msg, i) => {
            return this._prepareMessageRequest(msg, delay, defaultAttributes || arrayAttributes[i]);
        });
        return Promise.all(promises)
            .then((requests) => {
                const batches: ISendMessageRequest[][] = [];
                let currentBatchSize = 0;
                let currentBatchLength = 0;
                requests.forEach((message) => {
                    const messageSize = getMessageSize(message);
                    if (currentBatchLength % AWS_MAX_SEND_BATCH === 0 ||
                        currentBatchSize + messageSize >= queueMaximumMessageSize) {
                        currentBatchLength = currentBatchSize = 0;
                        batches.push([]);
                    }
                    currentBatchSize += messageSize;
                    currentBatchLength++;
                    batches[batches.length - 1].push(message);
                });
                return batches;
            });
    }

    private _handleGetBatchResult(queueUrl: string) {
        return (data: ReceiveMessageCommandOutput) => {
            let gotMessages = true;
            this._activeReq = undefined;
            if (data && data.Messages) {
                this.emit('gotMessages', data.Messages.length);
                this._emitMessages(data.Messages);
            } else {
                this.emit('queueEmpty');
                gotMessages = false;
            }
            if (this._slotsAvailable()) {
                const next = this._getBatch.bind(this, queueUrl);
                if (gotMessages && this._opts.activePollIntervalMs) {
                    setTimeout(next, this._opts.activePollIntervalMs);
                } else if (!gotMessages && this._opts.idlePollIntervalMs) {
                    setTimeout(next, this._opts.idlePollIntervalMs);
                } else {
                    next();
                }
            } else {
                this._paused = true;
                this.emit('maxInFlight');
            }
        };
    }

    private _getMaxMessagesToGet() {
        return !this._opts.maxInFlight ? this._opts.receiveBatchSize! :
            Math.min(this._opts.maxInFlight! - this._inFlight, this._opts.receiveBatchSize!);
    }
}