
View on GitHub


0 mins
Test Coverage
# flake8: noqa
import riprova

def task():
    """Retry operation if it fails with constant backoff"""

def task():
    """Retry operation if it fails using exponential backoff"""

def task():
    """Raises a TimeoutError if the retry loop exceeds from 10 seconds"""

def on_retry(err, next_try):
    print('Operation error: {}'.format(err))
    print('Next try in: {}ms'.format(next_try))

def task():
    """Subscribe via function callback to every retry attempt"""

def evaluator(response):
    # Force retry operation if not a valid response
    if response.status >= 400:
        raise RuntimeError('invalid response status')
    # Otherwise return False, meaning no retry
    return False

def task():
    """Use a custom evaluator function to determine if the operation failed or not"""

async def task():
    """Asynchronous coroutines are also supported :)"""