exxamalte/python-geojson-client

View on GitHub
geojson_client/__init__.py

Summary

Maintainability
B
4 hrs
Test Coverage
"""
Base class for GeoJSON services.

Fetches GeoJSON feed from URL to be defined by sub-class.
"""
import logging
from datetime import datetime
from json import JSONDecodeError
from typing import Callable, Dict, List, Optional, Tuple

import geojson
import requests
from geojson import GeometryCollection, Point, Polygon
from haversine import haversine

from geojson_client.consts import FILTER_RADIUS, HTTP_ACCEPT_ENCODING_HEADER

_LOGGER = logging.getLogger(__name__)

UPDATE_OK = "OK"
UPDATE_OK_NO_DATA = "OK_NO_DATA"
UPDATE_ERROR = "ERROR"


class GeoJsonFeed:
    """Geo JSON feed base class."""

    def __init__(self, home_coordinates, url, filter_radius=None):
        """Initialise this service."""
        self._home_coordinates = home_coordinates
        self._filter_radius = filter_radius
        self._url = url
        self._request = requests.Request(
            method="GET", url=url, headers=HTTP_ACCEPT_ENCODING_HEADER
        ).prepare()
        self._last_timestamp = None

    def __repr__(self):
        """Return string representation of this feed."""
        return "<{}(home={}, url={}, radius={})>".format(
            self.__class__.__name__,
            self._home_coordinates,
            self._url,
            self._filter_radius,
        )

    def _new_entry(self, home_coordinates, feature, global_data):
        """Generate a new entry."""
        pass

    def _update_internal(
        self, filter_function: Callable[[List], List]
    ) -> Tuple[str, Optional[List]]:
        """Update from external source and return filtered entries."""
        status, data = self._fetch()
        if status == UPDATE_OK:
            if data:
                entries = []
                global_data = self._extract_from_feed(data)
                # Extract data from feed entries.
                for feature in data.features:
                    entries.append(
                        self._new_entry(self._home_coordinates, feature, global_data)
                    )
                filtered_entries = filter_function(entries)
                self._last_timestamp = self._extract_last_timestamp(filtered_entries)
                return UPDATE_OK, filtered_entries
            else:
                # Should not happen.
                return UPDATE_OK, None
        elif status == UPDATE_OK_NO_DATA:
            # Happens for example if the server returns 304
            return UPDATE_OK_NO_DATA, None
        else:
            # Error happened while fetching the feed.
            return UPDATE_ERROR, None

    def update(self) -> Tuple[str, Optional[List]]:
        """Update from external source and return filtered entries."""
        return self._update_internal(lambda entries: self._filter_entries(entries))

    def update_override(
        self, filter_overrides: Dict = None
    ) -> Tuple[str, Optional[List]]:
        """Update from external source and return filtered entries with ability to
        override filter conditions."""
        return self._update_internal(
            lambda entries: self._filter_entries_override(
                entries, filter_overrides=filter_overrides
            )
        )

    def _fetch(self):
        """Fetch GeoJSON data from external source."""
        try:
            with requests.Session() as session:
                response = session.send(self._request, timeout=10)
            if response.ok:
                feature_collection = geojson.loads(response.text)
                return UPDATE_OK, feature_collection
            else:
                _LOGGER.warning(
                    "Fetching data from %s failed with status %s",
                    self._request.url,
                    response.status_code,
                )
                return UPDATE_ERROR, None
        except requests.exceptions.RequestException as request_ex:
            _LOGGER.warning(
                "Fetching data from %s failed with %s", self._request.url, request_ex
            )
            return UPDATE_ERROR, None
        except JSONDecodeError as decode_ex:
            _LOGGER.warning(
                "Unable to parse JSON from %s: %s", self._request.url, decode_ex
            )
            return UPDATE_ERROR, None

    def _filter_entries(self, entries):
        """Filter the provided entries."""
        return self._filter_entries_override(entries, None)

    def _filter_entries_override(self, entries, filter_overrides: Dict = None):
        """Filter the provided entries."""
        filtered_entries = entries
        # Always remove entries without geometry
        filtered_entries = list(
            filter(lambda entry: entry.geometry is not None, filtered_entries)
        )
        # Filter by distance.
        filter_radius = (
            filter_overrides[FILTER_RADIUS]
            if filter_overrides and FILTER_RADIUS in filter_overrides
            else self._filter_radius
        )
        if filter_radius:
            filtered_entries = list(
                filter(
                    lambda entry: entry.distance_to_home <= filter_radius,
                    filtered_entries,
                )
            )
        return filtered_entries

    def _extract_from_feed(self, feed):
        """Extract global metadata from feed."""
        return None

    def _extract_last_timestamp(self, feed_entries):
        """Determine latest (newest) entry from the filtered feed."""
        return None

    @property
    def last_timestamp(self) -> Optional[datetime]:
        """Return the last timestamp extracted from this feed."""
        return self._last_timestamp


