resource-watch/aqueduct-analysis-microservice

View on GitHub
aqueduct/services/cba_service.py

Summary

Maintainability
D
2 days
Test Coverage
F
11%
import datetime
import logging
import os
import sys, traceback


import numpy as np
import pandas as pd
import sqlalchemy
from flask import json
from scipy.interpolate import interp1d
from sqlalchemy import Column, Integer, Text, DateTime
from sqlalchemy.dialects.postgresql import JSON

from aqueduct.errors import Error


class CBAService(object):
    def __init__(self, user_selections):
        ### DBConexion
        self.engine = sqlalchemy.create_engine(os.getenv('POSTGRES_URL'))
        self.metadata = sqlalchemy.MetaData(bind=self.engine)
        self.metadata.reflect(self.engine)
        ### BACKGROUND INTO
        # self.flood = "Riverine"
        self.exposures = ["gdpexp", "popexp", "urban_damage_v2"]
        self.geogunit = "geogunit_108"
        self.scenarios = {"business as usual": ['rcp8p5', 'ssp2', "bau"],
                          "pessimistic": ['rcp8p5', 'ssp3', "pes"],
                          "optimistic": ['rcp4p5', 'ssp2', "opt"],
                          "rcp8p5": ['rcp8p5', 'ssp3', "pes"],
                          "rcp4p5": ['rcp8p5', 'ssp2', "bau"]}
        self.sub_abb = "nosub"
        self.mods = ["gf", "ha", "ip", "mi", "nr"]
        self.years = [2010., 2030., 2050., 2080.]
        self.ys = [str(x)[0:4] for x in self.years]
        self.rps = [2, 5, 10, 25, 50, 100, 250, 500, 1000]
        self.rps_names = ["rp" + str(x).zfill(5) for x in self.rps]
        self.cba_types = ['pop_costs', "gdp_costs", "urb_benefits", "pop_benefits", "gdp_benefits", "prot_present",
                          "prot_future"]
        self.inAGGFormat = 'raw_agg_riverine_{:s}_{:s}'.format
        self.inRAWFormat = 'raw_riverine_{:s}_{:s}'.format
        ###  USER INPUTS
        self.geogunit_unique_name = user_selections.get("geogunit_unique_name")
        self.existing_prot = user_selections.get("existing_prot")
        self.scenario = user_selections.get("scenario")
        self.prot_futu = user_selections.get("prot_fut")
        self.implementation_start = user_selections.get("implementation_start")
        self.implementation_end = user_selections.get("implementation_end")
        self.infrastructure_life = user_selections.get("infrastructure_life")
        self.benefits_start = user_selections.get("benefits_start")
        self.ref_year = user_selections.get("ref_year")
        self.estimated_costs = user_selections.get("estimated_costs")
        self.discount_rate = user_selections.get("discount_rate")
        self.om_costs = user_selections.get("om_costs")
        self.user_urb_cost = user_selections.get("user_urb_cost")
        self.user_rur_cost = user_selections.get("user_rur_cost")
        self.cost_option = "geogunit_108"

        # DEFAULT VARIABLES
        self.geogunit_name, self.geogunit_type, self.fids, self.clim, self.socio, self.scen_abb, self.prot_pres, rpend, \
        self.build_start_end, self.year_range, self.benefit_increase, self.prot_idx_fut, self.risk_analysis, self.df_prot, self.prot_fut = self.user_selections()
        logging.info(self.prot_fut)
        # Define the time series of the analysis
        self.time_series = np.arange(self.year_range[0], self.year_range[1] + 1)
        self.year_array = np.arange(len(self.time_series)) + 1.
        self.costFormat = 'lookup_cost_urban_{:s}_{:s}_{:s}'.format
        # READ IN COST DATA
        self.df_urb_all = self.costFormat(self.scen_abb, str(self.ref_year), self.cost_option)
        # Raw data aggregated to unit type
        self.df_urb_agg = self.inAGGFormat(self.geogunit_type.lower(), "urban_damage_v2")
        # Raw
        self.df_pop = self.inRAWFormat(self.geogunit, "popexp")
        self.df_gdp = self.inRAWFormat(self.geogunit, "gdpexp")
        self.df_urb = self.inRAWFormat(self.geogunit, "urban_damage_v2")
        self.geogunit = "geogunit_103" if self.geogunit_type.lower() == "city" else "geogunit_108"
        self.filt_risk = pd.read_sql_query(
            "SELECT * FROM Precalc_Riverine_{0}_nosub where id in ({1})".format(self.geogunit,
                                                                                ', '.join(map(str, self.fids))),
            self.engine)
        self.estimated_costs = None

    ##---------------------------------------------------
    ### FUNCTIONS FOR BOTH RISK AND CBA TABS          ###
    ##---------------------------------------------------
    def user_selections(self):
        """
        Purpose: Gather all neccesary inputs to run any analysis

        Output:
            geogunit_name: original (ie non-unique) name
            geogunit_type: City, State, Country, Basin
            clim: rcp4p5, rcp8p4 (climate scenario associated with overall scenario)
            socio: base, ssp2, ssp3 (socioeconomic scenario associated with overall scenario)
            prot_pres: default protection standard for unit as a whole
            df_precalc: precalculated impact data
            risk_analysis - can we use precalculated risk data, or do we need to calculate on-the-fly?
        """
        #logging.debug('[CBA, user_selections]: start')
        # GEOGUNIT INFO

        fids, geogunit_name, geogunit_type = pd.read_sql_query(
            "SELECT fids, name, type FROM lookup_master where uniqueName = '{0}' ".format(self.geogunit_unique_name),
            self.engine).values[0]
        logging.info(geogunit_name)
        logging.info(self.geogunit_unique_name)
        # IMPACT DRIVER INFO (climate and socioeconomc scenarios)
        clim, socio, scen_abb = self.scenarios.get(self.scenario)

        read_prot = 'precalc_agg_riverine_{0}_nosub'.format(geogunit_type).lower()

        df_prot = pd.read_sql_query("SELECT id, {0} FROM {1}".format(
            ', '.join([col for col in sqlalchemy.Table(read_prot, self.metadata).columns.keys() if ("prot" in col)]),
            read_prot), self.engine, index_col='id')

        # PROTECTION STANDARDS and RISK ANALYSIS TYPE
        if self.existing_prot == None:
            risk_analysis = "precalc"
            # Hardwire in the protection standards for the Netherlands
            if geogunit_name in ['Noord-Brabant, Netherlands', 'Zeeland, Netherlands', 'Zeeuwse meren, Netherlands',
                                 'Zuid-Holland, Netherlands', 'Drenthe, Netherlands', 'Flevoland, Netherlands',
                                 'Friesland, Netherlands', 'Gelderland, Netherlands', 'Groningen, Netherlands',
                                 'IJsselmeer, Netherlands', 'Limburg, Netherlands', 'Noord-Holland, Netherlands',
                                 'Overijssel, Netherlands', 'Utrecht, Netherlands', "Netherlands"]:
                prot_pres = 1000
                logging.info('!!!!!!!!!!!!!!!!!')
            else:
                # Average prot standard for a whole unit (i.e. country)
                prot_name = "_".join(["Urban_Damage_v2", '2010', scen_abb, "PROT_avg"]).lower()
                prot_pres = df_prot.loc[geogunit_name, prot_name]
        else:
            risk_analysis = "calc"
            prot_pres = self.existing_prot

        if self.prot_futu == None:
            logging.info(prot_pres)
            prot_fut = min([x for x in self.rps if x >= prot_pres])
        else:
            prot_fut = self.prot_futu

        # prot_start_unit = min(rps, key=lambda x:abs(x-prot_pres))
        build_start_end = (self.implementation_start, self.implementation_end)
        year_range = (self.implementation_start, self.implementation_start + self.infrastructure_life)
        benefit_increase = (self.benefits_start, self.implementation_end)

        prot_idx_fut = self.years.index(self.ref_year)

        # Define the desired protection standard
        rpend = "endrp" + str(prot_fut).zfill(5)
        return geogunit_name, geogunit_type, fids, clim, socio, scen_abb, prot_pres, rpend, \
               build_start_end, year_range, benefit_increase, prot_idx_fut, risk_analysis, df_prot, prot_fut

    def run_stats(self, dataframe):
        """
        select col_min, col_max, col_avg from table
        """
        #logging.debug('[CBA, run_stats]: start')
        df_stats = pd.DataFrame(index=dataframe.index)
        for t in self.cba_types:
            df_filt = dataframe[[col for col in dataframe.columns if (t in col.lower())]]
            df_stats[t + "_avg"] = df_filt.mean(axis=1)
            df_stats[t + "_min"] = df_filt.min(axis=1)
            df_stats[t + "_max"] = df_filt.max(axis=1)
        return df_stats

    def compute_costs(self, m, input_total_cost, exposure):

        """
           Output:
            time series of costs without any discount rate included
        """
        #logging.debug('[CBA, compute_costs]: start')
        time_series = np.arange(self.year_range[0], self.year_range[1] + 1)  # list of years until horizon
        build_years = np.arange(
            self.build_start_end[1] - self.build_start_end[0]) + 1.  # list of build years (starting at 1)
        cum_costs_present = np.cumsum(float(input_total_cost) * np.ones(len(build_years)) / len(
            build_years))  # cumulative building costs until end of building period
        maintenance = cum_costs_present * self.om_costs  # maintenance costs
        costs_pa_present = np.diff(np.append(0.,
                                             cum_costs_present)) + maintenance  # compute annual costs without discount rate until end of building
        costs_horizon_present = np.zeros(len(time_series))
        costs_horizon_present[self.build_start_end[0] - self.year_range[0]: self.build_start_end[1] - self.year_range[
            0]] = costs_pa_present  # insert costs until end of build
        costs_horizon_present[self.build_start_end[1] - self.year_range[0]:] = maintenance[
            -1]  # add maintenance until horizon
        #     costs_horizon_present = np.append(costs_pa_present, np.ones(year_range[-1]-build_start_end[-1] + 1)*maintenance[-1])  # add maintenance until horizon
        c = costs_horizon_present / ((1 + self.discount_rate) ** (self.year_array))
        df_results = pd.DataFrame(index=self.time_series, columns=[m + "_" + exposure + "_Costs"], data=c)
        # logging.debug(m ," compute cost done:", time.time() - stime1)
        return df_results

    def compute_benefits(self, model, annual_risk_pres, annual_risk_fut, annual_pop_pres, annual_pop_fut,
                         annual_gdp_pres, annual_gdp_fut, annual_prot_pres, annual_prot_fut):
        #logging.debug('[CBA, compute_benefits]: start')
        diff_urb = np.where(annual_risk_pres - annual_risk_fut < 0, 0, annual_risk_pres - annual_risk_fut)  # difference is the potential yearly benefit
        diff_pop = np.where(annual_pop_pres - annual_pop_fut < 0, 0, annual_pop_pres - annual_pop_fut) # difference is the potential yearly benefit
        diff_gdp = np.where(annual_gdp_pres - annual_gdp_fut < 0, 0, annual_gdp_pres - annual_gdp_fut) # difference is the potential yearly benefit

        relative_benefit = np.maximum(
            np.minimum(self.extrap1d(interp1d(list(self.benefit_increase), [0., 1.]))(self.time_series), 1.), 0.)
        urb_benefits = relative_benefit * diff_urb
        pop_benefits = relative_benefit * diff_pop
        gdp_benefits = relative_benefit * diff_gdp

        # compute discount rate for costs and benefits

        urb_benefits_discounted = urb_benefits / (
                (1 + self.discount_rate) ** (self.year_array))  # add the annual discount rate
        gdp_benefits_discounted = gdp_benefits / ((1 + self.discount_rate) ** (self.year_array))

        d = {model + "_Urb_Benefits": urb_benefits_discounted,
             model + "_Pop_Benefits": pop_benefits, model + "_GDP_Benefits": gdp_benefits_discounted,
             model + "_Prot_Present": annual_prot_pres, model + "_Prot_Future": annual_prot_fut}
        df_results = pd.DataFrame(index=self.time_series, data=d)

        return df_results

    @staticmethod
    def expected_value(values, RPs, RP_zero, RP_infinite):
        """
        Purpose: Annual expected image/damage for given time period
        Input:
            values: Impact per return period
                2D array MxN
                    M: several time periods
                    N: several return periods
            RPs: return periods (equal to length of N)
            RP_zero: return period at which to break the EP-curve to zero (i.e. protection standard)
            RP_infinite: return period close to the infinitely high return period
        Output:
            vector with expected values for each time period
        """
        #logging.debug('[CBA, expected_value]: start')
        # append the return period at which maximum impact occurs, normally this is set to 1e6 years
        RPs = np.append(np.array(RPs), RP_infinite)
        # derive the probabilities associated with return periods
        prob = 1. / RPs
        values = np.array(values)
        # append infinite impact (last value in array) to array. Simply copy the last value.
        values = np.append(values, values[-1])
        # now make a smooth function (function relates prob (on x) to projected future impact (y))
        values_func = interp1d(prob, values)
        # Returns 10,000 evenly spaced probabilities from most likely prob to most extreme
        prob_smooth = np.linspace(prob[0], prob[-1], 10000)
        # Insert these probabilites into "smooth function" to find their related impact
        values_smooth = values_func(prob_smooth)
        # Set all impacts above thres (protection standard) to zero
        values_smooth[prob_smooth > 1. / RP_zero] = 0.
        # compute expected values from return period values:
        # Integrate under curve to find sum of all impact
        exp_val = np.trapz(np.flipud(values_smooth), np.flipud(prob_smooth))
        # print "Values, RP, Exp Value", values,  RP_zero, exp_val,
        return exp_val

    @staticmethod
    def interp_value(x, y, x_i, min_x=-np.Inf, max_x=np.Inf):
        """
        Purpose:  Find impacts associated with given protection standard
        OR        Find probability associated with a given impact

        Allows for extrapolation to find new Y given user-defined X
        Do a linear inter/extrapolation of y(x) to find a value y(x_idx)
        """
        #logging.debug('[CBA, interp_value]: start')
        x = np.atleast_1d(x)
        y = np.atleast_1d(y)
        f = interp1d(x, y, fill_value=(y.min(), y.max()), bounds_error=False)
        return f(x_i)

    @staticmethod
    def extrap1d(interpolator):
        """
        Purpose: Make an extrapolation function
        """
        #logging.debug('[CBA, extrap1d]: start')
        xs = interpolator.x
        ys = interpolator.y

        def pointwise(x):
            # If new prob is smaller than smallest prob in function
            if x < xs[0]:
                return ys[0] + (x - xs[0]) * (ys[1] - ys[0]) / (xs[1] - xs[0])
            # If new prob is larger than largest prob in function
            elif x > xs[-1]:
                return ys[-1] + (x - xs[-1]) * (ys[-1] - ys[-2]) / (xs[-1] - xs[-2])
            # If prob falls within set range of prob in function
            else:
                return interpolator(x)

        def ufunclike(xs):
            return np.fromiter(map(pointwise, np.array(xs)), dtype=np.float)

        return ufunclike

    def compute_rp_change(self, rps, ref_impact, target_impact, rp, min_rp=2, max_rp=1000):
        """
        Purpose: Compute how return period protection changes from one impact
        distribution to another (e.g. present to future)
        Input:
            rps: return periods of impacts
            ref_impact: set of reference impact
            target_impacts: impacts to which protection standard should be mapped
                            (i.e. year the flood protection should be valid in)
            rp, protection standard at reference impacts
        """
        #logging.debug('[CBA, compute_rp_change]: start')
        # interpolate to estimate impacts at protection level 'rp'
        # atleast1_1d = scalar inputs are converted to 1-D arrays. Arrays of higher dimensions are preserved
        p = 1. / np.atleast_1d(rps)
        # Find the target impact given user-defined protection standard (rp) by running the interp_values function
        if target_impact.sum() == 0:
            new_prot = np.array([rp])
        else:
            prot_impact = self.interp_value(rps, ref_impact, rp)
            new_prot = self.interp_value(target_impact, rps, prot_impact)
        # lookup what the protection standard is within the target impact list
        return new_prot

    def find_startrp(self, x):
        #logging.debug('[CBA, find_startrp]: start')
        if pd.isnull(x):
            rpstart = np.nan
        else:
            prot_start_unit = min(self.rps, key=lambda rp: abs(rp - x))
            rpstart = "startrp" + str(prot_start_unit).zfill(5)
        return rpstart

    def find_dimension_v2(self, m, df_lookup, df_cost, user_urb):
        #logging.debug('[CBA, find_dimension_v2]: start')
        uniStartRPList = [x for x in list(set(df_lookup['startrp'].values)) if pd.notnull(x)]
        erp = "endrp" + str(self.prot_fut).zfill(5)
        logging.info(erp)
        logging.info(str(self.prot_fut))
        uniSRPdfList = []
        targetColNameList = []
        for srp in uniStartRPList:
            df_lookup_temp = df_lookup[df_lookup['startrp'] == srp]
            uniSRPdfList.append(df_lookup_temp)
            targetColName = "_".join([self.scenarios.get(self.scenario)[0], m,
                                      self.scenarios.get(self.scenario)[1], str(self.ref_year), srp, erp])
            targetColNameList.append(targetColName)

        df = pd.DataFrame(np.transpose([uniStartRPList, targetColNameList, uniSRPdfList]),
                          columns=['startrp', 'tgtCol', 'df'])
        costList = []

        for itl in np.arange(0, len(df.index), 1):
            logging.info(itl)
            df_itl = df['df'].iloc[itl]
            tgtCol_itl = df['tgtCol'].iloc[itl]
            logging.info(tgtCol_itl)
            # if '00000' in tgtCol_itl:
            #     continue
            cost_itl = pd.read_sql_query("SELECT sum({0}) FROM {1} where id in ({2})".format(tgtCol_itl, df_cost,
                                                                                             ", ".join(map(str, df_itl[
                                                                                                 'FID'].values))),
                                         self.engine).values[0]
            ####-------------------------
            # NEW CODE
            if user_urb == None:

                ppp_itl, con_itl = pd.read_sql_query(
                    "SELECT avg(ppp_mer_rate_2005_index) mean_1, avg(construction_cost_index*7) mean_2 FROM lookup_construction_factors_geogunit_108 where fid_aque in ({0}) ".format(
                        ', '.join(map(str, self.fids))), self.engine).values[0]

                costList.append((cost_itl * con_itl)/ ppp_itl)
            else:
                costList.append((cost_itl)/ppp_itl)
            ####-------------------------
        totalCost = sum(costList)
        return totalCost

    def find_construction(self, m, exposure, user_rur=None, user_urb=None):
        """
        Purpose: Calculate the total cost to construction the desired flood protection
        Inputs:
            m = model
            user_cost = user-defined cost per km per m
        Output:
            cost = total cost of dike
        """
        #logging.debug('[CBA, find_construction]: start')
        lookup_c = pd.read_sql_query(
            "SELECT * FROM lookup_{0} where {1} = '{2}' ".format(self.geogunit, self.geogunit_type, self.geogunit_name),
            self.engine, 'id')
        lookup_c["FID"] = lookup_c.index
        lookup_c["riverine"] = self.prot_pres
        logging.info(self.prot_pres)
        lookup_c["startrp"] = lookup_c["riverine"].apply(lambda x: self.find_startrp(x))
        urb_dimensions = self.find_dimension_v2(m, lookup_c, self.df_urb_all, user_urb)
        # Find the Purchasing Power Parity to Market value rate
        # Find the local cost to construct the dike ($/km/m)
        # If the user did not input a cost, use the local cost and PPP conversion to find total cost. 7 million is a standard factor cost

        if user_urb == None:
            cost_urb = urb_dimensions * 7e6
            cost = cost_urb
        else:
            cost_urb = urb_dimensions * user_urb * 1e6
            cost = cost_urb
        return cost

    def average_prot(self, m, year, risk_data_input):
        #logging.debug('[CBA, average_prot]: start')
        idx = int(year) - self.implementation_start
        #logging.debug(f'[CBA, average_prot, idx]: {idx} ==> {year} {self.implementation_start}')
        clm = "histor" if year == '2010' else self.clim
        sco = "base" if year == '2010' else self.socio
        mdl = "wt" if year == '2010' else m
        test_rps = np.linspace(min(self.rps), max(self.rps), 999)
        try:
            assert (len(risk_data_input) >= idx), f"the infrastructure lifetime ({self.infrastructure_life}) MUST be between {2080 - self.implementation_start} - {2100 - self.implementation_start}"
        except AssertionError as e:
            raise Error(message='computation failed: '+ str(e), status=400)
        real_impact = risk_data_input[int(idx)]
        # READ IN REFERENCE IMPACT
        # READ IN RAW DATA
        if real_impact == 0:
            test = np.nan
        else:
            # PULL ONLY CURRENT DATA
            cols = [col for col in sqlalchemy.Table(self.df_urb_agg, self.metadata).columns.keys() if
                    (clm.lower() in col) and (mdl.lower() in col) and (sco.lower() in col) and (year in col)]
            # impact_present = df_urb_agg.loc[geogunit_name, [col for col in df_urb_agg.columns if (clm in col) and (mdl in col) and (sco in col) and (year in col)]]
            impact_present = pd.read_sql_query(
                "SELECT {0} FROM {1} where id ='{2}'".format(', '.join(cols), self.df_urb_agg, self.geogunit_name),
                self.engine).iloc[0]
            check = 1e25
            for test in test_rps:
                test_impact = self.expected_value(impact_present, self.rps, test, 1e5)
                diff = abs(real_impact - test_impact)
                if diff > check:
                    break
                check = diff
        return test

    def risk_evolution(self, impact_cc, impact_urb, impact_pop, impact_gdp, prot, prot_idx):
        """
        Creates a time series of how annual expected impact evolves through time, assuming given protection standard at some moment in time.
        The protection standard is transformed into protection standards at other moments in time by lookup of the associated
        impact at that protection standard using a climate change only scenario.
        Input:
            years: list of years [N]
            rps: list of return periods [M] (in years)
            impact_cc: list of lists: N lists containing M impacts (for each return period) with only climate change
            impact_cc_socio: list of lists: N lists containing M impacts (for each return period) with climate and socio change
            prot: protection standard at given moment (in years)
            prot_idx: index of year (in array years) at which prot is valid.
        """
        # determine risk evaolution
        risk_prot, pop_impact, gdp_impact, prot_levels = [], [], [], []
        #logging.debug('[CBA, risk_evolution]: start')
        for year, imp_cc, imp_urb, imp_pop, imp_gdp in zip(self.years, impact_cc, impact_urb, impact_pop, impact_gdp):
            prot_trans = self.compute_rp_change(self.rps, impact_cc[prot_idx], imp_cc, prot, min_rp=2,
                                                max_rp=1000)  # i.e. RP_zero
            # compute the expected value risk with the given protection standard
            risk_prot.append(self.expected_value(imp_urb, self.rps, prot_trans, 1e5))
            pop_impact.append(self.expected_value(imp_pop, self.rps, prot_trans, 1e5))
            gdp_impact.append(self.expected_value(imp_gdp, self.rps, prot_trans, 1e5))
            # prot_levels.append(prot_trans[0])
        # Interpolate annual expected risk to get estimate for every year in time series
        # print prot_levels
        risk_func = interp1d(self.years, risk_prot, kind='linear', bounds_error=False,
                             fill_value='extrapolate')  # define interpolation function/relationship
        pop_func = interp1d(self.years, pop_impact, kind='linear', bounds_error=False,
                            fill_value='extrapolate')  # define interpolation function/relationship
        gdp_func = interp1d(self.years, gdp_impact, kind='linear', bounds_error=False,
                            fill_value='extrapolate')  # define interpolation function/relationship
        # prot_func = extrap1d(interp1d(years, prot_levels))

        annual_risk = risk_func(self.time_series)  # Run timeseries through interpolation function
        annual_pop = pop_func(self.time_series)  # Run timeseries through interpolation function
        annual_gdp = gdp_func(self.time_series)  # Run timeseries through interpolation function
        # annual_prot = prot_func(time_series)
        return annual_risk, annual_pop, annual_gdp  # , annual_prot

    def calc_impact(self, m, pt, ptid):
        """this can be improved with threads and is where the leak happens, a more ammount of fids, the runtime increases"""
        annual_risk, annual_pop, annual_gdp = 0, 0, 0
        #logging.debug('[CBA, calc_impact]: start')
        # cba_raw = pd.read_sql_query("SELECT {0} FROM {1} where id = {2} ".format(', '.join(columns), inData, inName), self.engine)
        # impact_present = pd.read_sql_query("SELECT {0} FROM {1} where id = {2} ".format(', '.join(cols), inData, inName), self.engine).values[0]
        df_urb = pd.read_sql_query(
            "SELECT * FROM {0} where id in ({1}) ".format(self.df_urb, ', '.join(map(str, self.fids))), self.engine)
        df_pop = pd.read_sql_query("SELECT * FROM {1} where id in ({2}) ".format(', '.join(
            [col for col in sqlalchemy.Table(self.df_pop, self.metadata).columns.keys() if
             (self.clim in col) and (self.socio in col) and (m in col)]), self.df_pop, ', '.join(map(str, self.fids))),
                                   self.engine)
        df_gdp = pd.read_sql_query("SELECT * FROM {1} where id in ({2}) ".format(', '.join(
            [col for col in sqlalchemy.Table(self.df_gdp, self.metadata).columns.keys() if
             (self.clim in col) and (self.socio in col) and (m in col)]), self.df_gdp, ', '.join(map(str, self.fids))),
                                   self.engine)
        # Present data = 2010 data
        # impact_present = pd.read_sql_query("SELECT {0} FROM {1} where id = {2} ".format(', '.join(cols), inData, inName), self.engine).values[0]
        for f in self.fids:
            impact_cc = self.select_impact(m, df_urb, f, "base")
            impact_urb = self.select_impact(m, df_urb, f, self.socio)
            impact_pop = self.select_impact(m, df_pop, f, self.socio)
            impact_gdp = self.select_impact(m, df_gdp, f, self.socio)
            f_risk, f_pop, f_gdp = self.risk_evolution(impact_cc, impact_urb, impact_pop, impact_gdp, pt, ptid)
            annual_risk = annual_risk + f_risk
            annual_pop = annual_pop + f_pop
            annual_gdp = annual_gdp + f_gdp
        return annual_risk, annual_pop, annual_gdp

    def precalc_present_benefits(self, model):
        """
        Inputs:
            prot_start_unit = present_day protection standard (assumed to beong to 0th year in list of years)
        Output:
            time series of costs and benefits with discount rate included

        """
        # Find total costs without discount(over timeseries)
        # DEFAULT DATA
        #logging.debug('[CBA, precalc_present_benefits]: start')
        urb_imp = self.filt_risk[
            [col for col in self.filt_risk.columns if ("urban_damage" in col) and (self.scen_abb.lower() in col)
             and (model.lower() in col) and ("tot" in col)]].sum(axis=0)
        pop_imp = self.filt_risk[
            [col for col in self.filt_risk.columns if ("popexp" in col) and (self.scen_abb.lower() in col)
             and (model.lower() in col) and ("tot" in col)]].sum(axis=0)
        gdp_imp = self.filt_risk[
            [col for col in self.filt_risk.columns if ("gdpexp" in col) and (self.scen_abb.lower() in col)
             and (model.lower() in col) and ("tot" in col)]].sum(axis=0)
        prot_imp = self.df_prot.loc[self.geogunit_name, [col for col in self.df_prot.columns if
                                                        ("urban_damage" in col) and (self.scen_abb.lower() in col)
                                                        and ("prot_avg" in col)]]

        risk_func = interp1d(self.years, urb_imp, kind='linear', bounds_error=False,
                             fill_value='extrapolate')  # define interpolation function/relationship
        pop_func = interp1d(self.years, pop_imp, kind='linear', bounds_error=False,
                            fill_value='extrapolate')  # define interpolation function/relationship
        gdp_func = interp1d(self.years, gdp_imp, kind='linear', bounds_error=False,
                            fill_value='extrapolate')  # define interpolation function/relationship
        prot_func = interp1d(self.years, prot_imp, kind='linear', bounds_error=False,
                             fill_value='extrapolate')  # define interpolation function/relationship

        annual_risk = risk_func(self.time_series)  # Run timeseries through interpolation function
        annual_pop = pop_func(self.time_series)  # Run timeseries through interpolation function
        annual_gdp = gdp_func(self.time_series)  # Run timeseries through interpolation function
        annual_prot = prot_func(self.time_series)  # Run timeseries through interpolation function

        return annual_risk, annual_pop, annual_gdp, annual_prot

    def select_impact(self, m, inData, inName, socioecon):
        """
        Purpose: Pull raw data needed to perfrom CBA
        Inputs:
            m = climate model
            inData = dataset to filter through
            socioecono = socioeconomic data
        Output:
            List of dataframes for each year with impact estimates. Impact data for climate change only scenario and total change

        Time is being killed here on big selections
        """
        #logging.debug('[CBA, select_impact]: start')
        cba_raw = inData.set_index('id').loc[inName]
        # Present data = 2010 data
        impact_present = cba_raw.filter(like="_2010_", axis=0).values
        # For all years, pull the climate change only data (base signifies no socioeconomic pathway used)
        i_2030 = cba_raw.filter(like='2030', axis=0).filter(like=self.clim, axis=0).filter(like=socioecon,
                                                                                           axis=0).filter(like=m,
                                                                                                          axis=0).values
        i_2050 = cba_raw.filter(like='2050', axis=0).filter(like=self.clim, axis=0).filter(like=socioecon,
                                                                                           axis=0).filter(like=m,
                                                                                                          axis=0).values
        i_2080 = cba_raw.filter(like='2080', axis=0).filter(like=self.clim, axis=0).filter(like=socioecon,
                                                                                           axis=0).filter(like=m,
                                                                                                          axis=0).values
        impact = [impact_present, i_2030, i_2050, i_2080]

        return impact

    # @cached_property
    def analyze(self):
        # allStartTime = time.time()
        ##--------------------------------------------------------
        ###              ANALYSIS          ###
        ##--------------------------------------------------------
        logging.debug( "Analysis starting...")
        # IMPACT DATA BY MODEL

        # model_benefits = pd.DataFrame(index=time_series)
        try:
            model_benefits = pd.DataFrame(data=self.time_series, columns=['year']).set_index('year')
            # IMPACT DATA BY MODEL

            for m in self.mods:
                # start_time = time.time()
                logging.debug( "------------------   Model %s starting...  ---------------" %m)

                if self.risk_analysis == "precalc":
                    logging.debug( "------------------   precalc  ---------------" )
                    annual_risk_pres, annual_pop_pres, annual_gdp_pres, annual_prot_pres = self.precalc_present_benefits(
                        m)

                else:
                    logging.debug( "------------------   Calc  ---------------")
                    annual_risk_pres, annual_pop_pres, annual_gdp_pres = self.calc_impact(m, self.prot_pres, 0)
                    prot_pres_list = []
                    for y in self.ys:
                        prot_pres_list.append(self.average_prot(m, y, annual_risk_pres))
                    prot_func_pres = self.extrap1d(interp1d(self.years, prot_pres_list))
                    annual_prot_pres = prot_func_pres(self.time_series)  # Run timeseries through interpolation function
                # logging.debug( m, "present done", time.time() - start_time)

                # start_time = time.time()
                # sTimetl = time.time()
                logging.debug( "------------------   CALC2  ---------------" )
                annual_risk_fut, annual_pop_fut, annual_gdp_fut = self.calc_impact(m, self.prot_futu, self.prot_idx_fut)
                logging.debug( "------------------   CALC3  ---------------" )
                logging.debug(f'[CBA, {m}]')
                logging.debug(f'[CBA, {self.ys}]')
                logging.debug(f'[CBA, {annual_risk_fut}]')
                prot_fut_list = [self.average_prot(m, y, annual_risk_fut) for y in self.ys]
                logging.debug( "------------------   CALC4  ---------------" )
                prot_func_fut = self.extrap1d(interp1d(self.years, prot_fut_list))
                logging.debug( "------------------   CALC5  ---------------" )
                annual_prot_fut = prot_func_fut(self.time_series)  # Run timeseries through interpolation function

                # logging.debug( m, "future  done", time.time() - start_time)

                # start_time = time.time()
                df = self.compute_benefits(m, annual_risk_pres, annual_risk_fut, annual_pop_pres, annual_pop_fut,
                                           annual_gdp_pres, annual_gdp_fut, annual_prot_pres, annual_prot_fut)
                model_benefits = model_benefits.join(df)
                logging.debug( "------------------   CALC6  ---------------" )
                # logging.debug( m, "benefits done", time.time()-start_time )

                # start_time = time.time()

                pop_costs = self.find_construction(m, "POPexp", self.user_rur_cost, self.user_urb_cost)
                gdp_costs = self.find_construction(m, "Urban_Damage_v2", self.user_rur_cost, self.user_urb_cost)
                logging.debug( "------------------   CALC7  ---------------" )
                df_pc = self.compute_costs(m, pop_costs, "POP")
                model_benefits = model_benefits.join(df_pc)
                df_gc = self.compute_costs(m, gdp_costs, "GDP")
                model_benefits = model_benefits.join(df_gc)
                #logging.debug(m, "costs done", time.time()-start_time)
                #logging.debug("Model %s done..." % m)

            # start_time = time.time()
            df_final = self.run_stats(model_benefits)
            # check benefits or cost is 0
            if df_final.urb_benefits_avg.sum() == 0:
                df_final[[x for x in df_final.columns if "costs" in x]] = 0
            elif df_final.gdp_costs_avg.sum() == 0:
                df_final[[x for x in df_final.columns if "Benefits" in x]] = 0


            ### DETAILS
            details = {"geogunitName": self.geogunit_name,
                       "geogunitType": self.geogunit_type,
                       "scenario": self.scenario,
                       "averageProtection": self.prot_pres,
                       "startingProtection": min(self.rps, key=lambda rp: abs(rp - self.prot_pres)),
                       "futureProtection": self.prot_futu,
                       "referenceYear": self.ref_year,
                       "implementionStart": self.implementation_start,
                       "implementionEnd": self.implementation_end,
                       "infrastructureLifespan": self.infrastructure_life,
                       "estimatedCosts": self.estimated_costs,
                       "benefitsStart": self.benefits_start,
                       "discount": self.discount_rate,
                       "om": self.om_costs,
                       "gdpCosts": gdp_costs.tolist()}
            # df_final = model_benefits
            # print( "All done! Total computation time:", time.time() - allStartTime)
            return {
            "meta": details,
            "df": df_final
        }
        except Error as e:
            logging.error('[CBA analyze]: ' + str(e))
            raise e
        except Exception as e:
            logging.error('[CBA analyze]: ' + str(e))
            raise Error(message='computation failed: '+ str(e))
        finally:
            self.engine.dispose()


