PruvoNet/squiss-ts

View on GitHub
src/TimeoutExtender.ts

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
import {Squiss} from './index';
import {Message} from './Message';
import * as LinkedList from 'linked-list';
import {Item} from 'linked-list';
import {SQSServiceException} from '@aws-sdk/client-sqs';

const MAX_MESSAGE_AGE_MS = 43200000;

class Node extends Item {
  constructor(public message: Message, public receivedOn: number, public timerOn: number) {
    super();
  }
}

export interface ITimeoutExtenderOptions {
  visibilityTimeoutSecs?: number;
  noExtensionsAfterSecs?: number;
  advancedCallMs?: number;
}

interface MessageIndex {
  [k: string]: Node;
}

const optDefaults: ITimeoutExtenderOptions = {
  visibilityTimeoutSecs: 30,
  noExtensionsAfterSecs: MAX_MESSAGE_AGE_MS / 1000,
  advancedCallMs: 5000,
};

export class TimeoutExtender {

  public readonly _index: MessageIndex;
  public _linkedList: LinkedList<Node>;
  public _opts: ITimeoutExtenderOptions;
  private _squiss: Squiss;
  private _timer: any;
  private readonly _visTimeout: number;
  private readonly _stopAfter: number;
  private readonly _apiLeadMs: number;

  constructor(squiss: Squiss, opts?: ITimeoutExtenderOptions) {
    this._opts = Object.assign({}, optDefaults, opts || {});
    this._index = {};
    this._timer = undefined;
    this._squiss = squiss;
    this._linkedList = new LinkedList<Node>();
    this._squiss.on('handled', (msg: Message) => {
      return this.deleteMessage(msg);
    });
    this._squiss.on('message', (msg: Message) => {
      return this.addMessage(msg);
    });
    this._visTimeout = this._opts.visibilityTimeoutSecs! * 1000;
    this._stopAfter = Math.min(this._opts.noExtensionsAfterSecs! * 1000, MAX_MESSAGE_AGE_MS);
    this._apiLeadMs = Math.min(this._opts.advancedCallMs!, this._visTimeout);
  }

  public addMessage(message: Message) {
    const now = Date.now();
    this._addNode(new Node(message, now, now + this._visTimeout - this._apiLeadMs));
  }

  public deleteMessage(message: Message) {
    const node = this._index[message.raw.MessageId!];
    if (node) {
      this._deleteNode(node);
    }
  }

  public _addNode(node: Node) {
    this._index[node.message.raw.MessageId!] = node;
    this._linkedList.append(node);
    if (!node.prev) {
      this._headChanged();
    }
  }

  public _deleteNode(node: Node) {
    const msgId = node.message.raw.MessageId!;
    delete this._index[msgId];
    const isFirst = !node.prev;
    node.detach();
    if (isFirst) {
      this._headChanged();
    }
  }

  public _getNodeAge(node: Node) {
    return Date.now() - node.receivedOn + this._apiLeadMs;
  }

  public _headChanged() {
    if (this._timer) {
      clearTimeout(this._timer);
    }
    if (!this._linkedList.head) {
      return false;
    }
    const node = this._linkedList.head;
    this._timer = setTimeout(() => {
      if (this._getNodeAge(node) >= this._stopAfter) {
        this._deleteNode(node);
        node.message.keep();
        node.message.emit('keep');
        this._squiss.emit('keep', node.message);
        node.message.emit('timeoutReached');
        this._squiss.emit('timeoutReached', node.message);
        return;
      }
      return this._renewNode(node);
    }, node.timerOn - Date.now());
    return true;
  }

  public _renewNode(node: Node) {
    const extendByMs = Math.min(this._visTimeout, MAX_MESSAGE_AGE_MS - this._getNodeAge(node));
    const extendBySecs = Math.floor(extendByMs / 1000);
    node.message.emit('extendingTimeout');
    this._squiss.emit('extendingTimeout', node.message);
    this._squiss.changeMessageVisibility(node.message, extendBySecs)
      .then(() => {
        node.message.emit('timeoutExtended');
        this._squiss.emit('timeoutExtended', node.message);
      })
      .catch((err: SQSServiceException) => {
        if (err.name === 'ReceiptHandleIsInvalid' || err.name === 'MessageNotInflight' ||
            err.message.match(/message does not exist or is not available/i) ||
            err.message.match(/the receipt handle has expired/i) ||
            err.message.match(/not a valid receipt handle/i)) {
          this._deleteNode(node);
          node.message.emit('autoExtendFail', err);
          this._squiss.emit('autoExtendFail', {message: node.message, error: err});
        } else {
          node.message.emit('autoExtendError', err);
          this._squiss.emit('autoExtendError', {message: node.message, error: err});
        }
      });
    this._deleteNode(node);
    node.timerOn = Date.now() + extendByMs - this._apiLeadMs;
    this._addNode(node);
  }
}