scripts/awsm_daily_airflow
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import copy
import os
import pandas as pd
import pytz
from inicheck.tools import get_user_config, cast_all_variables
from smrf.utils import utils
from awsm.framework.framework import run_awsm
def mod_config(config_file, start_date, no_previous):
"""
Run each day separately. Calls run_awsm
"""
# define some formats
fmt_day = '%Y%m%d'
fmt_cfg = '%Y-%m-%d %H:%M'
add_day = pd.to_timedelta(24, unit='h')
# get config instance
config = get_user_config(config_file,
modules=['smrf', 'awsm'])
# copy the config and get total start and end
# config = deepcopy(base_config)
# set naming style
config.raw_cfg['paths']['folder_date_style'] = 'day'
config.apply_recipes()
config = cast_all_variables(config, config.mcfg)
# get the water year
#cfg_start_date = pd.to_datetime(config.cfg['time']['start_date'])
tzinfo = pytz.timezone(config.cfg['time']['time_zone'])
wy = utils.water_day(start_date.replace(tzinfo=tzinfo))[1]
sd = start_date
ed = start_date + add_day
# # find output location for previous output
paths = config.cfg['paths']
prev_out_base = os.path.join(paths['path_dr'],
paths['basin'],
'wy{}'.format(wy),
paths['project_name'],
'runs')
prev_data_base = os.path.join(paths['path_dr'],
paths['basin'],
'wy{}'.format(wy),
paths['project_name'],
'data')
new_config = copy.deepcopy(config)
# set the start and end dates
new_config.raw_cfg['time']['start_date'] = sd.strftime(fmt_cfg)
new_config.raw_cfg['time']['end_date'] = ed.strftime(fmt_cfg)
# if no need for
if no_previous:
new_config.raw_cfg['ipysnobal']['init_type'] = None
new_config.raw_cfg['ipysnobal']['init_file'] = None
# find previous output file
else:
prev_day = sd - pd.to_timedelta(1, unit='D')
prev_out = os.path.join(prev_out_base,
'run{}'.format(prev_day.strftime(fmt_day)),
'ipysnobal.nc')
# reset if running the model
if new_config.cfg['awsm master']['model_type'] is not None:
new_config.raw_cfg['ipysnobal']['init_type'] = 'netcdf_out'
new_config.raw_cfg['ipysnobal']['init_file'] = prev_out
# if we have a previous storm day file, use it
prev_storm = os.path.join(prev_data_base,
'data{}'.format(prev_day.strftime(fmt_day)),
'smrfOutputs', 'storm_days.nc')
if os.path.isfile(prev_storm):
new_config.raw_cfg['precip']['storm_days_restart'] = prev_storm
return new_config
def parse_arguments():
parser = argparse.ArgumentParser(
description='Run AWSM using Airflow scheduler.'
)
parser.add_argument(
'-c', '--cfg',
required=True,
help='Config file that will be modified for the current run date'
)
parser.add_argument(
'-sd', '--start_date',
required=True,
help='start date for run'
)
parser.add_argument(
"-np", "--no_previous",
action="store_true",
default=False,
help="Doesn't need to find a previous snow state "
"from directory structure"
)
return parser.parse_args()
def run():
"""
This program will run each day as a separate run
"""
args = parse_arguments()
start_date = pd.to_datetime(args.start_date)
fp_cfg = args.cfg
no_previous = args.no_previous
# set dates and paths
new_config = mod_config(fp_cfg, start_date, no_previous)
# apply recipes with new settings
new_config.apply_recipes()
new_config = cast_all_variables(new_config, new_config.mcfg)
run_awsm(new_config)
if __name__ == '__main__':
run()