gfw-api/gfw-analysis-gee

View on GitHub
gfwanalysis/utils/geo.py

Summary

Maintainability
B
6 hrs
Test Coverage
F
40%
import asyncio
import functools as funct
import logging

import ee
import geocoder
from shapely.geometry import shape, GeometryCollection


def get_extent_fc(image, fc, method='reduce_regions', bestEffort=False, scale=False, numPixels=10000):
    """ Given an ee.featureCollection (fc) of features each with
    a single geometry calculate the total extent (area_m2) using
    a single band binary image. The single band binary image is
    multiplied by pixel area, ee.Reducer.sum is applied at the nominal scale
    of the image to each geometry in the fc, and the total area in meters squared
    is calculated by summing all feautes in the collection."""
    im = image.multiply(ee.Image.pixelArea())
    ns = image.projection().nominalScale()
    scale = ee.Algorithms.If(scale, scale, ns)
    bn = im.bandNames().get(0)

    def get_reduce_regions(fc):
        res = im.reduceRegions( \
            reducer=ee.Reducer.sum().unweighted(), \
            collection=ee.FeatureCollection(fc), \
            scale=scale, \
            tileScale=1) \
            .aggregate_sum('sum')
        return ee.FeatureCollection(ee.Feature(None, {'area_m2': res}))

    def get_reduce_map(f):
        res = im.reduceRegion( \
            reducer=ee.Reducer.sum().unweighted(), \
            geometry=f.geometry(), \
            scale=scale, \
            bestEffort=bestEffort, \
            maxPixels=1e8, \
            tileScale=1) \
            .get(bn)
        return ee.Feature(None, {'area_m2': res})

    def get_sample(f):
        fc = image.sample(
            region=f.geometry(), \
            numPixels=numPixels, \
            scale=scale, \
            tileScale=1, \
            dropNulls=False)
        tc = fc.filterMetadata(bn, 'equals', 1).aggregate_count(bn)
        tnc = fc.size()
        area = f.geometry().area(1)
        res = ee.Number(tc).divide(tnc).multiply(area)
        return ee.Feature(None, {'total_px': tnc, 'yes_px': tc, 'geom_area_m2': area, 'area_m2': res})

    out = ee.Dictionary({ \
        'reduce_regions': get_reduce_regions(fc),
        'reduce_region_map': ee.FeatureCollection(fc).map(get_reduce_map),
        'sample_region_map': ee.FeatureCollection(fc).map(get_sample)
    })
    return ee.Number(ee.FeatureCollection(out.get(method)).aggregate_sum('area_m2'))


def reduce_extent_fc(image, fc, bestEffort=False, scale=False):
    """ Given an ee.featureCollection (fc) of features each with
    a single geometry calculate the total extent (area_m2) using
    a single band binary image. The single band binary image is
    multiplied by pixel area, ee.Reducer.sum is applied at the nominal scale
    of the image to each geometry in the fc, and the total area in meters squared
    is calculated by summing all feautes in the collection."""
    im = image.multiply(ee.Image.pixelArea())
    ns = image.projection().nominalScale()
    scale = ee.Algorithms.If(scale, scale, ns)
    bn = im.bandNames().get(0)

    def get_reduce(f):
        res = im.reduceRegion( \
            reducer=ee.Reducer.sum().unweighted(), \
            geometry=f.geometry(), \
            scale=scale, \
            bestEffort=bestEffort, \
            maxPixels=1e12, \
            tileScale=4) \
            .get(bn)
        return ee.Feature(None, {'area_m2': res})

    fcc = ee.FeatureCollection(fc).map(get_reduce)
    return ee.Number(fcc.aggregate_sum('area_m2'))


