exxamalte/python-aio-georss-client

View on GitHub
aio_georss_client/xml_parser/feed_item.py

Summary

Maintainability
A
25 mins
Test Coverage
"""GeoRSS feed item."""
from __future__ import annotations

import logging

from ..consts import (
    XML_TAG_GDACS_BBOX,
    XML_TAG_GEO_LAT,
    XML_TAG_GEO_LONG,
    XML_TAG_GEO_POINT,
    XML_TAG_GEORSS_POINT,
    XML_TAG_GEORSS_POLYGON,
    XML_TAG_GEORSS_WHERE,
    XML_TAG_GML_EXTERIOR,
    XML_TAG_GML_LINEAR_RING,
    XML_TAG_GML_POINT,
    XML_TAG_GML_POLYGON,
    XML_TAG_GML_POS,
    XML_TAG_GML_POS_LIST,
    XML_TAG_GUID,
    XML_TAG_ID,
    XML_TAG_SOURCE,
)
from .feed_or_feed_item import FeedOrFeedItem
from .geometry import BoundingBox, Geometry, Point, Polygon

# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)


class FeedItem(FeedOrFeedItem):
    """Represents a feed item.

    On top of what the base class provides, this class exposes the item's
    guid/id, its source and all geometries that can be extracted from the
    supported location tags (georss:point, georss:where, geo:Point,
    geo:long/geo:lat, georss:polygon and gdacs:bbox).
    """

    def __repr__(self):
        """Return string representation of this feed item."""
        return f"<{self.__class__.__name__}({self.guid})>"

    @property
    def guid(self) -> str | None:
        """Return the guid of this feed item.

        The guid tag and the id tag are handed to the lookup helper, in that
        order.
        """
        return self._attribute_with_text([XML_TAG_GUID, XML_TAG_ID])

    @property
    def id(self) -> str | None:
        """Return the id of this feed item (alias of :attr:`guid`)."""
        return self.guid

    @property
    def source(self) -> str | None:
        """Return the source of this feed item."""
        return self._attribute([XML_TAG_SOURCE])

    @property
    def geometries(self) -> list[Geometry] | None:
        """Return all geometries of this feed item.

        Every supported tag is evaluated and the results are merged in a
        fixed order. Duplicates are dropped, keeping the first occurrence.
        The result is always a list; it is empty if no geometry was found.
        """
        candidates = (
            self._geometry_georss_point(),
            self._geometry_georss_where(),
            self._geometry_geo_point(),
            self._geometry_geo_long_lat(),
            self._geometry_georss_polygon(),
            self._geometry_gdacs_bbox(),
        )
        unique_geometries: list[Geometry] = []
        for entry in candidates:
            # Extractors yield None (or an empty list) if nothing usable
            # was found under their tag.
            for geometry in entry or ():
                # Equality-based de-duplication: no assumption is made that
                # geometries are hashable, hence no set.
                if geometry not in unique_geometries:
                    unique_geometries.append(geometry)
        return unique_geometries

    @staticmethod
    def _has_coordinate(value) -> bool:
        """Return True if the value is usable as a single coordinate.

        A numeric zero is falsy in Python but is a perfectly valid latitude
        or longitude, so it must not be treated as "missing". Any other
        value is judged by its truthiness (None and empty strings are
        rejected).
        NOTE(review): assumes the parser delivers numeric coordinates -
        confirm against the XML parser's post-processing.
        """
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return True
        return bool(value)

    def _geometry_georss_point(self) -> list[Point] | None:
        """Check for georss:point tag.

        Returns None if the tag is absent, otherwise a list of points.
        """
        # <georss:point>-0.5 119.8</georss:point>
        point = self._attribute([XML_TAG_GEORSS_POINT])
        if not point:
            return None
        # A tuple holds the coordinates of one point; anything else is
        # treated as a collection of such tuples.
        if isinstance(point, tuple):
            return FeedItem._create_georss_point_single(point)
        return FeedItem._create_georss_point_multiple(point)

    @staticmethod
    def _create_georss_point_single(point: tuple) -> list[Point]:
        """Create single point from provided coordinates.

        Returns an empty list (and logs a warning) if fewer than two
        coordinates are provided, instead of raising IndexError.
        """
        if len(point) < 2:
            _LOGGER.warning("Insufficient data for point: %s", point)
            return []
        return [Point(point[0], point[1])]

    @staticmethod
    def _create_georss_point_multiple(point: list) -> list[Point]:
        """Create multiple points from provided coordinates.

        Entries with fewer than two coordinates are skipped with a warning.
        """
        points: list[Point] = []
        for entry in point:
            points.extend(FeedItem._create_georss_point_single(entry))
        return points

    def _geometry_georss_where(self) -> list[Geometry] | None:
        """Check for georss:where tag.

        A contained gml:Point takes precedence over a contained gml:Polygon.
        Returns None if the tag is absent or holds neither.
        """
        where = self._attribute([XML_TAG_GEORSS_WHERE])
        if not where:
            return None
        # Point:
        # <georss:where>
        #   <gml:Point>
        #     <gml:pos>44.11 -66.23</gml:pos>
        #   </gml:Point>
        # </georss:where>
        pos = self._attribute_in_structure(
            where, [XML_TAG_GML_POINT, XML_TAG_GML_POS]
        )
        if pos:
            return [Point(pos[0], pos[1])]
        # Polygon:
        # <georss:where>
        #   <gml:Polygon>
        #     <gml:exterior>
        #       <gml:LinearRing>
        #         <gml:posList>
        #           -71.106216 42.366661
        #           -71.105576 42.367104
        #           -71.104378 42.367134
        #           -71.103729 42.366249
        #           -71.098793 42.363331
        #           -71.101028 42.362541
        #           -71.106865 42.366123
        #           -71.106216 42.366661
        #         </gml:posList>
        #       </gml:LinearRing>
        #     </gml:exterior>
        #   </gml:Polygon>
        # </georss:where>
        pos_list = self._attribute_in_structure(
            where,
            [
                XML_TAG_GML_POLYGON,
                XML_TAG_GML_EXTERIOR,
                XML_TAG_GML_LINEAR_RING,
                XML_TAG_GML_POS_LIST,
            ],
        )
        if pos_list:
            return self._create_polygon(pos_list)
        return None

    def _geometry_geo_point(self) -> list[Point] | None:
        """Check for geo:Point tag.

        Returns a single-element list if both latitude and longitude are
        present inside the tag, otherwise None.
        """
        # <geo:Point xmlns:geo="http://www.w3.org/2003/01/geo/wgs84_pos#">
        #   <geo:lat>38.3728</geo:lat>
        #   <geo:long>15.7213</geo:long>
        # </geo:Point>
        point = self._attribute([XML_TAG_GEO_POINT])
        if point:
            lat = point.get(XML_TAG_GEO_LAT)
            long = point.get(XML_TAG_GEO_LONG)
            # Plain truthiness would discard a coordinate of exactly 0.
            if FeedItem._has_coordinate(long) and FeedItem._has_coordinate(lat):
                return [Point(lat, long)]
        return None

    def _geometry_geo_long_lat(self) -> list[Point] | None:
        """Check for geo:long and geo:lat tags.

        Returns a single-element list if both tags are present, otherwise
        None.
        """
        # <geo:long>119.948006</geo:long>
        # <geo:lat>-23.126413</geo:lat>
        lat = self._attribute([XML_TAG_GEO_LAT])
        long = self._attribute([XML_TAG_GEO_LONG])
        # Plain truthiness would discard a coordinate of exactly 0.
        if FeedItem._has_coordinate(long) and FeedItem._has_coordinate(lat):
            return [Point(lat, long)]
        return None

    def _geometry_gdacs_bbox(self) -> list[BoundingBox] | None:
        """Check for gdacs:bbox tag.

        Returns None if the tag is absent, otherwise a list of bounding
        boxes.
        """
        # <!--gdacs: bbox format = lonmin lonmax latmin latmax -->
        # <gdacs:bbox>164.5652 172.5652 -24.9041 -16.9041</gdacs:bbox>
        bbox = self._attribute([XML_TAG_GDACS_BBOX])
        if not bbox:
            return None
        # A tuple holds the coordinates of one box; anything else is
        # treated as a collection of such tuples.
        if isinstance(bbox, tuple):
            return FeedItem._create_bbox_single(bbox)
        return FeedItem._create_bbox_multiple(bbox)

    @staticmethod
    def _create_bbox_single(bbox: tuple) -> list[BoundingBox]:
        """Create single bbox from provided tuple of coordinates.

        The tuple is read as (lonmin, lonmax, latmin, latmax). Returns an
        empty list (and logs a warning) if fewer than four coordinates are
        provided, instead of raising IndexError. Surplus values are ignored,
        as before.
        """
        if len(bbox) < 4:
            _LOGGER.warning("Insufficient data for bounding box: %s", bbox)
            return []
        return [BoundingBox(Point(bbox[2], bbox[0]), Point(bbox[3], bbox[1]))]

    @staticmethod
    def _create_bbox_multiple(bbox: list) -> list[BoundingBox]:
        """Create multiple bboxes from provided list of coordinates.

        Each entry is read as (lonmin, lonmax, latmin, latmax); entries that
        do not hold exactly four values are skipped with a warning.
        """
        bounding_boxes: list[BoundingBox] = []
        for entry in bbox:
            if len(entry) != 4:
                _LOGGER.warning("Insufficient data for bounding box: %s", entry)
                continue
            bounding_boxes.append(
                BoundingBox(Point(entry[2], entry[0]), Point(entry[3], entry[1]))
            )
        return bounding_boxes

    def _geometry_georss_polygon(self) -> list[Polygon] | None:
        """Check for georss:polygon tag.

        Returns None if the tag is absent, otherwise a list of polygons.
        """
        # <georss:polygon>
        #   -34.937663524 148.597260613
        #   -34.9377026399999 148.597169138
        #   -34.9377002169999 148.59708737
        #   -34.9376945989999 148.59705595
        #   -34.9376863529999 148.596955098
        #   -34.937663524 148.597260613
        # </georss:polygon>
        polygon = self._attribute([XML_TAG_GEORSS_POLYGON])
        if polygon:
            return self._create_polygon(polygon)
        return None

    @staticmethod
    def _create_polygon(polygon_data) -> list[Polygon] | None:
        """Create a polygon from the provided coordinates.

        Accepts either a flat tuple of coordinates (one polygon) or a
        collection of such tuples (several polygons). Returns None if no
        data is provided.
        """
        if not polygon_data:
            return None
        # Either tuple or an array of tuples.
        if isinstance(polygon_data, tuple):
            return FeedItem._create_polygon_single(polygon_data)
        return FeedItem._create_polygon_multiple(polygon_data)

    @staticmethod
    def _create_polygon_single(polygon_data: tuple) -> list[Polygon]:
        """Create polygon from provided tuple of coordinates.

        Consecutive values are paired up into points. With an odd number of
        values the trailing one is dropped.
        """
        # zip() stops at the shorter slice, which discards a dangling last
        # coordinate when the number of values is odd.
        points = [
            Point(first, second)
            for first, second in zip(polygon_data[::2], polygon_data[1::2])
        ]
        return [Polygon(points)]

    @staticmethod
    def _create_polygon_multiple(polygon_data: list) -> list[Polygon]:
        """Create polygons from provided list of coordinates.

        Empty entries are skipped.
        """
        polygons: list[Polygon] = []
        for entry in polygon_data:
            # _create_polygon returns None for an empty entry; extending
            # the list with None would raise TypeError.
            polygons.extend(FeedItem._create_polygon(entry) or [])
        return polygons