danidee10/django-notifs

View on GitHub
notifications/backends/celery.py

Summary

Maintainability
A
0 mins
Test Coverage
A
93%
"""Celery backend"""

from __future__ import absolute_import, unicode_literals

import logging

from celery import shared_task
from celery.utils.log import get_task_logger

from .. import default_settings as settings
from .base import BaseBackend

logger = get_task_logger(__name__)


@shared_task(
    bind=True,
    retry_backoff=settings.NOTIFICATIONS_RETRY_INTERVAL,
    max_retries=settings.NOTIFICATIONS_MAX_RETRIES,
)
def consume(self, provider, payload, context):
    """Send notification via a channel to celery."""
    try:
        CeleryBackend.consume(provider, payload, context)
    except Exception as e:
        if settings.NOTIFICATIONS_RETRY:
            self.retry(exc=e)


class CeleryBackend(BaseBackend):

    logger = logging.getLogger('django_notifs.backends.celery')

    def produce(self, provider, payload, context, countdown):
        consume.apply_async(
            args=[provider, payload, context],
            queue=settings.NOTIFICATIONS_QUEUE_NAME,
            countdown=countdown,
        )