Terralego/django-geostore

View on GitHub
geostore/tiles/mixins.py

Summary

Maintainability
A
1 hr
Test Coverage
from hashlib import sha224
from urllib.parse import unquote, urljoin

from django.core.cache import cache
from django.db.models import Q
from django.http import HttpResponse, QueryDict
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.encoding import escape_uri_path
from django.utils.text import slugify
from django.utils.html import escape
from django.utils.timezone import now
from rest_framework.decorators import action
from rest_framework.response import Response

from ..models import Feature
from .. import settings as app_settings
from ..tokens import tiles_token_generator
from .helpers import VectorTile


class AuthenticatedGroupsMixin:
    IDB64_QUERY_ARG = 'idb64'
    TOKEN_QUERY_ARG = 'token'

    @cached_property
    def authenticated_groups(self):
        token, idb64 = self.request.GET.get(self.TOKEN_QUERY_ARG), self.request.GET.get(self.IDB64_QUERY_ARG)
        if token and idb64:
            groups, layergroup = tiles_token_generator.decode_idb64(idb64)

            if groups and layergroup and tiles_token_generator.check_token(token, groups, layergroup):
                return groups
        return []

    def is_authorized(self, layer):
        if self.request.user.is_authenticated and self.request.user.is_superuser:
            return True
        if layer.authorized_groups.exists():
            return layer.authorized_groups.filter(pk__in=self.authenticated_groups).exists()
        return True


