flamingo-run/django-cloud-tasks

View on GitHub
django_cloud_tasks/tasks/publisher_task.py

Summary

Maintainability
A
0 mins
Test Coverage
import abc
from dataclasses import dataclass
from typing import Type

from cachetools.func import lru_cache
from django.apps import apps
from django.db.models import Model
from django.db import transaction
from gcp_pilot.pubsub import CloudPublisher

from django_cloud_tasks.apps import DjangoCloudTasksAppConfig
from django_cloud_tasks.context import get_current_headers
from django_cloud_tasks.serializers import serialize
from django_cloud_tasks.tasks.task import Task, get_config


class PublisherTask(Task, abc.ABC):
    # Just a specialized Task that publishes a message to PubSub
    # Since it cannot accept any random parameters, all its signatures have fixed arguments
    @classmethod
    def sync(cls, message: dict, attributes: dict[str, str] | None = None):
        return cls().run(message=message, attributes=attributes)

    @classmethod
    def asap(cls, message: dict, attributes: dict[str, str] | None = None):
        task_kwargs = {
            "message": message,
            "attributes": attributes,
        }
        return cls.push(task_kwargs=task_kwargs)

    def run(self, message: dict, attributes: dict[str, str] | None = None, headers: dict[str, str] | None = None):
        # Cloud PubSub does not support headers, but we simulate them with a key in the data property
        message = self._build_message_with_headers(message=message, headers=headers)

        return self._get_publisher_client().publish(
            message=serialize(value=message),
            topic_id=self.topic_name(),
            attributes=attributes,
        )

    def _build_message_with_headers(self, message: dict, headers: dict | None = None):
        message = message.copy()
        headers = get_current_headers() | (headers or {})
        if headers:
            message[self._app.propagated_headers_key] = headers
        return message

    @classmethod
    def set_up(cls) -> None:
        cls._get_publisher_client().create_topic(topic_id=cls.topic_name())

    @classmethod
    def topic_name(cls) -> str:
        name = cls.name()
        if app_name := get_config(name="app_name"):
            delimiter = get_config(name="delimiter")
            name = f"{app_name}{delimiter}{name}"
        return name

    @classmethod
    @lru_cache()
    def _get_publisher_client(cls) -> CloudPublisher:
        return CloudPublisher()

    @property
    @lru_cache()
    def _app(self) -> DjangoCloudTasksAppConfig:
        return apps.get_app_config("django_cloud_tasks")


@dataclass
class PreparedModelPublication:
    """Stores the information needed to publish a model to PubSub.

    Because models are mutable objects, in case we don't want to publish the event right away,
    we need to store the information needed to publish right away.
    """

    task_klass: Type["ModelPublisherTask"]
    message: dict
    attributes: dict[str, str]
    topic_name: str

    def get_task_kwargs(self):
        return {
            "message": self.message,
            "attributes": self.attributes,
            "topic_name": self.topic_name,
        }

    def sync(self):
        return self.task_klass().run(**self.get_task_kwargs())

    def asap(self):
        return self.push()

    def push(self, **kwargs):
        return self.task_klass._push_prepared(prepared=self, **kwargs)


class ModelPublisherTask(PublisherTask, abc.ABC):
    # Just a specialized Task that publishes a Django model to PubSub
    # Since it cannot accept any random parameters, all its signatures have fixed arguments
    @classmethod
    def sync(cls, obj: Model, **kwargs):
        return cls.prepare(obj=obj, **kwargs).sync()

    @classmethod
    def sync_on_commit(cls, obj: Model, **kwargs):
        prepared_publication = cls.prepare(obj=obj, **kwargs)
        transaction.on_commit(lambda: prepared_publication.sync())

    @classmethod
    def asap(cls, obj: Model, **kwargs):
        return cls.prepare(obj=obj, **kwargs).asap()

    @classmethod
    def push(cls, task_kwargs: dict, **kwargs):
        return cls.prepare(**task_kwargs).push(**kwargs)

    @classmethod
    def _push_prepared(cls, prepared: PreparedModelPublication, **kwargs):
        return super().push(task_kwargs=prepared.get_task_kwargs(), **kwargs)

    @classmethod
    def prepare(cls, obj: Model, **kwargs):
        return PreparedModelPublication(
            task_klass=cls,
            message=cls.build_message_content(obj=obj, **kwargs),
            attributes=cls.build_message_attributes(obj=obj, **kwargs),
            topic_name=cls.topic_name(obj=obj, **kwargs),
        )

    def run(
        self, message: dict, topic_name: str, attributes: dict[str, str] | None, headers: dict[str, str] | None = None
    ):
        message = self._build_message_with_headers(message=message, headers=headers)
        return self._get_publisher_client().publish(
            message=serialize(value=message),
            topic_id=topic_name,
            attributes=attributes,
        )

    @classmethod
    def set_up(cls) -> None: ...  # TODO: run over all models?

    @classmethod
    def topic_name(cls, obj: Model, **kwargs) -> str:
        name = cls.extract_model_name(obj=obj)
        if app_name := get_config(name="app_name"):
            delimiter = get_config(name="delimiter")
            name = f"{app_name}{delimiter}{name}"
        return name

    @classmethod
    def extract_model_name(cls, obj: Model) -> str:
        app_name = str(obj.__class__._meta.app_label).lower()
        model_name = str(obj.__class__._meta.model_name).lower()
        return f"{app_name}-{model_name}"

    @classmethod
    def build_message_content(cls, obj: Model, **kwargs) -> dict:
        raise NotImplementedError()

    @classmethod
    def build_message_attributes(cls, obj: Model, **kwargs) -> dict[str, str]:
        raise NotImplementedError()

    @classmethod
    @lru_cache()
    def _get_publisher_client(cls) -> CloudPublisher:
        return CloudPublisher()