USDA-ARS-NWRC/awsm

View on GitHub
awsm/models/pysnobal/ipysnobal.py

Summary

Maintainability
B
4 hrs
Test Coverage

from pysnobal.c_snobal import snobal
from datetime import datetime, timedelta
import logging
import numpy as np
from smrf.framework.model_framework import SMRF
from smrf.utils import queue
from pysnobal import ipysnobal
import pandas as pd

import threading
from awsm.models.pysnobal import PysnobalIO
from awsm.interface.ingest_data import StateUpdater


C_TO_K = 273.16
FREEZE = C_TO_K
# Kelvin to Celsius
def K_TO_C(x): return x - FREEZE


def check_range(value, min_val, max_val, descrip):
    """
    Check the range of the value
    Args:
        value:  value to check
        min_val: minimum value
        max_val: maximum value
        descrip: short description of input

    Returns:
        True if within range
    """
    if (value < min_val) or (value > max_val):
        raise ValueError("%s (%f) out of range: %f to %f",
                         descrip, value, min_val, max_val)
    pass


class PySnobal():

    FORCING_VARIABLES = frozenset([
        'thermal',
        'air_temp',
        'vapor_pressure',
        'wind_speed',
        'net_solar',
        'soil_temp',
        'precip',
        'percent_snow',
        'snow_density',
        'precip_temp'
    ])

    def __init__(self, myawsm):
        """PySnobal class to run pysnobal. Will also run SMRF
        in a threaded mode for smrf_ipysnobal

        Args:
            myawsm (AWSM): AWSM class instance
        """

        self._logger = logging.getLogger(__name__)
        self.awsm = myawsm
        self.config = self.awsm.config['ipysnobal']
        self.smrf = None
        self.force = None
        self.smrf_queue = None
        self._logger.debug('Initialized PySnobal')

    @property
    def data_time_step(self):
        return self.time_step_info[0]['time_step']

    @property
    def init_zeros(self):
        return np.zeros_like(self.awsm.topo.dem)

    @property
    def init_ones(self):
        return np.ones_like(self.awsm.topo.dem)

    @property
    def config_constants(self):
        """Read the configuration and set the constants for iPysnobal.
        These are similar to the arguments that are passed to iSnobal.

        Returns:
            dict: constant values for iSnobal
        """

        return {
            'time_step': 60,
            'max-h2o': self.config['max_h2o'],
            'c': True,
            'K': True,
            'mass_threshold': self.config['thresh_normal'],
            'time_z': 0,
            'max_z_s_0': self.config['active_layer'],
            'z_u': self.config['z_u'],
            'z_t': self.config['z_t'],
            'z_g': self.config['z_g'],
            'relative_heights': True,
        }

    def initialize_updater(self):
        """Initialize the StateUpdater for the simulation
        """

        if self.awsm.update_depth:
            self.updater = StateUpdater(self.awsm)
        else:
            self.updater = None

    def initialize_ipysnobal(self):
        """Initialize iPysnobal. Performs the following:

        1. Create a configuration to pass to iPysnobal based on the
        configuration file.
        2. Create the time step info for mass and time thresholds.
        3. Initialize the output record dictionary for storing results
        4. Create the output files
        """

        # parse the input arguments
        # self.options, point_run = initmodel.get_args(self.awsm)
        self.get_args()

        # get the time step info
        self.params, self.time_step_info = ipysnobal.get_tstep_info(
            self.options['constants'], self.options)

        # mass thresholds for run time steps
        self.time_step_info[ipysnobal.NORMAL_TSTEP]['threshold'] = self.config['thresh_normal']  # noqa
        self.time_step_info[ipysnobal.MEDIUM_TSTEP]['threshold'] = self.config['thresh_medium']  # noqa
        self.time_step_info[ipysnobal.SMALL_TSTEP]['threshold'] = self.config['thresh_small']  # noqa

        # get init params
        self.init = self.awsm.model_init.init

        self.output_rec = ipysnobal.initialize(
            self.params, self.time_step_info, self.init)

        # create the output files
        self.pysnobal_io = PysnobalIO(
            self.awsm.config['ipysnobal']['output_file_name'],
            self.options['output']['location'],
            self.awsm
        )
        self.pysnobal_io.create_output_files()

        self.time_since_out = 0.0
        self.start_step = 0  # if restart then it would be higher
        step_time = self.start_step * self.data_time_step

        self.set_current_time(step_time, self.time_since_out)

    def get_args(self):
        """
        Parse the configuration file and returns a dictionary called options.
        Options contains the following keys:

        * z - site elevation (m)
        * t - time steps: data [normal, [,medium [,small]]] (minutes)
        * m - snowcover's maximum h2o content as volume ratio,
        * d - maximum depth for active layer (m),
        * s - snow properties input data file,
        * h - measurement heights input data file,
        * p - precipitation input data file,
        * i - input data file,
        * I - initial conditions
        * o - optional output data file,
        * O - how often output records written (data, normal, all),
        * c - continue run even when no snowcover,
        * K - accept temperatures in degrees K,
        * T - run time steps' thresholds for a layer's mass (kg/m^2)

        To-do: take all the rest of the default and check ranges for the
        input arguments, i.e. rewrite the rest of getargs.c

        """

        # make blank config and fill with corresponding sections
        config = {}

        config['output'] = {
            'frequency': self.awsm.output_freq,
            'location': self.awsm.path_output,
            'nthreads': self.config['ithreads'],
            'output_mode': 'data',
            'out_filename': None
        }

        config['constants'] = self.config_constants

        # ------------------------------------------------------------------------
        # read in the time and ensure a few things
        check_range(self.awsm.time_step, 1.0, 3 * 60, "input data's time step")
        if ((self.awsm.time_step > 60) and (self.awsm.time_step % 60 != 0)):
            raise ValueError("""Data time step > 60 min must be multiple """
                             """of 60 min (whole hours)""")

        # read in the start date and end date
        start_date = self.awsm.start_date

        # create a date time vector
        date_time = list(pd.date_range(
            start_date,
            self.awsm.end_date,
            freq=timedelta(minutes=config['constants']['time_step'])))

        config['time'] = {
            'start_date': start_date,
            'end_date': self.awsm.end_date,
            'time_step': self.awsm.time_step,
            'date_time': date_time
        }
        self.date_time = date_time

        # add to constant sections for time_step_info calculation
        config['constants']['time_step'] = self.awsm.time_step

        config['inputs'] = {
            'point': None,
            'input_type': self.config['init_type'],
            'soil_temp': self.awsm.soil_temp
        }

        self.options = config
        self.point_run = False

    def do_update(self, first_step):
        """If there is an update the the give time step, update the model state

        Returns:
            int: flag if the first step changed
        """

        if self.updater is not None:
            if self.time_step in self.updater.update_dates:
                self.output_rec = \
                    self.updater.do_update_pysnobal(
                        self.output_rec, self.time_step)
                first_step = 1

        return first_step

    def get_smrf_data(self, variable):
        """Get the SMRF data, either from the SMRF module or from the SMRF queue

        Args:
            variable (string): variable to get from SMRF

        Returns:
            ndarray: numpy array of SMRF data
        """

        if not self.smrf.threading:
            data = getattr(self.smrf.distribute[variable['info']['module']],
                           variable['variable'])
        else:
            if variable['variable'] == 'soil_temp':
                data = float(self.awsm.soil_temp) * \
                    np.ones_like(self.awsm.topo.dem)
            else:
                data = self.smrf_queue[variable['variable']].get(
                    self.time_step)
        return data

    def get_timestep_inputs(self):
        """Get all the forcing variable data from SMRF. Get the data either from
        the netCDF files or from SMRF directly.

        Returns:
            dict: dict of input values for all forcing variables
        """

        if self.awsm.smrf_connector.force is not None:
            data = self.awsm.smrf_connector.get_timestep_netcdf(self.time_step)

        else:
            data = {}
            for var, v in self.variable_list.items():
                # get the data desired
                smrf_data = self.get_smrf_data(v)

                if smrf_data is None:
                    smrf_data = self.init_zeros
                    self._logger.debug(
                        'No data from smrf to iSnobal for {} in {}'.format(
                            v['variable'], self.time_step))

                data[self.awsm.smrf_connector.MAP_INPUTS[var]] = smrf_data

        data['T_a'] = data['T_a'] + FREEZE
        data['T_pp'] = data['T_pp'] + FREEZE
        data['T_g'] = data['T_g'] + FREEZE
        data['time_step'] = self.time_step

        return data

    def set_current_time(self, step_time, time_since_out):
        """Set the current time and time since out

        Args:
            step_time (int): current time as integer from start
            time_since_out (int): time since out for the model results
        """

        self.output_rec['current_time'] = step_time * self.init_ones
        self.output_rec['time_since_out'] = time_since_out * self.init_ones

    def run_full_timestep(self):
        """Run the full timestep for iPysnobal. Includes getting the input,
        running iPysnobal for the timestep, copying the input data and
        outputing the results if needed.
        """

        self._logger.info(
            'running iPysnobal for timestep: {}'.format(self.time_step))

        self.input2 = self.get_timestep_inputs()

        first_step = self.step_index
        first_step = self.do_update(first_step)

        rt = snobal.do_tstep_grid(
            self.input1,
            self.input2,
            self.output_rec,
            self.time_step_info,
            self.options['constants'],
            self.params,
            first_step=first_step,
            nthreads=self.config['ithreads']
        )

        if rt != -1:
            raise ValueError(
                'ipysnobal error on time step {}, pixel {}'.format(
                    self.time_step, rt))

        self.input1 = self.input2.copy()

        self.output_timestep()

        self._logger.info(
            'Finished iPysnobal timestep: {}'.format(self.time_step))

    def smrf_ipysnobal_time_step(self):
        """Run the time step for a `smrf_ipysnobal` simulation
        """

        if self.step_index == 0:
            self.input1 = self.get_timestep_inputs()
        else:
            self.run_full_timestep()

    def run_full_timestep_threaded(self, smrf_queue, data_queue):
        """Run `smrf_ipysnobal` threaded where the SMRF data is pulled
        from the SMRF queue. This method is called within a Thread.

        Args:
            smrf_queue (dict): SMRF variable queue
            data_queue (dict): SMRF data queue (not used)
        """

        self._logger.info('Running iPysnobal thread')
        self.smrf_queue = smrf_queue

        for self.step_index, self.time_step in enumerate(self.date_time, 0):
            startTime = datetime.now()

            self.smrf_ipysnobal_time_step()

            smrf_queue['ipysnobal'].put([self.time_step, True])
            telapsed = datetime.now() - startTime
            self._logger.debug('iPysnobal {0:.2f} seconds for time step'
                               .format(telapsed.total_seconds()))

    def output_timestep(self):
        """Output the time step if on the right frequency.
        """

        out_freq = (self.step_index * self.data_time_step /
                    3600.0) % self.options['output']['frequency'] == 0
        last_time_step = self.step_index == len(
            self.options['time']['date_time']) - 1

        if out_freq or last_time_step:

            self._logger.info('iPysnobal outputting {}'.format(self.time_step))
            self.pysnobal_io.output_timestep(
                self.output_rec,
                self.time_step
            )

            self.output_rec['time_since_out'] = self.init_zeros

    def run_ipysnobal(self):
        """
        Function to run PySnobal from netcdf forcing data,
        not from SMRF instance.
        """

        self._logger.info('Initializing PySnobal from netcdf files')
        self.initialize_ipysnobal()

        self._logger.info('getting inputs for first timestep')

        self.force = self.awsm.smrf_connector.open_netcdf_files()
        self.time_step = self.date_time[0]
        self.input1 = self.get_timestep_inputs()

        self.initialize_updater()

        self._logger.info('starting PySnobal time series loop')

        for self.step_index, self.time_step in enumerate(self.date_time[1:], 1):  # noqa
            self.run_full_timestep()

        # close input files
        self.awsm.smrf_connector.close_netcdf_files()
        self.pysnobal_io.output_file.close()

    def run_smrf_ipysnobal(self):
        """
        Function to run SMRF and pass outputs in memory to python wrapped
        iSnobal.
        """

        with SMRF(
                self.awsm.smrf_connector.smrf_config,
                self._logger
        ) as self.smrf:

            self.smrf.loadTopo()
            self.smrf.create_distribution()
            self.smrf.initializeOutput()
            self.smrf.loadData()

            # run threaded or not
            if self.smrf.threading:
                self.run_smrf_ipysnobal_threaded()
            else:
                self.run_smrf_ipysnobal_serial()

        self.pysnobal_io.output_file.close()
        self._logger.debug('DONE!!!!')

    def run_smrf_ipysnobal_serial(self):
        """
        Running smrf and PySnobal in non-threaded application.
        """

        self._logger.info('Running SMRF and iPysnobal in serial')

        self.initialize_ipysnobal()

        self.smrf.initialize_distribution()

        self.variable_list = self.smrf.create_output_variable_dict(
            self.FORCING_VARIABLES, '.')

        self.initialize_updater()

        for self.step_index, self.time_step in enumerate(self.date_time):
            startTime = datetime.now()

            self.smrf.distribute_single_timestep(self.time_step)
            self.smrf.output(self.time_step)

            self.smrf_ipysnobal_time_step()

            telapsed = datetime.now() - startTime
            self.smrf._logger.debug('{0:.2f} seconds for time step'
                                    .format(telapsed.total_seconds()))

    def run_smrf_ipysnobal_threaded(self):
        """
        Function to run SMRF (threaded) and pass outputs in memory to python
        wrapped iSnobal. iPySnobal has replaced the output queue in this
        implimentation.
        """

        self._logger.info('Running SMRF and iPysnobal threaded')

        # initialize ipysnobal state
        self.initialize_ipysnobal()

        self.variable_list = self.smrf.create_output_variable_dict(
            self.FORCING_VARIABLES, '.')

        self.initialize_updater()

        self.smrf.create_data_queue()
        self.smrf.set_queue_variables()
        self.smrf.create_distributed_threads()

        # output thread
        self.smrf.threads.append(
            queue.QueueOutput(
                self.smrf.smrf_queue,
                self.date_time,
                self.smrf.out_func,
                self.smrf.config['output']['frequency'],
                self.smrf.topo.nx,
                self.smrf.topo.ny))

        # ipysnobal queue and thread
        self.smrf.smrf_queue['ipysnobal'] = queue.DateQueueThreading(
            self.smrf.queue_max_values,
            self.smrf.time_out,
            name='ipysnobal')

        self.smrf.threads.append(
            threading.Thread(
                target=self.run_full_timestep_threaded,
                name='ipysnobal',
                args=(self.smrf.smrf_queue, self.smrf.data_queue))
        )

        # the cleaner
        self.smrf.threads.append(queue.QueueCleaner(
            self.smrf.date_time, self.smrf.smrf_queue))

        # start all the threads
        for i in range(len(self.smrf.threads)):
            self.smrf.threads[i].start()

        for i in range(len(self.smrf.threads)):
            self.smrf.threads[i].join()