teamdigitale/italia-ts-commons

View on GitHub
src/tasks.ts

Summary

Maintainability
A
3 hrs
Test Coverage
import { right } from "fp-ts/lib/Either";
import { Task } from "fp-ts/lib/Task";
import * as T from "fp-ts/lib/Task";
import { TaskEither } from "fp-ts/lib/TaskEither";
import * as TE from "fp-ts/lib/TaskEither";
import { pipe } from "fp-ts/lib/function";
import { Millisecond } from "./units";

/**
 * Returns a Task that resolves to a value after a delay.
 */
export const delayTask = <A>(n: Millisecond, a: A): Task<A> =>
  T.delay(n)(T.of(a));

/**
 * In the context of a retriable task, when it returns a TransientError the
 * task can be executed again.
 */
export type TransientError = "transient";
export const TransientError: TransientError = "transient";

/**
 * A RetriableTask failed too many times.
 */
export type MaxRetries = "max-retries";
export const MaxRetries: MaxRetries = "max-retries";

/**
 * A RetriableTask has been aborted.
 */
export type RetryAborted = "retry-aborted";
export const RetryAborted: RetryAborted = "retry-aborted";

/**
 * A Task that can be retried when it fails with a transient error
 */
export type RetriableTask<E, T> = E extends TransientError
  ? never
  : TaskEither<E | TransientError, T>;

/**
 * Wraps a RetriableTask with a number of retries
 */
export const withRetries =
  <E, T>(
    maxRetries: number,
    backoff: (count: number) => Millisecond
  ): ((
    _: RetriableTask<E, T>,
    shouldAbort?: Promise<boolean>
  ) => TaskEither<E | MaxRetries | RetryAborted, T>) =>
  // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
  (task, shouldAbort = Promise.resolve(false)) => {
    // Whether we must stop retrying
    // the abort process gets triggered when the shouldAbort promise resolves
    // to true. Not that aborting stops the retry process, it does NOT stop
    // the execution of the current task.
    // eslint-disable-next-line functional/no-let
    let mustAbort = false;
    shouldAbort.then(
      (v) => {
        mustAbort = v;
      },
      (_) => void 0
    );

    const runTaskOnce = (
      count: number,
      currentTask: RetriableTask<E, T>
    ): TaskEither<E | TransientError | RetryAborted, T> => {
      // on first execution, count === 0
      if (count >= maxRetries - 1) {
        // no more retries left
        return currentTask;
      }
      // allow one run of the task
      return pipe(
        currentTask,
        TE.orElse((l) => {
          // if the task fails...
          if (mustAbort) {
            return TE.left(RetryAborted);
          }
          if (l === TransientError) {
            // ...with a TransientError, chain it with a backoff delay
            // an then with another run.
            return pipe(
              delayTask(backoff(count), true),
              T.map((d) => right(d)),
              TE.chain(() => runTaskOnce(count + 1, currentTask))
            );
          }
          // ...with an error that is not a TransientError, we just return it
          return TE.left(l);
        })
      );
    };

    // if the recursive task execution returns with a failure and the failure is
    // a TransientError, it means that the retries have been exausted - we map
    // the error to a MaxRetries error.
    return pipe(
      runTaskOnce(0, task),
      TE.mapLeft((l) => (l === TransientError ? MaxRetries : l))
    );
  };