amenezes/rabbit-client

View on GitHub
rabbit/_wait.py

Summary

Maintainability
A
35 mins
Test Coverage
import os
from typing import Union

from .logger import logger


def expo(
    headers: Union[None, dict],
    delay: Union[None, int] = None,
    base: Union[None, int] = None,
    factor: Union[None, int] = None,
    max_delay: Union[None, int] = None,
) -> int:
    """Exponential delay strategy."""
    delay = delay or int(os.getenv("EXPO_DELAY", 300000))
    base = base or int(os.getenv("EXPO_BASE", 2))
    factor = factor = int(os.getenv("EXPO_FACTOR", 1))
    max_delay = max_delay or os.getenv("EXPO_MAX_DELAY")  # type: ignore
    if max_delay:
        max_delay = int(max_delay)

    current_delay = _set_timeout(headers, delay)
    updated_delay = int(current_delay * (base * factor))
    logger.debug(
        f"expo delay strategy: [delay={delay}, current_delay={current_delay}, base={base}, factor={factor}, max_delay={max_delay}]"
    )
    if max_delay is None or updated_delay <= max_delay:
        return updated_delay
    return max_delay


def fibo(
    headers: Union[None, dict],
    delay: Union[None, int] = None,
    max_delay: Union[None, int] = None,
) -> int:
    """Incremental delay strategy."""
    delay = delay or int(os.getenv("FIBO_DELAY", 300000))
    max_delay = max_delay or int(os.getenv("FIBO_MAX_DELAY", 86400000))

    current_delay = _set_timeout(headers, delay)
    logger.debug(
        f"fibo delay strategy: [delay={delay}, current_delay={current_delay}, max_delay={max_delay}]"
    )
    if current_delay < max_delay:
        return int(current_delay + 60000)
    return int(max_delay)


def constant(headers: Union[None, dict], delay: Union[None, int] = None) -> int:
    """Constant delay strategy."""
    delay = delay or int(os.getenv("CONSTANT_DELAY", 300000))
    logger.debug(f"constant delay strategy: [delay={delay}]")
    return delay


def _set_timeout(headers: Union[None, dict], delay: int) -> int:
    if (headers is not None) and ("x-delay" in headers):
        delay = headers["x-delay"]
    return int(delay)