maestro-server/discovery-api

View on GitHub
app/celery.py

Summary

Maintainability
A
0 mins
Test Coverage
B
83%
from celery import Celery


def make_celery(app):
    celery = Celery(app.import_name)
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery