saltstack/salt

View on GitHub
salt/modules/celery.py

Summary

Maintainability
A
55 mins
Test Coverage
# -*- coding: utf-8 -*-
'''
Support for scheduling celery tasks. The worker is independent of salt and thus can run in a different
virtualenv or on a different python version, as long as broker, backend and serializer configurations match.
Also note that celery and packages required by the celery broker, e.g. redis must be installed to load
the salt celery execution module.

.. note::
    A new app (and thus new connections) is created for each task execution
'''
from __future__ import absolute_import, print_function, unicode_literals

# Import python libs
import logging
import sys

# Import salt libs
from salt.exceptions import SaltInvocationError
from salt.ext import six

log = logging.getLogger(__name__)


# Import third party libs
try:
    from celery import Celery
    from celery.exceptions import TimeoutError
    HAS_CELERY = True
except ImportError:
    HAS_CELERY = False


def __virtual__():
    '''
    Only load if celery libraries exist.
    '''
    if not HAS_CELERY:
        return False, 'The celery module could not be loaded: celery library not found'
    return True


def run_task(task_name, args=None, kwargs=None, broker=None, backend=None, wait_for_result=False, timeout=None,
             propagate=True, interval=0.5, no_ack=True, raise_timeout=True, config=None):
    '''
    Execute celery tasks. For celery specific parameters see celery documentation.


    CLI Example:

    .. code-block:: bash

        salt '*' celery.run_task tasks.sleep args=[4] broker=redis://localhost \\
        backend=redis://localhost wait_for_result=true

    task_name
        The task name, e.g. tasks.sleep

    args
        Task arguments as a list

    kwargs
        Task keyword arguments

    broker
        Broker for celeryapp, see celery documentation

    backend
        Result backend for celeryapp, see celery documentation

    wait_for_result
        Wait until task result is read from result backend and return result, Default: False

    timeout
        Timeout waiting for result from celery, see celery AsyncResult.get documentation

    propagate
        Propagate exceptions from celery task, see celery AsyncResult.get documentation, Default: True

    interval
        Interval to check for task result, see celery AsyncResult.get documentation, Default: 0.5

    no_ack
        see celery AsyncResult.get documentation. Default: True

    raise_timeout
        Raise timeout exception if waiting for task result times out. Default: False

    config
        Config dict for celery app, See celery documentation

    '''
    if not broker:
        raise SaltInvocationError('broker parameter is required')

    with Celery(broker=broker, backend=backend, set_as_current=False) as app:
        if config:
            app.conf.update(config)

        with app.connection():
            args = args or []
            kwargs = kwargs or {}
            async_result = app.send_task(task_name, args=args, kwargs=kwargs)

            if wait_for_result:
                try:
                    return async_result.get(timeout=timeout, propagate=propagate,
                                            interval=interval, no_ack=no_ack)
                except TimeoutError as ex:
                    log.error('Waiting for the result of a celery task execution timed out.')
                    if raise_timeout:
                        six.reraise(*sys.exc_info())
                    return False