resource-watch/aqueduct-analysis-microservice

aqueduct/services/cba_defaults_service.py

Summary: Maintainability A (25 mins) · Test Coverage F (20%)

import datetime
import logging
import os

import numpy as np
import pandas as pd
import sqlalchemy
from flask import json
from sqlalchemy import Column, Integer, Text, DateTime
from sqlalchemy.dialects.postgresql import JSON

from aqueduct.errors import Error


class CBADef(object):
    def __init__(self, user_selections):
        ### DB connection
        self.engine = sqlalchemy.create_engine(os.getenv('POSTGRES_URL'))
        self.metadata = sqlalchemy.MetaData(bind=self.engine)
        self.metadata.reflect(self.engine)
        ### BACKGROUND INFO
        # self.flood = "Riverine"
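        # Each entry maps a scenario name to [climate scenario (RCP), socioeconomic pathway (SSP),
        # abbreviation]; the abbreviation is used to build column names such as
        # 'urban_damage_v2_2010_{scen_abb}_prot_avg' further down.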
        self.scenarios = {"business as usual": ['rcp8p5', 'ssp2', "bau"],
                          "pessimistic": ['rcp8p5', 'ssp3', "pes"],
                          "optimistic": ['rcp4p5', 'ssp2', "opt"],
                          "rcp8p5": ['rcp8p5', 'ssp3', "pes"],
                          "rcp4p5": ['rcp8p5', 'ssp2', "bau"]}
        ### USER INPUTS
        self.geogunit_unique_name = user_selections.get("geogunit_unique_name")
        self.scenario = self.scenarios.get(user_selections.get("scenario"))
        self.flood = user_selections.get("flood")  # Flood type
        self.sub_scenario = user_selections.get("sub_scenario")

    # @cached_property
    def default(self):
        fids, geogunit_name, geogunit_type = pd.read_sql_query(
            "SELECT fids, name, type FROM lookup_master where uniqueName = '{0}' ".format(self.geogunit_unique_name),
            self.engine).values[0]
        clim, socio, scen_abb = self.scenario
        rps = np.array([2, 5, 10, 25, 50, 100, 250, 500, 1000])
        ##prot
        sub_abb = "wtsub" if self.sub_scenario else "nosub"

        # DEFAULT DATA
        read_prot = 'precalc_agg_{0}_{1}_{2}'.format(self.flood, geogunit_type.lower(), sub_abb)
        col_prot = 'urban_damage_v2_2010_{0}_prot_avg'.format(scen_abb)
        df_prot = pd.read_sql_query(
            "SELECT {0} FROM {1} where id like '{2}'".format(col_prot, read_prot, geogunit_name),
            self.engine)
        prot_val = 0 if df_prot.empty else int(df_prot.values[0].tolist()[0])
        prot_val = 1000 if geogunit_name in ['Noord-Brabant, Netherlands', 'Zeeland, Netherlands',
                                             'Zeeuwse meren, Netherlands', 'Zuid-Holland, Netherlands',
                                             'Drenthe, Netherlands', 'Flevoland, Netherlands',
                                             'Friesland, Netherlands', 'Gelderland, Netherlands',
                                             'Groningen, Netherlands', 'IJsselmeer, Netherlands',
                                             'Limburg, Netherlands', 'Noord-Holland, Netherlands',
                                             'Overijssel, Netherlands', 'Utrecht, Netherlands',
                                             'Netherlands'] else prot_val
        ##costs
        con_itl = pd.read_sql_query(
            "SELECT avg(construction_cost_index*7) FROM lookup_construction_factors_geogunit_108 where fid_aque in ({0}) ".format(
                ', '.join(map(str, fids))), self.engine)
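        # Round the existing protection level up to the nearest standard return period
        # (e.g. a protection level of 60 rounds up to the 100-year return period).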
        prot_round = int(rps[np.where(rps >= prot_val)][0])
        logging.debug(f'[CBADef, default]: {df_prot}')
        return [{
            "existing_prot": prot_val,
            "existing_prot_r": prot_round,
            "prot_fut": prot_round,
            "estimated_costs": 0 if con_itl.empty else con_itl.values[0].tolist()[0]

        }]


class CBADefaultService(object):
    """
    this will have the next methods:
        * create cache table (only if the table doesn't exist)
        * check if a certain set of parameters exists on the table, if exists it will retrive cbaService data from the row selected
        * if not it will trigger the CBAService class to calculate it.
    """

    ### DB connection
    def __init__(self, params):
        logging.info('[CBADCache, __init__]: creating db engine')
        self.engine = sqlalchemy.create_engine(os.getenv('POSTGRES_URL'))
        logging.info('[CBADCache, __init__]: db engine created, loading metadata')
        self.metadata = sqlalchemy.MetaData(bind=self.engine, reflect=True)
        logging.info('[CBADCache, __init__]: db engine metadata loaded')
        # self.metadata.reflect(self.engine)
        self.params = params

    @property
    def _generateKey(self):
        return '_'.join([str(value) for (key, value) in sorted(self.params.items())])
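        # Example (hypothetical params): {"flood": "riverine", "geogunit_unique_name": "Netherlands",
        # "scenario": "business as usual", "sub_scenario": False} yields the key
        # "riverine_Netherlands_business as usual_False" (values joined in key-sorted order).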

    def _createTable(self):
        """
        where key is a composition of the user selected params: 
        "geogunit_unique_name"_"existing_prot"_"scenario"_"prot_fut"_"implementation_start_"implementation_end"_"infrastructure_life"_"benefits_start"_"ref_year"_"estimated_costs"_"discount_rate"_"om_costs"_"user_urb_cost"_"user_rur_cost"
        """
        try:
            logging.info(f'[CBADCache, _createTable]: {datetime.datetime.now()}')
            myCache = sqlalchemy.Table("cache_d_cba", self.metadata,
                                       Column('id', Integer, primary_key=True, unique=True),
                                       Column('key', Text, unique=True, index=True),
                                       Column('value', JSON),
                                       Column('last_updated', DateTime, default=datetime.datetime.now,
                                              onupdate=datetime.datetime.now)
                                       )
            myCache.create()
        except Exception as e:
            logging.error('[CBADCache, _createTable]: ' + str(e))
            raise Error(str(e))
        return myCache

    def checkParams(self):
        try:
            table = self.metadata.tables['cache_d_cba']
            logging.info('[CBADCache, checkParams]: check params...')
            # logging.info(self._generateKey)
            select_st = table.select().where(table.c.key == self._generateKey)
            res = self.engine.connect().execute(select_st).fetchone()
            logging.info(res)
            return res
        except Exception as e:
            logging.error('[CBADCache, checkParams]: ' + str(e))
            raise Error(str(e))

    def insertRecord(self, key, data):
        # insert data via insert() construct
        try:
            table = self.metadata.tables['cache_d_cba']
            ins = table.insert().values(
                key=key,
                value=data)
            conn = self.engine.connect()
            conn.execute(ins)

            return 200

        except Exception as e:
            logging.error('[CBADCache, insertRecord]: ' + str(e))
            raise Error(str(e))

    def updateRecord(self):
        """
        TODO:
        """
        return 0

    def cleanCache(self):
        try:
            table_drop = self.metadata.tables['cache_d_cba'].delete()
            conn = self.engine.connect()
            conn.execute(table_drop)
            return 200
        except Exception as e:
            logging.error('[CBADCache, cleanCache]: ' + str(e))
            raise Error(message='Cache table cleanup has failed. \n' + str(e))

    def execute(self):
        try:
            inspector = sqlalchemy.inspect(self.engine)
            logging.info('[CBADCache]: Getting cba default...')
            if 'cache_d_cba' in inspector.get_table_names():
                # It means we have the cache table, we will need to check the params
                logging.info('[CBADCache]: table_exists')
                checks = self.checkParams()
                if checks is not None:
                    logging.info('[CBADCache]: table available; extracting data')
                    data = json.loads(checks[2])
                    logging.info(data.keys())
                    return data['data']  # we will give back the data in a way CBAEndService can use it

                else:  # we will execute the whole process and we will generate the output in a way  CBAEndService can use it
                    logging.info('[CBADCache]: data not available; generating data')
                    data_output = CBADef(self.params).default()
                    data = json.dumps({'data': data_output}, ignore_nan=True)
                    key = self._generateKey
                    self.insertRecord(key, data)
                    return data_output  # we will give back the data in a way CBAEndService can use it
            else:
                # Create the cache table first, then re-run execute(): that triggers the CBA
                # code, inserts the result into the cache, and returns the data.
                self._createTable()
                return self.execute()
        except Exception as e:
            logging.error('[CBADCache, execute]: ' + str(e))
            raise Error(str(e))
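

# --- Usage sketch (illustrative only, not part of the service) ---------------
# A minimal example of how this module might be exercised locally. It assumes
# POSTGRES_URL points at a database that already contains the lookup_master,
# precalc_agg_* and lookup_construction_factors_geogunit_108 tables; the
# parameter values below are hypothetical.
if __name__ == "__main__":
    example_params = {
        "geogunit_unique_name": "Netherlands",   # hypothetical geography unit
        "scenario": "business as usual",         # key into CBADef.scenarios
        "flood": "riverine",                     # flood type used in the precalc table name
        "sub_scenario": False,                   # True -> 'wtsub', False -> 'nosub'
    }
    print(CBADefaultService(example_params).execute())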