def sample_extent_fc(image, fc, numPixels=5000, scale=False):
    """ Given an ee.featureCollection (fc) of features each with
    a single geometry calculate the total extent (area_m2) using
    a single band binary image. The single band binary image is
    sampled at the nominal scale of the image to each geometry in the fc
    , the proportion of 1s in the geometry is calculated, and the total area in meters squared
    is calculated by summing all feautes in the collection."""
    numPixels = ee.Number(numPixels)
    ns = image.projection().nominalScale()
    scale = ee.Algorithms.If(scale, scale, ns)
    bn = image.bandNames().get(0)

    def get_sample(f):
        fc = image.sample(
            region=f.geometry(), \
            numPixels=numPixels, \
            scale=scale, \
            tileScale=4, \
            dropNulls=True)
        tc = fc.filterMetadata(bn, 'equals', 1).aggregate_count(bn)
        area = f.geometry().area(1)
        res = ee.Number(tc).divide(numPixels).multiply(area)
        return ee.Feature(None, {'total_px': numPixels, 'yes_px': tc, 'geom_area_m2': area, 'area_m2': res})

    fcc = ee.FeatureCollection(fc).map(get_sample)
    return ee.Number(fcc.aggregate_sum('area_m2'))


def sample_statistics(image, geom, numPixels, scale):
    """ Sample within a region of a single band float image,
    returning a dictionary of statistics:
    accessed as ["values"]["<stat>"]
    Note this is an estimate"""
    bn = image.bandNames().getInfo()[0]
    fc = image.sample(region=geom, numPixels=numPixels, scale=scale, tileScale=16, dropNulls=True)
    return fc.aggregate_stats(bn)


def divide_geometry(feature):
    """ Divide feature geometry
    *
    * Split a features polygon geometry in 4 using its' center point
    * 
    * @param {ee.Feature} A feature with a polygon geometry to split.
    * 
    * @return {ee.FeatureCollection} Collection with 4 features per input feature.
    * 
    * @example aoi = Map.getBounds() fc = divideGeometry(aoi)
    """
    # Get coordinates of polygons bounds
    g = feature.geometry()
    l2 = g.bounds(1).coordinates().flatten()
    c2 = g.centroid(1).coordinates().flatten()
    # Create 4 cells dividing bounds at mid point
    t0 = ee.Feature(ee.Geometry.Polygon( \
        [l2.get(0), l2.get(1), \
         c2.get(0), l2.get(1), \
         c2.get(0), c2.get(1), \
         l2.get(0), c2.get(1), \
         l2.get(0), l2.get(1), \
         ]), {"name": "t0"})
    t1 = ee.Feature(ee.Geometry.Polygon( \
        [c2.get(0), l2.get(3), \
         l2.get(2), l2.get(3), \
         l2.get(2), c2.get(1), \
         c2.get(0), c2.get(1), \
         c2.get(0), l2.get(3), \
         ]), {"name": "t1"})
    t2 = ee.Feature(ee.Geometry.Polygon( \
        [c2.get(0), c2.get(1), \
         l2.get(4), c2.get(1), \
         l2.get(4), l2.get(5), \
         c2.get(0), l2.get(5), \
         c2.get(0), c2.get(1), \
         ]), {"name": "t2"})
    t3 = ee.Feature(ee.Geometry.Polygon( \
        [l2.get(0), c2.get(1), \
         c2.get(0), c2.get(1), \
         c2.get(0), l2.get(7), \
         l2.get(6), l2.get(7), \
         l2.get(0), c2.get(1), \
         ]), {"name": "t3"})
    # Make a featureCollection
    fc = ee.FeatureCollection([t0, t1, t2, t3, ])

    # Map intersection with feature and return cell name
    def tmp_f(f):
        return ee.Feature( \
            feature.geometry().intersection(f.geometry(), 1) \
            , {"name": f.get("name")})

    return ee.FeatureCollection(fc.map(tmp_f))


def ee_squaremeters_to_ha(value):
    """Converts square meters to hectares, and gives val to 2 decimal places"""
    tmp = ee.Number(value).divide(10000)
    tmp1 = ee.Number(ee.Number(tmp).format('%.2f'))
    return ee.Number.parse(tmp1)


def get_geocode(point):
    result = geocoder.osm(point, method='reverse', lang_code='en')
    if 'ERROR' in str(result): result = None
    return result


def loop_future(loop, f, a):
    return loop.run_in_executor(
        None,
        funct.partial(f, a),
    )


def buffer_geom(geom):
    buffer_size = geom.convexHull(1).bounds(1).area(1).sqrt().multiply(0.5)
    return geom.convexHull(1).buffer(buffer_size).bounds(1)


