gfw-api/gfw-analysis-gee

View on GitHub
gfwanalysis/middleware.py

Summary

Maintainability
D
2 days
Test Coverage
F
19%
"""MIDDLEWARE"""

import json
import logging
from functools import wraps

from flask import request, redirect

from gfwanalysis.errors import GeostoreNotFound
from gfwanalysis.routes.api import error
from gfwanalysis.services.analysis.landsat_tiles_v2 import RedisService
from gfwanalysis.services.area_service import AreaService
from gfwanalysis.services.geostore_service import GeostoreService


def exist_tile(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        url = RedisService.get(request.path)
        if url is None:
            return func(*args, **kwargs)
        else:
            return redirect(url)

    return wrapper


def exist_mapid(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        year = kwargs['year']
        kwargs["map_object"] = RedisService.check_year_mapid(year)
        return func(*args, **kwargs)

    return wrapper


def get_classification_params(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            img_id = request.args.get('img_id')
            if not img_id:
                return error(status=400,
                             detail='An img_id string is needed (from Landsat-8 or Sentinel-2 collections).')
        kwargs["img_id"] = img_id
        return func(*args, **kwargs)

    return wrapper


def get_sentinel_params(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            lat = request.args.get('lat')
            lon = request.args.get('lon')
            start = request.args.get('start')
            end = request.args.get('end')
            if not lat or not lon or not start or not end:
                return error(status=400, detail='Some parameters are needed')
        kwargs["lat"] = lat
        kwargs["lon"] = lon
        kwargs["start"] = start
        kwargs["end"] = end
        return func(*args, **kwargs)

    return wrapper


def get_highres_params(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            lat = request.args.get('lat')
            lon = request.args.get('lon')
            start = request.args.get('start')
            end = request.args.get('end')
            if not lat or not lon or not start or not end:
                return error(status=400, detail='Some parameters are needed')
        kwargs["lat"] = lat
        kwargs["lon"] = lon
        kwargs["start"] = start
        kwargs["end"] = end
        return func(*args, **kwargs)

    return wrapper


def get_sentinel_mosaic_params(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            start = request.args.get('start', None)
            end = request.args.get('end', None)
            cloudscore_thresh = request.args.get('cloudscore_thresh', 10)
            bounds = request.args.get('bounds', False)
        kwargs["start"] = start
        kwargs["end"] = end
        kwargs["cloudscore_thresh"] = cloudscore_thresh
        kwargs["bounds"] = bounds
        return func(*args, **kwargs)

    return wrapper


def get_recent_params(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            lat = request.args.get('lat')
            lon = request.args.get('lon')
            start = request.args.get('start')
            end = request.args.get('end')
            sort_by = request.args.get('sort_by', None)
            bmin = request.args.get('min', None)
            bmax = request.args.get('max', None)
            opacity = request.args.get('opacity', 1.0)
            bands = request.args.get('bands', None)
            if not lat or not lon or not start or not end:
                return error(status=400, detail='[RECENT] Parameters: (lat, lon. start, end) are needed')
        kwargs["lat"] = lat
        kwargs["lon"] = lon
        kwargs["start"] = start
        kwargs["end"] = end
        kwargs["sort_by"] = sort_by
        kwargs["bmin"] = bmin
        kwargs["bmax"] = bmax
        kwargs["opacity"] = float(opacity)
        kwargs["bands"] = bands
        return func(*args, **kwargs)

    return wrapper


def get_recent_tiles(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'POST':
            data_array = request.get_json().get('source_data', [])
            bands = request.args.get('bands', None)
            if not bands:
                bands = request.get_json().get('bands', None)
            bmin = request.args.get('min', None)
            bmax = request.args.get('max', None)
            opacity = request.args.get('opacity', 1.0)
            if not data_array:
                return error(status=400, detail='[TILES] Some parameters are needed')
        kwargs["data_array"] = data_array
        kwargs["bands"] = bands
        kwargs["data_array"] = data_array
        kwargs["bands"] = bands
        kwargs["bmin"] = bmin
        kwargs["bmax"] = bmax
        kwargs["opacity"] = float(opacity)
        return func(*args, **kwargs)

    return wrapper


def get_recent_thumbs(func):
    @wraps(func)
    def wrapper(*args, **kwargs):

        if request.method == 'POST':
            data_array = request.get_json().get('source_data', [])
            bands = request.args.get('bands', None)
            if not bands:
                bands = request.get_json().get('bands', None)
            bmin = request.args.get('min', None)
            bmax = request.args.get('max', None)
            opacity = request.args.get('opacity', 1.0)
            if not data_array:
                return error(status=400, detail='[THUMBS] Some parameters are needed')
        kwargs["data_array"] = data_array
        kwargs["bands"] = bands
        kwargs["bmin"] = bmin
        kwargs["bmax"] = bmax
        kwargs["opacity"] = float(opacity)
        return func(*args, **kwargs)

    return wrapper


def get_mc_info(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'POST':
            data_array = request.get_json().get('timeseries')
            window = request.args.get('window', None)
            if window:
                window = int(window)
            mc_number = request.args.get('mc_number', None)
            if mc_number:
                mc_number = int(mc_number)
            bin_number = request.args.get('bin_number', None)
            if bin_number:
                bin_number = int(bin_number)
            if not data_array:
                return error(status=400, detail='[MC] Timeseries is required')
        kwargs["timeseries"] = data_array
        kwargs["window"] = window
        kwargs["mc_number"] = mc_number
        kwargs["bin_number"] = bin_number
        return func(*args, **kwargs)

    return wrapper


def get_geo_by_hash(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            geostore = request.args.get('geostore')
            if not geostore:
                return error(status=400, detail='Geostore is required')
            try:
                geojson, area_ha = GeostoreService.get(geostore, request.headers.get("x-api-key"))
            except GeostoreNotFound:
                return error(status=404, detail='Geostore not found')
        elif request.method == 'POST':
            geojson = request.get_json().get('geojson', None) if request.get_json() else None
            try:
                area_ha = AreaService.tabulate_area(geojson)
            except Exception as e:
                logging.info(f"[middleware geo hash] Exception")
                return error(status=500, detail=str(e))

        kwargs["geojson"] = geojson
        kwargs["area_ha"] = area_ha
        return func(*args, **kwargs)

    return wrapper


def get_composite_params(func):
    """Get instrument"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method in ['GET', 'POST']:
            instrument = request.args.get('instrument', False)
            if not instrument:
                instrument = 'landsat'
            date_range = request.args.get('date_range', False)
            if not date_range:
                date_range = ""
            thumb_size = request.args.get('thumb_size', False)
            if not thumb_size:
                thumb_size = [500, 500]
            else:
                thumb_size = [int(size.strip()) for size in thumb_size.replace('[', '').replace(']', '').split(',')]
            classify = request.args.get('classify', False)
            if classify and classify.lower() == 'true':
                classify = True
            else:
                classify = False
            band_viz = request.args.get('band_viz', False)
            if not band_viz:
                band_viz = {'bands': ['B4', 'B3', 'B2'], 'min': 0, 'max': 0.4}
            else:
                band_viz = json.loads(band_viz)
            get_dem = request.args.get('get_dem', False)
            if get_dem and get_dem.lower() == 'true':
                get_dem = True
            else:
                get_dem = False
            get_stats = request.args.get('get_stats', False)
            if get_stats and get_stats.lower() == 'true':
                get_stats = True
            else:
                get_stats = False
            show_bounds = request.args.get('show_bounds', False)
            if show_bounds and show_bounds.lower() == 'true':
                show_bounds = True
            else:
                show_bounds = False
            cloudscore_thresh = request.args.get('cloudscore_thresh', False)
            if not cloudscore_thresh:
                cloudscore_thresh = 5
            else:
                cloudscore_thresh = int(cloudscore_thresh)
        kwargs['get_stats'] = get_stats
        kwargs['get_dem'] = get_dem
        kwargs['classify'] = classify
        kwargs['thumb_size'] = thumb_size
        kwargs['date_range'] = date_range
        kwargs['instrument'] = instrument
        kwargs['band_viz'] = band_viz
        kwargs['show_bounds'] = show_bounds
        kwargs['cloudscore_thresh'] = cloudscore_thresh
        return func(*args, **kwargs)

    return wrapper


def get_geo_by_geom(func):
    """Get geometry data"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            # logging.info(f'[decorator - geom]: Getting area by GET {request}')
            geojson = request.args.get('geojson')
            if not geojson:
                return error(status=400, detail='geojson is required')
        elif request.method == 'POST':
            geojson = request.get_json().get('geojson', None) if request.get_json() else None
        try:
            area_ha = AreaService.tabulate_area(geojson)
        except Exception as e:
            return error(status=500, detail=str(e))
        kwargs["geojson"] = geojson
        kwargs['area_ha'] = area_ha
        return func(*args, **kwargs)

    return wrapper


def get_geo_by_national(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            try:
                iso = request.view_args.get('iso')
                geojson, area_ha = GeostoreService.get_national(iso, request.headers.get("x-api-key"))
            except GeostoreNotFound:
                return error(status=404, detail='Geostore not found')
            kwargs["geojson"] = geojson
            kwargs["area_ha"] = area_ha
        return func(*args, **kwargs)

    return wrapper


def get_geo_by_subnational(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            try:
                iso = request.view_args.get('iso')
                id1 = request.view_args.get('id1')
                geojson, area_ha = GeostoreService.get_subnational(iso, id1, request.headers.get("x-api-key"))
            except GeostoreNotFound:
                return error(status=404, detail='Geostore not found')
            kwargs["geojson"] = geojson
            kwargs["area_ha"] = area_ha
        return func(*args, **kwargs)

    return wrapper


def get_geo_by_regional(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            try:
                iso = request.view_args.get('iso')
                id1 = request.view_args.get('id1')
                id2 = request.view_args.get('id2')
                geojson, area_ha = GeostoreService.get_regional(iso, id1, id2, request.headers.get("x-api-key"))
            except GeostoreNotFound:
                return error(status=404, detail='Geostore not found')
            kwargs["geojson"] = geojson
            kwargs["area_ha"] = area_ha
        return func(*args, **kwargs)

    return wrapper


def get_geo_by_use(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            try:
                name = request.view_args.get('name')
                use_id = request.view_args.get('id')
                geojson, area_ha = GeostoreService.get_use(name, use_id, request.headers.get("x-api-key"))
            except GeostoreNotFound:
                return error(status=404, detail='Geostore not found')
            kwargs["geojson"] = geojson
            kwargs["area_ha"] = area_ha
        return func(*args, **kwargs)

    return wrapper


def get_geo_by_wdpa(func):
    """Get geodata"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if request.method == 'GET':
            try:
                wdpa_id = request.view_args.get('id')
                geojson, area_ha = GeostoreService.get_wdpa(wdpa_id, request.headers.get("x-api-key"))
            except GeostoreNotFound:
                return error(status=404, detail='Geostore not found')
            kwargs["geojson"] = geojson
            kwargs["area_ha"] = area_ha
        return func(*args, **kwargs)

    return wrapper