class MVTViewMixin(AuthenticatedGroupsMixin):
    tile_response_class = HttpResponse
    tile_content_type = 'application/vnd.mapbox-vector-tile'

    @action(detail=True, url_path=r'tiles/\{z\}/\{x\}/\{y\}', permission_classes=[], url_name='tiles-pattern')
    def tiles_pattern(self, request, *args, **kwargs):
        """ Fake pattern to reverse tile url """
        return Response(status=404)

    @action(detail=True, permission_classes=[])
    def tilejson(self, request, *args, **kwargs):
        """ MVT layer tilejson """
        last_update = self.get_last_update()
        cache_key = sha224(
            f"tilejson-{self.get_object().name}{'-'.join([g.name for g in self.authenticated_groups])}".encode()
        ).hexdigest()
        version = int(last_update.timestamp())
        tilejson_data = cache.get(cache_key, version=version)

        if not tilejson_data:
            tilejson_data = self.get_tilejson()
            cache.set(cache_key, tilejson_data, version=version)
        return Response(tilejson_data)

    @action(detail=True, url_name='tiles', permission_classes=[],
            url_path=r'tiles/(?P<z>[\d-]+)/(?P<x>[\d-]+)/(?P<y>[\d-]+)', )
    def tiles(self, request, z, x, y, **kwargs):
        return self.tile_response_class(
            self.get_tile(int(z), int(x), int(y)),
            content_type=self.tile_content_type
        )

    def get_tile_for_layer(self, layer, z, x, y, name=None, features_pk=None):
        tile = VectorTile(layer)
        return tile.get_tile(
            x, y, z, name, features_pk
        )

    def get_tile(self, z, x, y):
        tiles_array = []
        for layer in self.layers:
            minzoom = layer.layer_settings_with_default('tiles', 'minzoom')
            maxzoom = layer.layer_settings_with_default('tiles', 'maxzoom')
            if minzoom <= z <= int(maxzoom) and self.is_authorized(layer):
                unused, tile = self.get_tile_for_layer(layer, z, x, y)
                tiles_array.append(tile)

            for extra_layer in layer.extra_geometries.all():
                unused, tile = self.get_tile_for_layer(extra_layer, z, x, y)
                tiles_array.append(tile)

            for relation in layer.relations_as_origin.all():
                relation_layer = relation.destination
                relation_features = relation.related_features.values_list('destination', flat=True)
                unused, tile = self.get_tile_for_layer(relation_layer,
                                                       z, x, y,
                                                       f'relation-{slugify(layer.name)}-{slugify(relation.name)}',
                                                       relation_features)
                tiles_array.append(tile)

        return b''.join(tiles_array)

    def get_tile_path(self):
        return self.get_tokenized_url(
            reverse("layer-tiles-pattern", args=[self.get_object().pk])
        )

    @cached_property
    def layers(self):
        # keep a qs result here

        return type(self.get_object()).objects.filter(
            Q(pk=self.get_object().pk) &
            (
                Q(authorized_groups__isnull=True) | Q(authorized_groups__in=self.authenticated_groups)
            )
        )

    def get_tokenized_url(self, url):
        if self.authenticated_groups:
            querystring = QueryDict(mutable=True)
            querystring.update({
                self.IDB64_QUERY_ARG: self.request.GET.get(self.IDB64_QUERY_ARG),
                self.TOKEN_QUERY_ARG: self.request.GET.get(self.TOKEN_QUERY_ARG)
            })
            return f'{url}?{querystring.urlencode()}'
        return url

    @staticmethod
    def settings_link(layer, *args):
        layer_settings = layer.layer_settings_with_default(*args)
        if 'link' in layer_settings:
            return '<a href="{0}"/>{1}</a>'.format(
                escape_uri_path(layer_settings['link']),
                escape(layer_settings['name'])
            )
        return layer_settings

    def get_min_zoom(self):
        return max(
            app_settings.MIN_TILE_ZOOM,
            min([
                layer.layer_settings_with_default('tiles', 'minzoom')
                for layer in self.layers
            ], default=app_settings.MIN_TILE_ZOOM))

    def get_max_zoom(self):
        return min(
            app_settings.MAX_TILE_ZOOM,
            max([
                layer.layer_settings_with_default('tiles', 'maxzoom')
                for layer in self.layers
            ], default=app_settings.MAX_TILE_ZOOM))

    def _join_group_settings_link(self, layers, *args):
        return ','.join(set([
            a if 'link' not in a else
            '<a href="{0}"/>{1}</a>'.format(escape_uri_path(a['link']), escape(a['name']))
            for a in [layer.layer_settings_with_default(*args) for layer in self.layers]
            if a
        ])) or None

    def _join_group_settings_string(self, layers, *args):
        return ','.join(set([
            a
            for a in [layer.layer_settings_with_default(*args) for layer in self.layers]
            if a
        ])) or None

    def get_attribution(self):
        return self._join_group_settings_link(self.layers, 'metadata', 'attribution')

    def get_description(self):
        return self._join_group_settings_string(self.layers, 'metadata', 'description')

    @staticmethod
    def layer_fields(layer):
        properties_filter = layer.layer_settings_with_default(
            'tiles', 'properties_filter')
        if properties_filter is not None:
            fields = properties_filter
        else:
            fields = layer.layer_properties.keys()

        return {f: '' for f in fields}

    def get_vector_layers(self):
        data = []
        for layer in self.layers:
            data.append({
                'id': layer.name,
                'description': layer.name.title(),
                'fields': self.layer_fields(layer),
                'minzoom': layer.layer_settings_with_default('tiles', 'minzoom'),
                'maxzoom': layer.layer_settings_with_default('tiles', 'maxzoom'),
            })
            for extra_geom in layer.extra_geometries.all():
                data.append({
                    'id': f'{extra_geom.name}',
                    'description': f'{extra_geom.title}'.title(),
                    'fields': {},
                    'minzoom': layer.layer_settings_with_default('tiles', 'minzoom'),
                    'maxzoom': layer.layer_settings_with_default('tiles', 'maxzoom'),
                })
            for relation in layer.relations_as_origin.all():
                relation_layer = relation.destination
                data.append({
                    'id': f'relation-{slugify(layer.name)}-{slugify(relation.name)}',
                    'description': relation.name.title(),
                    'fields': {},
                    'minzoom': relation_layer.layer_settings_with_default('tiles', 'minzoom'),
                    'maxzoom': relation_layer.layer_settings_with_default('tiles', 'maxzoom'),
                })

        return data

    def get_tile_urls(self, tile_pattern):
        if app_settings.TERRA_TILES_HOSTNAMES:
            return [
                unquote(urljoin(hostname, tile_pattern))
                for hostname in app_settings.TERRA_TILES_HOSTNAMES
            ]
        else:
            return [
                unquote(urljoin(self.request.build_absolute_uri('/'), tile_pattern))
            ]

    def get_tilejson(self):
        minzoom = self.get_min_zoom()
        maxzoom = self.get_max_zoom()

        tile_pattern = self.get_tile_path()

        # https://github.com/mapbox/tilejson-spec/tree/3.0/3.0.0
        return {
            'tilejson': '3.0.0',
            'name': self.get_object().name,
            'tiles': self.get_tile_urls(tile_pattern),
            'minzoom': minzoom,
            'maxzoom': maxzoom,
            # bounds
            # center
            'attribution': self.get_attribution(),
            'description': self.get_description(),
            'vector_layers': self.get_vector_layers()
        }

    def get_last_update(self):
        features = Feature.objects.filter(layer__in=self.layers).order_by('-updated_at')

        if features.exists():
            ref_object = features.first()
        else:
            ref_object = self.layers.order_by('-updated_at').first()

        return ref_object.updated_at if ref_object else now()


class MultipleMVTViewMixin(MVTViewMixin):
    def get_tile_path(self):
        return self.get_tokenized_url(
            reverse("group-tiles-pattern", args=[self.get_object().slug])
        )

    @cached_property
    def layers(self):
        # Get the non authentified layers
        return self.get_object().layers.filter(
            Q(authorized_groups__isnull=True) |
            Q(authorized_groups__in=self.authenticated_groups)
        )