class CBAICache(object):
    """
    this will have the next methods:
        * create cache table (only if the table doesn't exist)
        * check if a certain set of parameters exists on the table, if exists it will retrive cbaService data from the row selected
        * if not it will trigger the CBAService class to calculate it.
    """

    ### DBConexion
    def __init__(self, params):
        self.engine = sqlalchemy.create_engine(os.getenv('POSTGRES_URL'))
        self.metadata = sqlalchemy.MetaData(bind=self.engine, reflect=True)
        # self.metadata.reflect(self.engine)
        self.params = params

    @property
    def _generateKey(self):
        return '_'.join([str(value) for (key, value) in sorted(self.params.items())])

    def _createTable(self):
        """
        where key is a composition of the user selected params:
        "geogunit_unique_name"_"existing_prot"_"scenario"_"prot_fut"_"implementation_start_"implementation_end"_"infrastructure_life"_"benefits_start"_"ref_year"_"estimated_costs"_"discount_rate"_"om_costs"_"user_urb_cost"_"user_rur_cost"
        """
        try:
            myCache = sqlalchemy.Table("cache_cba", self.metadata,
                                       Column('id', Integer, primary_key=True, unique=True),
                                       Column('key', Text, unique=True, index=True),
                                       Column('value', JSON),
                                       Column('last_updated', DateTime, default=datetime.datetime.now,
                                              onupdate=datetime.datetime.now)
                                       )
            myCache.create()
        except Exception as e:
            logging.error('[CBAICache, _createTable]: ' + str(e))
            raise Error(message='cache table creation failed'+ str(e))
        return myCache

    def checkParams(self):
        try:
            table = self.metadata.tables['cache_cba']

            logging.info('[CBAICache, checkParams]: checking params...')
            # logging.info(self._generateKey)
            select_st = table.select().where(table.c.key == self._generateKey)
            res = self.engine.connect().execute(select_st).fetchone()
            logging.info(f'[CBAICache, checkParams - result]: {res}')
            return res
        except Exception as e:
            logging.error('[CBAICache, checkParams]: ' + str(e))
            raise Error(message='Checked params failed' + str(e))

    def insertRecord(self, key, data):
        # insert data via insert() construct
        try:
            table = self.metadata.tables['cache_cba']
            ins = table.insert().values(
                key=key,
                value=data)
            conn = self.engine.connect()
            conn.execute(ins)

            return 200

        except Exception as e:
            logging.error('[CBAICache, insertRecord]: ' + str(e))
            raise Error(message='insert record in cache table failed. \n'+ str(e))

    def updateRecord(self):
        return 0

    def cleanCache(self):
        try:
            table_drop = self.metadata.tables['cache_cba'].delete()
            conn = self.engine.connect()
            conn.execute(table_drop)
            return 200
        except Exception as e:
            logging.error('[CBAICache, cleanCache table]: ' + str(e))
            raise Error(message='Cache table drop has failed. \n'+ str(e))

    def execute(self):
        try:
            inspector = sqlalchemy.inspect(self.engine)
            logging.info('[CBAICache]: Getting cba default...')
            if 'cache_cba' in inspector.get_table_names():
                # It means we have the cache table, we will need to check the params
                checks = self.checkParams()
                if checks != None:
                    logging.info('[CBAICache]: table available; extracting data')
                    data = json.loads(checks[2])
                    logging.info(data.keys())
                    return {'meta': data['meta'], 'df': pd.DataFrame(data['data']).set_index(
                        'year')}  # we will give back the data in a way CBAEndService can use it

                else:  # we will execute the whole process and we will generate the output in a way  CBAEndService can use it
                    logging.info('[CBAICache]: data not available; generating data')
                    data_output = CBAService(self.params).analyze()
                    data = json.dumps(
                        {'meta': data_output['meta'], 'data': data_output['df'].reset_index().to_dict('records')},
                        ignore_nan=True)
                    key = self._generateKey
                    self.insertRecord(key, data)
                    return data_output  # we will give back the data in a way CBAEndService can use it
            else:
                self._createTable()
                self.execute()
                # executes the cba code to get the table, inserts it into the database and we should be ready to go
        except Exception as e:
            logging.error('[CBAICache, execute]: ')
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
            logging.error(repr(traceback.format_exception(exc_type, exc_value,
                                          exc_traceback)))
            raise e


