digitalfabrik/integreat-cms

View on GitHub
integreat_cms/cms/utils/external_calendar_utils.py

Summary

Maintainability
A
0 mins
Test Coverage
B
82%
"""
Utility functions for working with external calendars
"""

from __future__ import annotations

import dataclasses
import datetime
import logging

import icalendar.cal
from django.utils.translation import gettext as _
from icalendar.prop import vCategory

from integreat_cms.cms.constants import status
from integreat_cms.cms.forms import EventForm, EventTranslationForm
from integreat_cms.cms.models import EventTranslation, ExternalCalendar
from integreat_cms.cms.utils.content_utils import clean_content


# pylint: disable=too-many-instance-attributes
@dataclasses.dataclass(frozen=True, kw_only=True)
class IcalEventData:
    """
    Holds data extracted from ical events
    """

    event_id: str
    title: str
    content: str
    start_date: datetime.date
    start_time: datetime.time | None
    end_date: datetime.date
    end_time: datetime.time | None
    is_all_day: bool
    categories: list[str]
    external_calendar_id: int

    @classmethod
    def from_ical_event(
        cls,
        event: icalendar.cal.Component,
        language_slug: str,
        external_calendar_id: int,
        logger: logging.Logger,
    ) -> IcalEventData:
        """
        Reads an ical event and constructs an instance of this class from it
        :param event: The ical event
        :param language_slug: The slug of the language of this event
        :param external_calendar_id: The id of the external calendar of this event
        :param logger: The logger to use
        :return: An instance of IcalEventData
        """
        event_id = event.decoded("UID").decode("utf-8")
        title = event.decoded("SUMMARY").decode("utf-8")
        content = clean_content(
            content=(
                event.decoded("DESCRIPTION").decode("utf-8")
                if "DESCRIPTION" in event
                else ""
            ).replace("\n", "<br>"),
            language_slug=language_slug,
        )
        start = event.decoded("DTSTART")
        end = event.decoded("DTEND")

        # Categories can be a "vCategory" object, a list of such objects, or be missing
        categories = event.get("categories", [])
        categories = (
            categories.cats
            if isinstance(categories, vCategory)
            else [
                category
                for sub_categories in categories
                for category in sub_categories.cats
            ]
        )

        logger.debug(
            "Event(event_id=%s, title=%s, start=%s, end=%s, content=%s...)",
            event_id,
            title,
            start,
            end,
            content[:32],
        )

        return cls(
            event_id=event_id,
            title=title,
            content=content,
            start_date=start.date() if isinstance(start, datetime.datetime) else start,
            start_time=start.time() if isinstance(start, datetime.datetime) else None,
            end_date=end.date() if isinstance(end, datetime.datetime) else end,
            end_time=end.time() if isinstance(end, datetime.datetime) else None,
            is_all_day=not isinstance(start, datetime.datetime),
            external_calendar_id=external_calendar_id,
            categories=categories,
        )

    def to_event_form_data(self) -> dict:
        """
        Returns a dictionary of relevant data for the event form
        :return: Dict of relevant data
        """
        return {
            "start_date": self.start_date,
            "start_time": self.start_time,
            "end_date": self.end_date,
            "end_time": self.end_time,
            "is_all_day": self.is_all_day,
            "has_not_location": True,
            "external_calendar": self.external_calendar_id,
            "external_event_id": self.event_id,
        }

    def to_event_translation_form_data(self) -> dict:
        """
        Returns a dictionary of relevant data for the event translation form
        :return: Dict of relevant data
        """
        return {"title": self.title, "status": status.PUBLIC, "content": self.content}


def import_events(calendar: ExternalCalendar, logger: logging.Logger) -> None:
    """
    Imports events from this calendar and sets or clears the errors field of the calendar

    :param calendar: The external calendar
    :param logger: The logger to use
    """

    errors: list[str] = []

    _import_events(calendar, errors, logger)

    if errors:
        calendar.errors = "\n".join(errors)
    else:
        calendar.errors = ""

    calendar.save()


def _import_events(
    calendar: ExternalCalendar, errors: list[str], logger: logging.Logger
) -> None:
    """
    Imports events from this calendar and sets or clears the errors field of the calendar

    :param calendar: The external calendar
    :param logger: The logger to use
    """
    try:
        ical = calendar.load_ical()
    except IOError as e:
        logger.error("Could not import events from %s: %s", calendar, e)
        errors.append(_("Could not access the url of this external calendar"))
        return
    except ValueError as e:
        logger.error("Malformed calendar %s: %s", calendar, e)
        errors.append(
            _("The data provided by the url of this external calendar is invalid")
        )
        return

    calendar_events = set()
    for event in ical.walk("VEVENT"):
        try:
            if (event_uid := import_event(calendar, event, errors, logger)) is not None:
                calendar_events.add(event_uid)
        except KeyError as e:
            logger.error(
                "Could not import event because it does not have a required field: %s, missing field: %r",
                event,
                e,
            )
            errors.append(
                _(
                    "Could not import event because it is missing a required field: {}"
                ).format(e)
            )
            continue

    events_to_delete = calendar.events.exclude(external_event_id__in=calendar_events)
    logger.info(
        "Deleting %s unused events: %r", events_to_delete.count(), events_to_delete
    )
    events_to_delete.delete()


def import_event(
    calendar: ExternalCalendar,
    event: icalendar.cal.Component,
    errors: list[str],
    logger: logging.Logger,
) -> str | None:
    """
    Imports an event from the external calendar

    :param calendar: The external calendar
    :param event: The event that should be imported
    :param errors: A list to which errors will be logged
    :param logger: The logger to use

    :return: The uid of the event
    """
    language = calendar.region.default_language

    event_data = IcalEventData.from_ical_event(
        event, language.slug, calendar.pk, logger
    )

    # Skip this event if it does not have the required tag
    if calendar.import_filter_category and not any(
        category == calendar.import_filter_category
        for category in event_data.categories
    ):
        logger.info(
            "Skipping event %s with tags: [%s]",
            event_data.title,
            ", ".join(event_data.categories),
        )
        return None

    previously_imported_event_translation = EventTranslation.objects.filter(
        event__external_calendar=calendar,
        event__external_event_id=event_data.event_id,
        language=language,
    ).first()
    previously_imported_event = (
        previously_imported_event_translation.event
        if previously_imported_event_translation
        else None
    )

    event_form = EventForm(
        data=event_data.to_event_form_data(),
        instance=previously_imported_event,
        additional_instance_attributes={"region": calendar.region},
    )
    if not event_form.is_valid():
        logger.error("Could not import event: %r", event_form.errors)
        errors.append(
            _("Could not import '{}': {}").format(event_data.title, event_form.errors)
        )
        return event_data.event_id

    event = event_form.save()

    event_translation_form = EventTranslationForm(
        data=event_data.to_event_translation_form_data(),
        instance=previously_imported_event_translation,
        additional_instance_attributes={
            "language": language,
            "event": event,
        },
    )
    if not event_translation_form.is_valid():
        logger.error("Could not import event: %r", event_translation_form.errors)
        errors.append(
            _("Could not import '{}': {}").format(
                event_data.title, event_translation_form.errors
            )
        )
        return event_data.event_id

    # We could look at the sequence number of the ical event too, to see if it has changed.
    # If it hasn't, we don't need to create forms and can quickly skip it
    if event_form.has_changed() or event_translation_form.has_changed():
        event_translation = event_translation_form.save()
        logger.success("Imported event %r, %r", event, event_translation)  # type: ignore[attr-defined]
    else:
        logger.info("Event %r has not changed", event_translation_form.instance)

    return event_data.event_id