
View on GitHub


35 mins
Test Coverage
import os
import pickle
import asyncio
import logging
import functools

from celery.backends.base import DisabledBackend

from waterbutler.tasks import app
from waterbutler.tasks import settings
from waterbutler.tasks import exceptions

logger = logging.getLogger(__name__)

def ensure_event_loop():
    """Ensure the existance of an eventloop
    Useful for contexts where get_event_loop() may
    raise an exception.
    :returns: The new event loop
    :rtype: BaseEventLoop
        return asyncio.get_event_loop()
    except (AssertionError, RuntimeError):

    # Note: No clever tricks are used here to dry up code
    # This avoids an infinite loop if settings the event loop ever fails
    return asyncio.get_event_loop()

def __coroutine_unwrapper(func):
    def wrapped(*args, **kwargs):
        return ensure_event_loop().run_until_complete(func(*args, **kwargs))
    wrapped.as_async = func
    return wrapped

async def backgrounded(func, *args, **kwargs):
    """Runs the given function with the given arguments in
    a background thread
    loop = asyncio.get_event_loop()
    if asyncio.iscoroutinefunction(func):
        func = __coroutine_unwrapper(func)

    return (await loop.run_in_executor(
        None,  # None uses the default executer, ThreadPoolExecuter
        functools.partial(func, *args, **kwargs)

def backgroundify(func):
    async def wrapped(*args, **kwargs):
        return (await backgrounded(func, *args, **kwargs))
    return wrapped

def adhoc_file_backend(func, was_bound=False, basepath=None):
    basepath = basepath or settings.ADHOC_BACKEND_PATH

    def wrapped(task, *args, **kwargs):
        if was_bound:
            args = (task,) + args

            result = func(*args, **kwargs)
        except Exception as e:
            result = e

        with open(os.path.join(basepath,, 'wb') as result_file:
            pickle.dump(result, result_file)

        if isinstance(result, Exception):
            raise result
        return result
    return wrapped

def celery_task(func, *args, **kwargs):
    """A wrapper around Celery.task. When the wrapped method is called it will be called using
    Celery's Task.delay function and run in a background thread.

    If the celery backend is disabled, the task will be wrapped in a function that will write the
    result to disk using the pickle serialization protocol.
    task_func = __coroutine_unwrapper(func)

    if isinstance(app.backend, DisabledBackend):
        task_func = adhoc_file_backend(
            was_bound=kwargs.pop('bind', False)
        kwargs['bind'] = True

    logger.debug('celery_task: task_func:({})'.format(task_func))

    task = app.task(task_func, **kwargs)
    task.adelay = backgroundify(task.delay)

    return task

async def wait_on_celery(result, interval=None, timeout=None, basepath=None):
    timeout = timeout or settings.WAIT_TIMEOUT
    interval = interval or settings.WAIT_INTERVAL
    basepath = basepath or settings.ADHOC_BACKEND_PATH

    waited = 0

    while True:
        if isinstance(app.backend, DisabledBackend):
                with open(os.path.join(basepath,, 'rb') as result_file:
                    data = pickle.load(result_file)
                if isinstance(data, Exception):
                    raise data
                return data
            except FileNotFoundError:
            if result.ready():
                if result.failed():
                    raise result.result
                return result.result

        if waited > timeout:
            raise exceptions.WaitTimeOutError
        await asyncio.sleep(interval)
        waited += interval