async def reverse_geocode_a_geostore(loop, shape):
    """ Take a shapely shape object and return geocoding results on the min/max coordinate locations"""
    w, s, e, n = shape.bounds
    bl = [s, w]
    br = [s, e]
    tl = [n, w]
    tr = [n, e]
    cd = list(shape.centroid.coords[0])[::-1]
    locs = [bl, tr, br, tl, cd]

    asyncio.set_event_loop(loop)

    futures = [loop_future(loop, get_geocode, loc) for loc in locs]
    for response in await asyncio.gather(*futures):
        # logging.info(f'Response: {response}')
        pass

    return [future.result() for future in futures]


def check_equivence(item1, item2):
    """Check to see if the two items are equal and neither is equal to None"""
    if item1 is None or item2 is None:
        return None
    else:
        return item1 == item2


def get_clip_vertex_list(geojson):
    """
    Take a geojson object and return a list of geometry vertices that ee can use as an argument to get thumbs
    """
    tmp_poly = []
    s = GeometryCollection([shape(feature["geometry"]).buffer(0) for feature in geojson.get('features')])
    simple = s[0].simplify(tolerance=0.01, preserve_topology=True)
    try:
        for x, y in zip(simple.exterior.coords.xy[0], simple.exterior.coords.xy[1]):
            tmp_poly.append([x, y])
    except:
        for x, y in zip(simple[0].exterior.coords.xy[0], simple[0].exterior.coords.xy[1]):
            tmp_poly.append([x, y])
    return tmp_poly


def human_format(num):
    magnitude = 0
    while abs(num) >= 1000:
        magnitude += 1
        num /= 1000.0
    # add more suffixes if you need them
    return '%.2f%s' % (num, ['', 'k', 'M', 'G', 'T', 'P'][magnitude])


def get_region(geom):
    """Take a valid geojson object, iterate over all features in that object.
        Build up a list of EE Polygons, and finally return an EE Feature
        collection. New as of 19th Sep 2017 (needed to fix a bug where the old
        function ignored multipolys)
    """
    polygons = []
    for feature in geom.get('features'):
        shape_type = feature.get('geometry').get('type')
        coordinates = feature.get('geometry').get('coordinates')
        if shape_type == 'MultiPolygon':
            polygons.append(ee.Geometry.MultiPolygon(coordinates))
        elif shape_type == 'Polygon':
            polygons.append(ee.Geometry.Polygon(coordinates))
        else:
            pass
    return ee.FeatureCollection(polygons)


def squaremeters_to_ha(value):
    """Converts square meters to hectares, and gives val to 2 decimal places"""
    tmp = value / 10000.
    return float('{0:4.2f}'.format(tmp))


def get_thresh_image(thresh, asset_id):
    """Renames image bands using supplied threshold and returns image."""
    image = ee.Image(asset_id)

    # Select out the gain band if it exists
    if 'gain' in asset_id:
        before = image.select('.*_' + thresh, 'gain').bandNames()
    else:
        before = image.select('.*_' + thresh).bandNames()

    after = before.map(
        lambda x: ee.String(x).replace('_.*', ''))

    image = image.select(before, after)
    return image


def dict_unit_transform(data, num):
    dasy = {}
    for key in data:
        dasy[key] = data[key] * num

    return dasy


def indicator_selector(row, indicator, begin, end):
    """Return Tons of biomass loss."""
    dasy = {}
    if indicator == 4:
        return row[2]['value']

    for i in range(len(row)):
        if row[i]['indicator_id'] == indicator and row[i]['year'] >= int(begin) and row[i]['year'] <= int(end):
            dasy[str(row[i]['year'])] = row[i]['value']

    return dasy


def dates_selector(data, begin, end):
    """Return Tons of biomass loss."""
    dasy = {}
    for key in data:
        if int(key) >= int(begin) and int(key) <= int(end):
            dasy[key] = data[key]

    return dasy


def sum_range(data, begin, end):
    return sum([data[key] for key in data if (int(key) >= int(begin)) and (int(key) < int(end))])


