USDA-ARS-NWRC/awsm

View on GitHub
scripts/awsm_daily_airflow

Summary

Maintainability
Test Coverage
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import copy
import os

import pandas as pd
import pytz
from inicheck.tools import get_user_config, cast_all_variables
from smrf.utils import utils

from awsm.framework.framework import run_awsm


def mod_config(config_file, start_date, no_previous):
    """
    Run each day separately. Calls run_awsm
    """
    # define some formats
    fmt_day = '%Y%m%d'
    fmt_cfg = '%Y-%m-%d %H:%M'
    add_day = pd.to_timedelta(24, unit='h')

    # get config instance
    config = get_user_config(config_file,
                             modules=['smrf', 'awsm'])

    # copy the config and get total start and end
    # config = deepcopy(base_config)
    # set naming style
    config.raw_cfg['paths']['folder_date_style'] = 'day'
    config.apply_recipes()
    config = cast_all_variables(config, config.mcfg)

    # get the water year
    #cfg_start_date = pd.to_datetime(config.cfg['time']['start_date'])
    tzinfo = pytz.timezone(config.cfg['time']['time_zone'])
    wy = utils.water_day(start_date.replace(tzinfo=tzinfo))[1]

    sd = start_date
    ed = start_date + add_day

    # # find output location for previous output
    paths = config.cfg['paths']

    prev_out_base = os.path.join(paths['path_dr'],
                                 paths['basin'],
                                 'wy{}'.format(wy),
                                 paths['project_name'],
                                 'runs')

    prev_data_base = os.path.join(paths['path_dr'],
                                  paths['basin'],
                                  'wy{}'.format(wy),
                                  paths['project_name'],
                                  'data')

    new_config = copy.deepcopy(config)

    # set the start and end dates

    new_config.raw_cfg['time']['start_date'] = sd.strftime(fmt_cfg)
    new_config.raw_cfg['time']['end_date'] = ed.strftime(fmt_cfg)

    # if no need for
    if no_previous:
        new_config.raw_cfg['ipysnobal']['init_type'] = None
        new_config.raw_cfg['ipysnobal']['init_file'] = None
    # find previous output file
    else:
        prev_day = sd - pd.to_timedelta(1, unit='D')
        prev_out = os.path.join(prev_out_base,
                                'run{}'.format(prev_day.strftime(fmt_day)),
                                'ipysnobal.nc')
        # reset if running the model
        if new_config.cfg['awsm master']['model_type'] is not None:
            new_config.raw_cfg['ipysnobal']['init_type'] = 'netcdf_out'
            new_config.raw_cfg['ipysnobal']['init_file'] = prev_out

        # if we have a previous storm day file, use it
        prev_storm = os.path.join(prev_data_base,
                                  'data{}'.format(prev_day.strftime(fmt_day)),
                                  'smrfOutputs', 'storm_days.nc')
        if os.path.isfile(prev_storm):
            new_config.raw_cfg['precip']['storm_days_restart'] = prev_storm

    return new_config


def parse_arguments():
    parser = argparse.ArgumentParser(
        description='Run AWSM using Airflow scheduler.'
    )

    parser.add_argument(
        '-c', '--cfg',
        required=True,
        help='Config file that will be modified for the current run date'
    )
    parser.add_argument(
        '-sd', '--start_date',
        required=True,
        help='start date for run'
    )
    parser.add_argument(
        "-np", "--no_previous",
        action="store_true",
        default=False,
        help="Doesn't need to find a previous snow state "
             "from directory structure"
    )

    return parser.parse_args()


def run():
    """
    This program will run each day as a separate run
    """
    args = parse_arguments()

    start_date = pd.to_datetime(args.start_date)
    fp_cfg = args.cfg
    no_previous = args.no_previous

    # set dates and paths
    new_config = mod_config(fp_cfg, start_date, no_previous)

    # apply recipes with new settings
    new_config.apply_recipes()
    new_config = cast_all_variables(new_config, new_config.mcfg)

    run_awsm(new_config)


if __name__ == '__main__':
    run()