AerodyneLabs/Athena

View on GitHub
app/atmosphere/tasks/nomads.py

Summary

Maintainability
A
2 hrs
Test Coverage
from datetime import datetime, timedelta
from collections import namedtuple
import pytz
import requests

GFS_1_0_DEGREE = 0
GFS_0_5_DEGREE = 1
GFS_0_25_DEGREE = 2
GFS_MODEL_TIMESTEP = 6
GFS_FORECAST_TIMESTEP = 3
GFS_FORECAST_MAX = 240
GFS_RESOLUTION = {
    GFS_1_0_DEGREE: '1p00',
    GFS_0_5_DEGREE: '0p50',
    GFS_0_25_DEGREE: '0p25',
}

def gfs_url(model_run=datetime.now(tz=pytz.utc), forecast_time=datetime.now(tz=pytz.utc), resolution=GFS_1_0_DEGREE):
    # Ensure times are not naive
    if model_run.tzinfo is None or model_run.tzinfo.utcoffset(model_run) is None:
        raise ValueError('model_run (%s) is naive!', model_run)
    if forecast_time.tzinfo is None or forecast_time.tzinfo.utcoffset(forecast_time) is None:
        raise ValueError('forecast_time (%s) is naive!', forecast_time)

    # Ensure forecast_time is not before model_run
    if forecast_time < model_run:
        raise ValueError('forecast_time (%s) is before model_run (%s)!', forecast_time, model_run)

    # Ensure resolution is known
    if resolution not in GFS_RESOLUTION:
        raise ValueError('resolution (%s) not valid!', resolution)

    # Ensure model_run is not in the future
    if model_run > datetime.now(tz=pytz.utc):
        raise ValueError('model_run (%s) is in the future!', model_run)

    # Get previous model run time
    model_run = previous_model(model_run)

    # Get previous forecast time
    forecast_time = previous_forecast(model_run, forecast_time)

    # Compute forecast hours
    forecast_hours = int((forecast_time - model_run).total_seconds() // 3600)

    # Generate url fragments
    base_url = 'http://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/'
    model_folder = 'gfs.{year:04d}{month:02d}{day:02d}{hour:02d}/'.format(
        year=model_run.year,
        month=model_run.month,
        day=model_run.day,
        hour=model_run.hour
    )
    forecast_filename = 'gfs.t{run:02d}z.pgrb2.{resolution}.f{forecast:03d}'.format(
        run=model_run.hour,
        resolution=GFS_RESOLUTION[resolution],
        forecast=forecast_hours
    )

    # Return full url
    return base_url + model_folder + forecast_filename

def previous_model(model, conservative=False):
    discard = timedelta(
        hours=model.hour % GFS_MODEL_TIMESTEP,
        minutes=model.minute,
        seconds=model.second,
        microseconds=model.microsecond
    )

    if conservative:
        discard += timedelta(hours=6)

    return model - discard

def previous_forecast(model, forecast):
    # Compute number of hours between model and forecast
    dt = (forecast - model).total_seconds() / 3600
    # Get forecast timestep before requested forecast time
    h = (dt // GFS_FORECAST_TIMESTEP) * GFS_FORECAST_TIMESTEP
    # Limit forecast time to maximum timestep
    if h > GFS_FORECAST_MAX:
        h = GFS_FORECAST_MAX
    # Return forecast timestep time
    return model + timedelta(hours=h)

IndexRecord = namedtuple('IndexRecord', [
    'id',
    'start_byte',
    'model_run',
    'name',
    'level',
    'forecast',
    'reserved'
])

def _get_index(forecast_url):
    # Download index file
    index_url = forecast_url + '.idx'
    r = requests.get(index_url)

    if r.status_code != requests.codes.ok:
        raise RuntimeError('Downloading %s failed with error %s!', index_url, r.status_code)

    # Parse index
    index = []
    for line in iter(r.text.splitlines()):
        tokens = line.split(':')
        tokens[0] = int(tokens[0])
        tokens[1] = int(tokens[1])
        record = IndexRecord(*tokens)
        index.append(record)

    return index