def admin_0_simplify(iso):
    """Check admin areas and return a relevant simplification or None"""
    # logging.info(f'[admin_0_simplify]: passed {iso}')
    admin_0_dic = {'ATA': 0.3,
                   'RUS': 0.3,
                   'CAN': 0.3,
                   'GRL': 0.3,
                   'USA': 0.3,
                   'CHN': 0.3,
                   'AUS': 0.1,
                   'BRA': 0.1,
                   'KAZ': 0.1,
                   'ARG': 0.1,
                   'IND': 0.1,
                   'MNG': 0.1,
                   'DZA': 0.1,
                   'MEX': 0.1,
                   'COD': 0.1,
                   'SAU': 0.1,
                   'IRN': 0.1,
                   'SWE': 0.1,
                   'LBY': 0.1,
                   'SDN': 0.1,
                   'IDN': 0.1,
                   'FIN': 0.01,
                   'NOR': 0.01,
                   'SJM': 0.01,
                   'ZAF': 0.01,
                   'UKR': 0.01,
                   'MLI': 0.01,
                   'TCD': 0.01,
                   'PER': 0.01,
                   'AGO': 0.01,
                   'NER': 0.01,
                   'CHL': 0.01,
                   'TUR': 0.01,
                   'EGY': 0.01,
                   'MRT': 0.01,
                   'BOL': 0.01,
                   'PAK': 0.01,
                   'ETH': 0.01,
                   'FRA': 0.01,
                   'COL': 0.01}
    simplification = admin_0_dic.get(iso, None)
    return simplification


def admin_1_simplify(iso, admin1):
    # logging.info(f'[admin_1_simplify]: passed {iso}/{admin1}')
    admin_1_dic = {'RUS': {60: 0.3,
                           35: 0.3,
                           12: 0.1,
                           80: 0.1,
                           18: 0.1,
                           28: 0.1,
                           30: 0.1,
                           4: 0.1,
                           40: 0.1,
                           32: 0.1,
                           24: 0.1,
                           83: 0.1,
                           3: 0.01,
                           69: 0.01,
                           9: 0.01,
                           46: 0.01,
                           26: 0.01,
                           45: 0.01,
                           66: 0.01,
                           55: 0.01,
                           50: 0.01},
                   'CAN': {8: 0.3,
                           6: 0.3,
                           11: 0.3,
                           9: 0.1,
                           2: 0.1,
                           1: 0.1,
                           3: 0.1,
                           12: 0.1,
                           13: 0.1,
                           5: 0.1},
                   'GRL': {2: 0.3, 3: 0.3, 5: 0.1},
                   'USA': {2: 0.3,
                           44: 0.1,
                           27: 0.01,
                           5: 0.01,
                           32: 0.01,
                           29: 0.01,
                           3: 0.01,
                           23: 0.01,
                           38: 0.01,
                           6: 0.01,
                           51: 0.01,
                           24: 0.01,
                           13: 0.01},
                   'AUS': {11: 0.3, 7: 0.3, 6: 0.1, 8: 0.1, 5: 0.1},
                   'CHN': {28: 0.3,
                           19: 0.1,
                           29: 0.1,
                           21: 0.1,
                           11: 0.1,
                           26: 0.01,
                           5: 0.01,
                           30: 0.01},
                   'BRA': {4: 0.1,
                           14: 0.1,
                           12: 0.1,
                           13: 0.01,
                           5: 0.01,
                           11: 0.01,
                           9: 0.01,
                           10: 0.01,
                           21: 0.01},
                   'NER': {1: 0.1},
                   'DZA': {41: 0.01, 1: 0.01, 22: 0.01},
                   'KAZ': {9: 0.01, 3: 0.01, 5: 0.01, 11: 0.01, 10: 0.01, 1: 0.01},
                   'SAU': {8: 0.01, 7: 0.01},
                   'MLI': {9: 0.01},
                   'LBY': {6: 0.01},
                   'EGY': {14: 0.01},
                   'ZAF': {8: 0.01},
                   'PAK': {2: 0.01},
                   'SDN': {10: 0.01, 8: 0.01},
                   'IND': {29: 0.01, 19: 0.01, 20: 0.01},
                   'ARG': {1: 0.01, 20: 0.01, 4: 0.01},
                   'PER': {17: 0.01},
                   'BOL': {8: 0.01},
                   'ETH': {8: 0.01, 9: 0.01},
                   'IDN': {23: 0.01},
                   'SJM': {2: 0.01}}
    try:
        admin1 = int(admin1)
    except:
        admin1 = -999
    simplification = None
    if admin_1_dic.get(iso):
        simplification = admin_1_dic.get(iso, None).get(admin1, None)
        logging.info(f'[admin_1_simplify]: {simplification}')
    return simplification