drdown/taskapp/celery.py
import os
from celery import Celery
from django.apps import apps, AppConfig
from django.conf import settings
if not settings.configured:
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE',
'config.settings.local') # pragma: no cover
app = Celery('drdown')
class CeleryConfig(AppConfig):
name = 'drdown.taskapp'
verbose_name = 'Celery Config'
def ready(self):
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
installed_apps = [app_config.name for app_config in apps.get_app_configs()] # pragma: no cover
app.autodiscover_tasks(lambda: installed_apps, force=True)
if hasattr(settings, 'RAVEN_CONFIG'):
# Celery signal registration
# Since raven is required in production only,
# imports might (most surely will) be wiped out
# during PyCharm code clean up started
# in other environments.
# @formatter:off
from raven import Client as RavenClient # pragma: no cover
from raven.contrib.celery import register_signal as raven_register_signal # pragma: no cover
from raven.contrib.celery import register_logger_signal as raven_register_logger_signal # pragma: no cover
# @formatter:on
raven_client = RavenClient(dsn=settings.RAVEN_CONFIG['dsn']) # pragma: no cover
raven_register_logger_signal(raven_client) # pragma: no cover
raven_register_signal(raven_client) # pragma: no cover
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}') # pragma: no cover