exxamalte/python-aio-georss-client

View on GitHub
aio_georss_client/feed_manager.py

Summary

Maintainability
A
35 mins
Test Coverage
"""Base class for the feed manager. This allows managing feeds and their entries throughout their life-cycle."""
from __future__ import annotations

import logging
from collections.abc import Awaitable, Callable
from datetime import datetime

from .consts import UPDATE_OK, UPDATE_OK_NO_DATA
from .feed import GeoRssFeed
from .feed_entry import FeedEntry
from .status_update import StatusUpdate

_LOGGER = logging.getLogger(__name__)


class FeedManagerBase:
    """Generic Feed manager."""

    def __init__(
        self,
        feed: GeoRssFeed,
        generate_async_callback: Callable[[str], Awaitable[None]],
        update_async_callback: Callable[[str], Awaitable[None]],
        remove_async_callback: Callable[[str], Awaitable[None]],
        status_async_callback: Callable[[StatusUpdate], Awaitable[None]] = None,
    ):
        """Initialise feed manager."""
        self._feed: GeoRssFeed = feed
        self.feed_entries: dict = {}
        self._managed_external_ids: set = set()
        self._last_update: datetime | None = None
        self._last_update_successful: datetime | None = None
        self._generate_async_callback: Callable[
            [str], Awaitable[None]
        ] = generate_async_callback
        self._update_async_callback: Callable[
            [str], Awaitable[None]
        ] = update_async_callback
        self._remove_async_callback: Callable[
            [str], Awaitable[None]
        ] = remove_async_callback
        self._status_async_callback: Callable[
            [StatusUpdate], Awaitable[None]
        ] = status_async_callback

    def __repr__(self):
        """Return string representation of this feed."""
        return f"<{self.__class__.__name__}(feed={self._feed})>"

    async def update(self):
        """Update the feed and then update connected entities."""
        status, feed_entries = await self._feed.update()
        # Record current time of update.
        self._last_update = datetime.now()
        count_created: int = 0
        count_updated: int = 0
        count_removed: int = 0
        await self._store_feed_entries(status, feed_entries)
        if status == UPDATE_OK:
            _LOGGER.debug("Data retrieved %s", feed_entries)
            # Record current time of update.
            self._last_update_successful = self._last_update
            # For entity management the external ids from the feed are used.
            feed_external_ids = {entry.external_id for entry in feed_entries}
            count_removed = await self._update_feed_remove_entries(feed_external_ids)
            count_updated = await self._update_feed_update_entries(feed_external_ids)
            count_created = await self._update_feed_create_entries(feed_external_ids)
        elif status == UPDATE_OK_NO_DATA:
            _LOGGER.debug("Update successful, but no data received from %s", self._feed)
            # Record current time of update.
            self._last_update_successful = self._last_update
        else:
            _LOGGER.warning(
                "Update not successful, no data received from %s", self._feed
            )
            # Remove all entities.
            count_removed = await self._update_feed_remove_entries(set())
        # Send status update to subscriber.
        await self._status_update(status, count_created, count_updated, count_removed)

    async def _store_feed_entries(
        self, status: str, feed_entries: list[FeedEntry] | None
    ):
        """Keep a copy of all feed entries for future lookups."""
        if feed_entries or status == UPDATE_OK_NO_DATA:
            if status == UPDATE_OK:
                self.feed_entries = {entry.external_id: entry for entry in feed_entries}
        else:
            self.feed_entries.clear()

    async def _update_feed_create_entries(self, feed_external_ids: set[str]) -> int:
        """Create entities after feed update."""
        create_external_ids: set[str] = feed_external_ids.difference(
            self._managed_external_ids
        )
        count_created = len(create_external_ids)
        await self._generate_new_entities(create_external_ids)
        return count_created

    async def _update_feed_update_entries(self, feed_external_ids: set[str]) -> int:
        """Update entities after feed update."""
        update_external_ids: set[str] = self._managed_external_ids.intersection(
            feed_external_ids
        )
        count_updated = len(update_external_ids)
        await self._update_entities(update_external_ids)
        return count_updated

    async def _update_feed_remove_entries(self, feed_external_ids: set[str]) -> int:
        """Remove entities after feed update."""
        remove_external_ids: set[str] = self._managed_external_ids.difference(
            feed_external_ids
        )
        count_removed = len(remove_external_ids)
        await self._remove_entities(remove_external_ids)
        return count_removed

    async def _generate_new_entities(self, external_ids: set[str]):
        """Generate new entities for events."""
        for external_id in external_ids:
            await self._generate_async_callback(external_id)
            _LOGGER.debug("New entity added %s", external_id)
            self._managed_external_ids.add(external_id)

    async def _update_entities(self, external_ids: set[str]):
        """Update entities."""
        for external_id in external_ids:
            _LOGGER.debug("Existing entity found %s", external_id)
            await self._update_async_callback(external_id)

    async def _remove_entities(self, external_ids: set[str]):
        """Remove entities."""
        for external_id in external_ids:
            _LOGGER.debug("Entity not current anymore %s", external_id)
            self._managed_external_ids.remove(external_id)
            await self._remove_async_callback(external_id)

    async def _status_update(
        self, status: str, count_created: int, count_updated: int, count_removed: int
    ):
        """Provide status update."""
        if self._status_async_callback:
            await self._status_async_callback(
                StatusUpdate(
                    status,
                    self.last_update,
                    self.last_update_successful,
                    self.last_timestamp,
                    len(self.feed_entries),
                    count_created,
                    count_updated,
                    count_removed,
                )
            )

    @property
    def last_timestamp(self) -> datetime | None:
        """Return the last timestamp extracted from this feed."""
        return self._feed.last_timestamp

    @property
    def last_update(self) -> datetime | None:
        """Return the last update of this feed."""
        return self._last_update

    @property
    def last_update_successful(self) -> datetime | None:
        """Return the last successful update of this feed."""
        return self._last_update_successful