tools/nexrad_msgs/parse_spec.py
#!/usr/bin/env python
# Copyright (c) 2015 MetPy Developers.
# Distributed under the terms of the BSD 3-Clause License.
# SPDX-License-Identifier: BSD-3-Clause
"""Parse specification extracted from NEXRAD ICD PDFs and generate Python code."""
import warnings
def register_processor(num):
"""Register functions to handle particular message numbers."""
def inner(func):
"""Perform actual function registration."""
processors[num] = func
return func
return inner
processors = {}
@register_processor(3)
def process_msg3(fname):
"""Handle information for message type 3."""
with open(fname) as infile:
info = []
for lineno, line in enumerate(infile):
parts = line.split(' ')
try:
var_name, desc, typ, units = parts[:4]
size_hw = parts[-1]
if '-' in size_hw:
start, end = map(int, size_hw.split('-'))
size = (end - start + 1) * 2
else:
size = 2
assert size >= 2
fmt = fix_type(typ, size)
var_name = fix_var_name(var_name)
full_desc = fix_desc(desc, units)
info.append({'name': var_name, 'desc': full_desc, 'fmt': fmt})
if ignored_item(info[-1]) and var_name != 'Spare':
warnings.warn(f'{var_name} has type {typ}. Setting as Spare')
except (ValueError, AssertionError):
warnings.warn('{} > {}'.format(lineno + 1, ':'.join(parts)))
raise
return info
@register_processor(18)
def process_msg18(fname):
"""Handle information for message type 18."""
with open(fname) as infile:
info = []
for lineno, line in enumerate(infile):
parts = line.split(' ')
try:
if len(parts) == 8:
parts = parts[:6] + [parts[6] + parts[7]]
# var_name, desc, typ, units, rng, prec, byte_range
var_name, desc, typ, units, _, _, byte_range = parts
start, end = map(int, byte_range.split('-'))
size = end - start + 1
assert size >= 4
fmt = fix_type(typ, size,
additional=[('See Note (5)', ('{size}s', 1172))])
if ' ' in var_name:
warnings.warn(f'Space in {var_name}')
if not desc:
warnings.warn(f'null description for {var_name}')
var_name = fix_var_name(var_name)
full_desc = fix_desc(desc, units)
info.append({'name': var_name, 'desc': full_desc, 'fmt': fmt})
if (ignored_item(info[-1]) and var_name != 'SPARE'
and 'SPARE' not in full_desc):
warnings.warn(f'{var_name} has type {typ}. Setting as SPARE')
except (ValueError, AssertionError):
warnings.warn('{} > {}'.format(lineno + 1, ':'.join(parts)))
raise
return info
types = [('Real*4', ('f', 4)), ('Integer*4', ('L', 4)), ('SInteger*4', ('l', 4)),
('Integer*2', ('H', 2)),
('', lambda s: ('{size}x', s)), ('N/A', lambda s: ('{size}x', s)),
(lambda t: t.startswith('String'), lambda s: ('{size}s', s))]
def fix_type(typ, size, additional=None):
"""Fix up creating the appropriate struct type based on the information in the column."""
my_types = types + additional if additional is not None else types
for t, info in my_types:
matches = t(typ) if callable(t) else t == typ
if matches:
fmt_str, true_size = info(size) if callable(info) else info
assert size == true_size, (f'{typ}: Got size {size} instead of {true_size}')
return fmt_str.format(size=size)
raise ValueError(f'No type match! ({typ})')
def fix_var_name(var_name):
"""Clean up and apply standard formatting to variable names."""
name = var_name.strip()
for char in '(). /#,':
name = name.replace(char, '_')
name = name.replace('+', 'pos_')
name = name.replace('-', 'neg_')
if name.endswith('_'):
name = name[:-1]
return name
def fix_desc(desc, units=None):
"""Clean up description column."""
full_desc = desc.strip()
if units and units != 'N/A':
if full_desc:
full_desc += ' (' + units + ')'
else:
full_desc = units
return full_desc
def ignored_item(item):
"""Determine whether this item should be ignored."""
return item['name'].upper() == 'SPARE' or 'x' in item['fmt']
def need_desc(item):
"""Determine whether we need a description for this item."""
return item['desc'] and not ignored_item(item)
def field_name(item):
"""Return the field name if appropriate."""
return '"{:s}"'.format(item['name']) if not ignored_item(item) else None
def field_fmt(item):
"""Return the field format if appropriate."""
return '"{:s}"'.format(item['fmt']) if '"' not in item['fmt'] else item['fmt']
def write_file(fname, info):
"""Write out the generated Python code."""
with open(fname, 'w') as outfile:
# File header
outfile.write('# Copyright (c) 2018 MetPy Developers.\n')
outfile.write('# Distributed under the terms of the BSD 3-Clause License.\n')
outfile.write('# SPDX-License-Identifier: BSD-3-Clause\n\n')
outfile.write('# flake8: noqa\n')
outfile.write('# Generated file -- do not modify\n')
# Variable descriptions
outfile.write('descriptions = {')
outdata = ',\n '.join('"{name}": "{desc}"'.format(
**i) for i in info if need_desc(i))
outfile.write(outdata)
outfile.write('}\n\n')
# Now the struct format
outfile.write('fields = [')
outdata = ',\n '.join('({fname}, "{fmt}")'.format(
fname=field_name(i), **i) for i in info)
outfile.write(outdata)
outfile.write(']\n')
if __name__ == '__main__':
from pathlib import Path
for num in [18, 3]:
fname = f'msg{num:d}.spec'
print(f'Processing {fname}...') # noqa: T201
info = processors[num](fname)
fname = Path(fname).with_suffix('.py')
write_file(fname, info)