class FeedEntry:
    """Feed entry base class."""

    def __init__(self, home_coordinates, feature):
        """Initialise this feed entry."""
        self._home_coordinates = home_coordinates
        self._feature = feature

    def __repr__(self):
        """Return string representation of this entry."""
        return "<{}(id={})>".format(self.__class__.__name__, self.external_id)

    @property
    def geometry(self):
        """Return all geometry details of this entry."""
        if self._feature:
            return self._feature.geometry
        return None

    @property
    def coordinates(self):
        """Return the best coordinates (latitude, longitude) of this entry."""
        if self.geometry:
            return GeoJsonDistanceHelper.extract_coordinates(self.geometry)
        return None

    @property
    def title(self) -> Optional[str]:
        """Return the title of this entry."""
        return None

    @property
    def external_id(self) -> Optional[str]:
        """Return the external id of this entry."""
        return None

    @property
    def attribution(self) -> Optional[str]:
        """Return the attribution of this entry."""
        return None

    @property
    def distance_to_home(self):
        """Return the distance in km of this entry to the home coordinates."""
        return GeoJsonDistanceHelper.distance_to_geometry(
            self._home_coordinates, self.geometry
        )

    def _search_in_feature(self, name):
        """Find an attribute in the feature object."""
        if self._feature and name in self._feature:
            return self._feature[name]
        return None

    def _search_in_properties(self, name):
        """Find an attribute in the feed entry's properties."""
        if (
            self._feature
            and self._feature.properties
            and name in self._feature.properties
        ):
            return self._feature.properties[name]
        return None


class GeoJsonDistanceHelper:
    """Helper to calculate distances between GeoJSON geometries."""

    def __init__(self):
        """Initialize the geo distance helper."""
        pass

    @staticmethod
    def extract_coordinates(geometry):
        """Extract the best coordinates from the feature for display."""
        latitude = longitude = None
        if isinstance(geometry, Point):
            # Just extract latitude and longitude directly.
            latitude, longitude = geometry.coordinates[1], geometry.coordinates[0]
        elif isinstance(geometry, GeometryCollection):
            # Go through the collection, and extract the first suitable
            # geometry.
            for entry in geometry.geometries:
                latitude, longitude = GeoJsonDistanceHelper.extract_coordinates(entry)
                if latitude is not None and longitude is not None:
                    break
        elif isinstance(geometry, Polygon):
            # Find the polygon's centroid as a best approximation for the map.
            longitudes_list = [point[0] for point in geometry.coordinates[0]]
            latitudes_list = [point[1] for point in geometry.coordinates[0]]
            number_of_points = len(geometry.coordinates[0])
            longitude = sum(longitudes_list) / number_of_points
            latitude = sum(latitudes_list) / number_of_points
            _LOGGER.debug(
                "Centroid of %s is %s", geometry.coordinates[0], (latitude, longitude)
            )
        else:
            _LOGGER.debug("Not implemented: %s", type(geometry))
        return latitude, longitude

    @staticmethod
    def distance_to_geometry(home_coordinates, geometry):
        """Calculate the distance between home coordinates and geometry."""
        distance = float("inf")
        if isinstance(geometry, Point):
            distance = GeoJsonDistanceHelper._distance_to_point(
                home_coordinates, geometry
            )
        elif isinstance(geometry, GeometryCollection):
            distance = GeoJsonDistanceHelper._distance_to_geometry_collection(
                home_coordinates, geometry
            )
        elif isinstance(geometry, Polygon):
            distance = GeoJsonDistanceHelper._distance_to_polygon(
                home_coordinates, geometry.coordinates[0]
            )
        else:
            _LOGGER.debug("Not implemented: %s", type(geometry))
        return distance

    @staticmethod
    def _distance_to_point(home_coordinates, point):
        """Calculate the distance between home coordinates and the point."""
        # Swap coordinates to match: (latitude, longitude).
        return GeoJsonDistanceHelper._distance_to_coordinates(
            home_coordinates, (point.coordinates[1], point.coordinates[0])
        )

    @staticmethod
    def _distance_to_geometry_collection(home_coordinates, geometry_collection):
        """Calculate the distance between home coordinates and the geometry
        collection."""
        distance = float("inf")
        for geometry in geometry_collection.geometries:
            distance = min(
                distance,
                GeoJsonDistanceHelper.distance_to_geometry(home_coordinates, geometry),
            )
        return distance

    @staticmethod
    def _distance_to_polygon(home_coordinates, polygon):
        """Calculate the distance between home coordinates and the polygon."""
        distance = float("inf")
        # Calculate distance from polygon by calculating the distance
        # to each point of the polygon but not to each edge of the
        # polygon; should be good enough
        for polygon_point in polygon:
            distance = min(
                distance,
                GeoJsonDistanceHelper._distance_to_coordinates(
                    home_coordinates, (polygon_point[1], polygon_point[0])
                ),
            )
        return distance

    @staticmethod
    def _distance_to_coordinates(home_coordinates, coordinates):
        """Calculate the distance between home coordinates and the
        coordinates."""
        # Expecting coordinates in format: (latitude, longitude).
        return haversine(coordinates, home_coordinates)