class CBAEndService(object):
    def __init__(self, user_selections):
        # self.data = CBAService(user_selections).analyze()
        self.data = CBAICache(user_selections).execute()

    def get_widget(self, argument):
        method_name = 'widget_' + str(argument)
        method = getattr(self, method_name, lambda: "Widget not found")
        return method()

    # @cached_property
    def widget_table(self):
        fOutput = self.data['df'][['urb_benefits_avg', 'gdp_costs_avg', 'pop_benefits_avg', 'gdp_benefits_avg']]
        cumOut = fOutput.sum()

        # npv = None
        avoidedGdp = round(fOutput.loc[self.data['meta']['implementionEnd']:].gdp_benefits_avg.sum())
        avoidedPop = round(fOutput.loc[self.data['meta']['implementionEnd']:].pop_benefits_avg.sum())
        bcr = round((cumOut['gdp_costs_avg'] / cumOut['urb_benefits_avg']), 5   )

        return {'widgetId': 'table', 'chart_type': 'table', 'meta': self.data['meta'],
                'data': [{'bcr': bcr, 'avoidedPop': avoidedPop, 'avoidedGdp': avoidedGdp}]}

    # @cached_property
    def widget_annual_costs(self):
        """Urb_Benefits_avg / GDP_Costs_avg"""
        self.data['meta']["yAxisTitle"] = 'Cost and Benefits ($)'
        return {'widgetId': 'annual_costs', 'chart_type': 'multi-line', 'meta': self.data['meta'], 'data': pd.melt(
            self.data['df'].reset_index()[['year', 'urb_benefits_avg', 'gdp_costs_avg']].rename(index=str, columns={
                "urb_benefits_avg": "Benefits", "gdp_costs_avg": "Costs"}), id_vars=['year'],
            value_vars=['Benefits', 'Costs'], var_name='c', value_name='value').to_dict('records')}

    # @cached_property
    def widget_net_benefits(self):
        """Urb_Benefits_avg / GDP_Costs_avg --> net cummulative costs"""
        self.data['meta']["yAxisTitle"] = 'Cumulative Net Benefits ($)'
        fOutput = self.data['df'][['urb_benefits_avg', 'gdp_costs_avg']].cumsum()
        fOutput['value'] = fOutput['urb_benefits_avg'] - fOutput['gdp_costs_avg']

        return {'widgetId': 'net_benefits', 'chart_type': 'bar', 'meta': self.data['meta'],
                'data': fOutput.reset_index()[['year', 'value']].to_dict('records')}

    # @cached_property
    def widget_impl_cost(self):
        """GDP_Costs_avg"""
        self.data['meta']["yAxisTitle"] = 'Implementation Cost ($)'
        fOutput = self.data['df'].reset_index()[['year','gdp_costs_avg']]
        minY = fOutput['year'].min() - 1
        fOutput['value'] = (fOutput['gdp_costs_avg'] * (1 + self.data['meta']['discount']) ** (
                fOutput['year'] - minY)) / 10.1
        fOutput.loc[fOutput['year'] >= self.data['meta']['implementionEnd'], 'value'] = 0

        return {'widgetId': 'impl_cost', 'chart_type': 'bar', 'meta': self.data['meta'],
                'data': fOutput[['year', 'value']].to_dict('records')}

    # @cached_property
    def widget_mainteinance(self):
        self.data['meta']["yAxisTitle"] = 'Operation & Mainteinance Cost($)'

        fOutput = self.data['df'][['gdp_costs_avg']]

        impE = self.data['meta']['implementionEnd']
        impS = self.data['meta']['implementionStart']
        life = self.data['meta']['infrastructureLifespan']
        ## Review this to make it work
        cost = fOutput.loc[self.data['meta']['implementionEnd']]['gdp_costs_avg'] * ((1 + self.data['meta']['discount']) ** (impE - impS))
        om_costs = self.data['meta']['om']
        build_years = impE - impS
        years = list(range(impS, impS + life +1))
        mp = [1.0 / build_years * cost for i in range(build_years)]
        cum_costs_present = np.cumsum(mp)
        mains = cum_costs_present * om_costs

        maintenance = pd.Series(np.concatenate((mains, [mains[-1]] * (impS + life - impE + 1))), index=years)
        fOutput.insert(1,'costs', maintenance)

        result = fOutput.reset_index()
        result['value'] = result['costs'] / ((1 + self.data['meta']['discount']) ** (result['year'] - impS + 1))

        return {'widgetId': 'mainteinance', 'chart_type': 'bar', 'meta': self.data['meta'],
                'data': result[['year', 'value']].to_dict('records')}

    # @cached_property
    def widget_flood_prot(self):
        self.data['meta']["yAxisTitle"] = 'Protection level (Return period)'

        fOutput = self.data['df'].reset_index()[['year', 'gdp_costs_avg', 'prot_present_avg', 'prot_future_avg']]
        fn = lambda row: row.prot_present_avg if row.year <= self.data['meta'][
            "benefitsStart"] else row.prot_future_avg if row.year >= self.data['meta']["implementionEnd"] else None
        fOutput['value'] = fOutput.apply(fn, axis=1)

        return {'widgetId': 'flood_prot', 'chart_type': 'line', 'meta': self.data['meta'],
                'data': fOutput[['year', 'value']].to_dict('records')}

    # @cached_property
    def widget_export(self):
        return {'widgetId': '', 'meta': self.data['meta'], 'data': self.data['df'].reset_index().to_dict('records')}