pylhc/generic_parser

View on GitHub
generic_parser/entrypoint_parser.py

Summary

Maintainability
D
1 day
Test Coverage
C
78%
"""
Entrypoint Parser
-----------------

This module contains high-level objects to manage the functionality of ``generic_parser``: entry
point parser and decorator.

These allow a function to be decorated as entrypoint. This function will then automatically
accept console arguments, config files, json files, kwargs and dictionaries as input and will
parse it according to the parameters given to the entrypoint-Decorator.

Terminology:
++++++++++++++++++++++++

    * **Parameter** - Items containing info on how to parse Arguments
    * **Arguments** - The input to the wrapped-function
    * **Options** - The parsed arguments and hence the options of the function

Hence, an :class:`ArgumentError` will be raised in case of something going wrong during parsing,
while a :class:`ParameterError` will be raised if something goes wrong when adding parameters to
the list.

Usage:
++++++++++++++++++++++++

To be used as a decorator::

    @entrypoint(parameters)
    def some_function(options, unknown_options)

Using **strict** mode (see below)::

    @entrypoint(parameters, strict=True)
    def some_function(options)

It is also possible to use the EntryPoint Class similar to a normal parser::

    ep_parser = EntryPoint(parameters)
    options, unknown_options = ep_parser.parse(arguments)

Using **strict** mode (see below)::

    ep_parser = EntryPoint(parameters, strict=True)
    options = ep_parser.parse(arguments)


Parameters:
++++++++++++++++++++++++

Parameters need to be a list or a dictionary of dictionaries with the following keys:

| **name** (*required*): Name of the variable (e.g. later use options.NAME).
 If 'params' is a dictionary, the key will be used as name.
| **flags** (*required*): Commandline flag(s), e.g. ``--file``
| **required** (*optional*): ``bool``
| **default** (*optional*): Default value, if variable not present
| **help** (*optional*): ``str``
| **type** (*optional*): Value ``type`` (if nargs is given, set to list for dicts!)
| **choices** (*optional*): choices to choose from
 (choices need to be of ``type``, if given)
| **nargs** (*optional*): number of arguments to consume
 (commandline only, do not use ``REMAINDER``!)
| **action** (*optional*): either ``store_true`` or ``store_false``, will set ``type`` to bool
 and the default to ``False`` and ``True`` respectively.


Alternatively, you can use the provided ``EntryPointParameters()`` class.

Example with ``EntryPointParameters``:

.. code-block:: python

    args = EntryPointParameters()
    args.add_parameter(name="accel",
                       flags=["-a", "--accel"],
                       help="Which accelerator?",
                       choices=["LHCB1","LHCB2","LHCB5"],
                       default="LHCB1")
    args.add_parameter(name="dict",
                       flags=["-d", "--dictionary"],
                       help="File with the BPM dictionary",
                       default="bpm.txt",
                       type=str)


Example with dictionary of dictionaries:

.. code-block:: python

    args = EntryPointParameters({
        "accel": dict(
            flags=["-a", "--accel"],
            help="Which accelerator?",
            choices=["LHCB1", "LHCB2", "LHCB5"],
            default="LHCB1"),
        "dict": dict(
            flags=["-d", "--dictionary"],
            help="File with the BPM dictionary",
            default="bpm.txt",
            type=str),
            })


Example with list of dictionaries:

.. code-block:: python

    args = [
        dict(
            name="accel",
            flags=["-a", "--accel"],
            help="Which accelerator?",
            choices=["LHCB1", "LHCB2", "LHCB5"],
            default="LHCB1"),
        dict(
            name="dict",
            flags=["-d", "--dictionary"],
            help="File with the BPM dictionary",
            default="bpm.txt",
            type=str),
            ]


The **strict** option changes the behaviour for unknown arguments:
``strict=True`` raises exceptions, ``strict=False`` logs debug messages and returns the unknown
arguments alongside the parsed options.
Hence a wrapped function with ``strict=True`` must accept one input, with ``strict=False`` two.
Defaults to ``False``.


Arguments:
++++++++++++++++++++++++

Arguments to a decorated function can be passed in multiple ways.
Let's take the decorated example function::

    @entrypoint(EntryPointParameters({"foo": {}, "bar": {}}), strict=True)
    def entry_function(opt):
        print(opt)


Then the different ways to call it are, from python::

    entry_function(foo='mark', bar='twain')  # keyword args

    entry_function(dict(foo='mark', bar='twain'))  # dictionary
    entry_function(entry_dict=dict(foo='mark', bar='twain'))  # dictionary

    entry_function(['--foo', 'mark', '--bar', 'twain'])  # commandline args

    entry_function(entry_cfg=Path('config.ini'))  # config file
    entry_function(entry_cfg=Path('config.ini'), section='section')  # '[section]' in config file


    entry_function(entry_json=Path('config.json'))  # json file
    entry_function(entry_json=Path('config.json'), section='section')  # '[section]' in json file

or as commandline arguments from a **script.py** that calls ``entry_function()``::

    python script.py --foo mark --bar twain
    python script.py --entry_cfg config.ini
    python script.py --entry_cfg config.ini --section section
"""
import copy
import json
import argparse
import sys
from argparse import ArgumentParser
from configparser import ConfigParser
from inspect import getfullargspec
from functools import wraps
from pathlib import Path
from textwrap import wrap
from typing import Callable, Mapping

from generic_parser.tools import DotDict, silence, unformatted_console_logging, StringIO, log_out
from generic_parser.dict_parser import ParameterError, ArgumentError, DictParser

import logging
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


# Reserved names: used as keyword arguments of ``EntryPoint.parse`` and, for the config
# file and the section, also as commandline flags (``--entry_cfg``, ``--section``).
ID_CONFIG = "entry_cfg"  # value is the path to a config file
ID_DICT = "entry_dict"  # value is a dictionary of arguments
ID_JSON = "entry_json"  # value is the path to a json file
ID_SECTION = "section"  # section of the config file or top-level key of the json file


# EntryPoint Class #############################################################


class EntryPoint:
    """
    Parses input given as commandline arguments, config file, json file, keyword arguments or
    dictionary, according to a set of entry-point parameters.

    Args:
        parameter: List of parameter-dictionaries, or dictionary of parameter-dictionaries
                   in which case the keys are used as parameter names.
        strict (bool): Passed on to the ``DictParser``. For commandline input, unknown options
                       raise an ``ArgumentError`` if ``True``; if ``False`` they are logged and
                       returned alongside the parsed options.
        argument_parser_args (Mapping): Keyword arguments used to create the ``ArgumentParser``.
        help_printer (Callable): Called with the formatted help text on faulty commandline
                                 input, or if ``--help`` is among the unknown options.
    """
    def __init__(self, parameter, strict: bool = False, argument_parser_args: Mapping = None, help_printer: Callable = None):
        """Initialize decoration: Handle the desired input parameter."""
        self.strict = strict

        # add argument dictionary to EntryPoint
        self.remainder = None
        self.parameter = dict2list_param(parameter)
        self._check_parameter()  # just simple checks for structure

        # add config-argparser
        self.configarg = self._create_config_argument()

        # create parsers from parameter
        # this also ensures that the parameters are correctly defined,
        # by tests in argparser and in Parameter(),
        # which is used in dict_parser -> add parameter
        self.argparse = self._create_argument_parser(argument_parser_args)
        self.dictparse = self._create_dict_parser()  # also used for configfiles
        self._help_printer = help_printer

    def parse(self, *args, **kwargs):
        """
        Parse whatever input parameter come. This is the heart of EntryPoint and will recognize
        the input and parse it accordingly. Allowed inputs are:
            - Dictionary with arguments as key-values.
            - Key-Value arguments.
            - Path to a one-section config file.
            - Commandline Arguments.
            - Commandline Arguments in string-form (as list).
            - Special Key-Value arguments are:
                entry_dict: Value is a dict of arguments.
                entry_cfg: Path to config file.
                entry_json: Path to json file.
                section: Section to use in config file, or top-level key to use in json file.
                         If not given only one-section config files are allowed.

        Returns:
            The result of the respective handler, which might include known and unknown options.

        Raises:
            ArgumentError: If positional and keyword input are combined or more than one
                           positional argument is given.
        """
        if len(args) > 0 and len(kwargs) > 0:
            raise ArgumentError("Cannot combine positional parameter with keyword parameter.")

        if len(args) > 1:
            raise ArgumentError("Only one positional argument allowed (dict or config file).")

        if args and args[0] is not None:
            return self._handle_arg(args[0])

        if len(kwargs) > 0:
            return self._handle_kwargs(kwargs)

        # neither positional nor keyword input: parse ``sys.argv``
        return self._handle_commandline()

    #########################
    # Create Parsers
    #########################

    def _create_config_argument(self):
        """Creates the config-file argument parser."""
        parser = ArgumentParser()
        parser.add_argument('--{}'.format(ID_CONFIG), type=str, dest=ID_CONFIG, required=True,)
        parser.add_argument('--{}'.format(ID_SECTION), type=str, dest=ID_SECTION,)
        return parser

    def _create_argument_parser(self, args_dict: Mapping):
        """Creates the ArgumentParser from parameter."""
        if args_dict:
            parser = ArgumentParser(**args_dict)
        else:
            parser = ArgumentParser()

        parser = _add_params_to_argument_parser(parser, self.parameter)
        return parser

    def _create_dict_parser(self):
        """Creates the DictParser from parameter."""
        parser = DictParser(strict=self.strict)
        parser = _add_params_to_dict_parser(parser, self.parameter)
        return parser

    def _create_config_parser(self):
        """Creates the config parser. Maybe more to do here later with parameter."""
        parser = ConfigParser({})
        return parser

    #########################
    # Handlers
    #########################

    def _handle_commandline(self, args=None):
        """
        Parse commandline arguments, either ``args`` or ``sys.argv`` if ``args`` is ``None``.

        The input is first tried as config-file call (``--entry_cfg`` with optional
        ``--section``); if that fails, it is parsed as regular commandline options.
        """
        try:
            # check for config file first: ``parse_args`` exits if the required config flag
            # is missing or if there are arguments the config parser does not know
            with silence():
                config_options = vars(self.configarg.parse_args(args))
        except SystemExit:
            config_options = None

        if config_options is None:
            # handled outside of the ``except`` clause, so that errors raised by the regular
            # parsing are not chained to the SystemExit of the config-file check
            return self._handle_regular_commandline(args)

        # parse config file, honouring the section given on the commandline (if any)
        items = self._read_config(config_options[ID_CONFIG], config_options.get(ID_SECTION))
        return self.dictparse.parse_config_items(items)

    def _handle_regular_commandline(self, args):
        """
        Parse regular (non config-file) commandline options with the argument parser.

        Returns:
            The parsed options in strict mode, otherwise a tuple of the parsed options
            and the list of unknown options.

        Raises:
            ArgumentError: In strict mode, if unknown options are present.
            SystemExit: Re-raised from the argument parser (e.g. wrong input or help printed).
        """
        errors_io = StringIO()
        try:
            with log_out(stderr=errors_io):  # errors go into errors_io
                options, unknown_opts = self.argparse.parse_known_args(args)
        except SystemExit as e:
            errors_str = errors_io.getvalue()
            # print help on wrong input
            if self._help_printer and e.code == 2:  # code 0 means help has been printed
                self._help_printer(self.argparse.format_help())
                # remove duplicated "usage" line
                errors_str = "\n".join(errors_str.split("\n")[-2:])

            # print errors now (if any)
            sys.stderr.write(errors_str)
            raise

        # print help, even if passed on to other parser
        if self._help_printer and "--help" in unknown_opts:
            self._help_printer(self.argparse.format_help())

        options = DotDict(vars(options))
        if self.strict:
            if unknown_opts:
                raise ArgumentError(f"Unknown options: {unknown_opts}")
            return options

        if unknown_opts:
            LOG.debug(f"Unknown options: {unknown_opts}")
        return options, unknown_opts

    def _handle_arg(self, arg):
        """
        Handle a single positional argument.

        Strings and ``Path`` objects are treated as path to a config file, dictionaries as
        arguments and lists as commandline arguments.

        Raises:
            ArgumentError: If ``arg`` is of any other type.
        """
        if isinstance(arg, (str, Path)):
            # assume config file
            options = self.dictparse.parse_config_items(self._read_config(arg))
        elif isinstance(arg, dict):
            # dictionary
            options = self.dictparse.parse_arguments(arg)
        elif isinstance(arg, list):
            # list of commandline parameter
            options = self._handle_commandline(arg)
        else:
            raise ArgumentError("Only dictionary or configfiles "
                                "are allowed as positional arguments")
        return options  # options might include known and unknown options

    def _handle_kwargs(self, kwargs):
        """
        Handle keyword arguments.

        The special keys for config file, dictionary and json file are checked in this order;
        if none of them is present, the keyword arguments themselves are parsed.

        Raises:
            ArgumentError: If a special key is combined with keys that are not allowed with it.
        """
        if ID_CONFIG in kwargs:
            if len(kwargs) > 2 or (len(kwargs) == 2 and ID_SECTION not in kwargs):
                raise ArgumentError(
                    f"Only '{ID_CONFIG:s}' and '{ID_SECTION:s}'" +
                    " arguments are allowed, when using a config file.")
            options = self._read_config(kwargs[ID_CONFIG],
                                        kwargs.get(ID_SECTION, None))
            options = self.dictparse.parse_config_items(options)

        elif ID_DICT in kwargs:
            if len(kwargs) > 1:
                raise ArgumentError("Only one argument allowed when using a dictionary")
            options = self.dictparse.parse_arguments(kwargs[ID_DICT])

        elif ID_JSON in kwargs:
            if len(kwargs) > 2 or (len(kwargs) == 2 and ID_SECTION not in kwargs):
                raise ArgumentError(
                    f"Only '{ID_JSON:s}' and '{ID_SECTION:s}'" +
                    " arguments are allowed, when using a json file.")
            with open(kwargs[ID_JSON], 'r') as json_file:
                json_dict = json.load(json_file)

            if ID_SECTION in kwargs:
                # for json files the 'section' is a top-level key
                json_dict = json_dict[kwargs[ID_SECTION]]

            options = self.dictparse.parse_arguments(json_dict)

        else:
            options = self.dictparse.parse_arguments(kwargs)

        return options   # options might include known and unknown options

    #########################
    # Helpers
    #########################

    def _check_parameter(self):
        """
        EntryPoint specific checks for parameter.

        Parameters without flags get ``--<name>`` as flag (modified in place).

        Raises:
            ParameterError: If a parameter has no name, or uses ``nargs`` of
                            ``argparse.REMAINDER`` or ``argparse.OPTIONAL``.
        """
        for param in self.parameter:
            arg_name = param.get("name", None)
            if arg_name is None:
                raise ParameterError("A Parameter needs a Name!")

            if param.get("nargs", None) == argparse.REMAINDER:
                raise ParameterError(f"Parameter '{arg_name:s}' is set as remainder." +
                                     "This method is really buggy, hence it is forbidden.")

            if param.get("nargs", None) == argparse.OPTIONAL:
                raise ParameterError(f"Parameter '{arg_name:s}' is set as optional." +
                                     "As entrypoint does not use 'const', the use is prohibited.")

            if param.get("flags", None) is None:
                # if flags aren't supplied, it defaults to the name
                LOG.debug(f'Missing flags parameter. Defaulting to --{arg_name}')
                param['flags'] = [f'--{arg_name}']

    def _read_config(self, cfgfile_path, section=None):
        """
        Get content from config file.

        Args:
            cfgfile_path: Path to the config file (anything ``open`` accepts).
            section: Section to read. If not given, the file may contain at most one section.

        Returns:
            The items of the section as returned by ``ConfigParser.items``.

        Raises:
            ArgumentError: If no section is given and the file contains multiple sections.
        """
        # create new config parser, as it keeps defaults between files
        cfgparse = self._create_config_parser()

        with open(cfgfile_path) as config_file:
            cfgparse.read_file(config_file)

        sections = cfgparse.sections()
        if not section:
            if len(sections) == 0:
                section = "DEFAULT"  # name does not matter, takes defaults, but None is not allowed
            elif len(sections) == 1:
                section = sections[0]  # our convention
            elif len(sections) > 1:
                # no ':s' format-spec here: it raises a TypeError for Path objects
                raise ArgumentError(f"'{cfgfile_path}' contains multiple sections. " +
                                    " Please specify one!")

        items = cfgparse.items(section)
        return items


# entrypoint Decorator #########################################################


class entrypoint(EntryPoint):
    """
    Decorator extension of `EntryPoint`. Implements the ``__call__`` method needed for decorating.
    Lowercase looks nicer if used as decorator.
    """

    def __call__(self, func):
        """
        Builds wrapper around the function ``func`` (called on decoration). Whenever the
        decorated function is called, actually this wrapper is called. The number of arguments
        is checked for compliance with instance- and class-methods.

        Meaning: if there is one more argument than there should be, we pass it on as it is
        (should be) either ``self`` or ``cls``. One could check that there are no varargs and
        keywords, but let's assume the user is doing the right things.

        Hint: To check for bound functions (i.e. with ``self`` or ``cls``) via
        ``hasattr(func, "__self__")`` will not work here, as functions are bound later.

        Args:
            func: The function to decorate.

        Returns:
            The wrapper, which parses its input and calls ``func`` with the result.

        Raises:
            OptionsError: If the positional arguments of ``func`` do not fit the mode, i.e.
                          ``([self/cls,] options)`` in strict mode and
                          ``([self/cls,] options, unknown_options)`` otherwise.
        """
        func_args = getfullargspec(func).args
        nargs = len(func_args)
        # naming assumption...sorry (jdilly)
        # The emptiness check avoids an IndexError for functions without positional
        # arguments; these end up in the OptionsError branches below.
        is_bound = nargs > 0 and func_args[0] in ('self', 'cls')

        if self.strict:
            if not is_bound and nargs == 1:
                @wraps(func)
                def wrapper(*args, **kwargs):
                    return func(self.parse(*args, **kwargs))
            elif is_bound and nargs == 2:
                @wraps(func)
                def wrapper(other, *args, **kwargs):
                    return func(other, self.parse(*args, **kwargs))
            else:
                raise OptionsError("In strict mode, only one option-structure will be passed."
                                   " The entrypoint needs to have the following structure: "
                                   " ([self/cls,] options)."
                                   f" Found: '{func.__name__:s}({', '.join(func_args):s})'")
        else:
            if not is_bound and nargs == 2:
                @wraps(func)
                def wrapper(*args, **kwargs):
                    options, unknown_options = self.parse(*args, **kwargs)
                    return func(options, unknown_options)
            elif is_bound and nargs == 3:
                @wraps(func)
                def wrapper(other, *args, **kwargs):
                    options, unknown_options = self.parse(*args, **kwargs)
                    return func(other, options, unknown_options)
            else:
                raise OptionsError("Two option-structures will be passed."
                                   " The entrypoint needs to have the following structure: "
                                   " ([self/cls,] options, unknown_options)."
                                   f" Found: '{func.__name__:s}({', '.join(func_args):s})'")
        return wrapper


# EntryPoint Arguments #########################################################


class EntryPointParameters(DotDict):
    """
    Dictionary of parameters that can be filled step by step via ``add_parameter``.
    A plain dictionary of dictionaries does the same job, but old habits die hard.
    """
    def add_parameter(self, **kwargs):
        """
        Store the given keyword arguments as a new parameter under their ``name``.

        Raises:
            ParameterError: If a parameter of that name is already present.
        """
        name = kwargs.pop("name")
        if name in self:
            raise ParameterError(f"'{name:s}' is already a parameter.")
        self[name] = kwargs

    def help(self):
        """Logs the help for all parameters (sorted by name). Usable to paste into docstrings."""
        indent = " " * 4
        # optional detail lines in output order: (parameter key, template)
        detail_templates = (
            ("flags", "{}flags: **{}**\n\n"),
            ("choices", "{}choices: ``{}``\n\n"),
            ("default", "{}default: ``{}``\n\n"),
            ("action", "{}action: ``{}``\n\n"),
        )
        entries = {True: [], False: []}  # collected texts, keyed by 'is required'

        for name in sorted(self.keys()):
            entry = self[name]

            try:
                header = f"- **{name}** *({entry['type'].__name__})*:\n\n"
            except KeyError:
                header = f"- **{name}**:\n\n"

            try:
                description = f"{entry['help']}"
            except KeyError:
                description = "-No info available-"
            wrapped = [f"{indent}{line}" for line in wrap(description, 70)]
            description = "\n".join(wrapped) + "\n\n"

            details = []
            for key, template in detail_templates:
                try:
                    details.append(template.format(indent, entry[key]))
                except KeyError:
                    continue  # this detail is not defined for the parameter

            text = header + description + "".join(details)
            entries[bool(entry.get("required", False))].append(text)

        if entries[True]:
            LOG.info("*--Required--*\n")
            LOG.info("\n".join(entries[True]))

        if entries[False]:
            LOG.info("*--Optional--*\n")
            LOG.info("\n".join(entries[False]))


# Public Helpers ###############################################################


class OptionsError(Exception):
    """Raised when the signature of a decorated function does not fit the entrypoint mode."""


def dict2list_param(param):
    """
    Convert a dictionary of parameters to a list of parameters, adding the key as ``name``.

    The entries of the returned list are shallow copies, so the parameter dictionaries of the
    caller are not modified. Anything that is not a dictionary is returned unchanged.

    Args:
        param: Dictionary of parameter-dictionaries (or anything else, e.g. a list).

    Returns:
        List of parameter-dictionaries, each containing its former key as ``name``;
        or ``param`` itself, if it is not a dictionary.
    """
    if not isinstance(param, dict):
        return param
    # dict(item, name=key) copies the item instead of writing the name into the input
    return [dict(item, name=key) for key, item in param.items()]


def list2dict_param(param):
    """
    Turn a list of parameters into a dictionary keyed by their ``name`` for quicker lookup.
    Anything that is not a list is returned unchanged.
    """
    if not isinstance(param, list):
        return param
    return {entry["name"]: entry for entry in param}


def add_to_arguments(args, entry_params=None, **kwargs):
    """
    Extends existing arguments (in place) by the given keyword arguments.

    For a `list` of commandline arguments, the first flag of each named parameter is appended,
    followed by the value(s) as strings; this requires ``entry_params``. Anything else is
    updated like a dictionary.

    Args:
        args (list,dict): Arguments (e.g. from unknown).
        entry_params (list, dict): Parameter belonging to the arguments.

    Keyword Args:
        Name and value of the arguments to add.

    Returns:
        The extended ``args``.
    """
    if not isinstance(args, list):
        args.update(kwargs)
        return args

    if entry_params is None:
        raise ParameterError("For commandline arguments, entry_params need to be supplied.")

    lookup = list2dict_param(entry_params)
    for name, value in kwargs.items():
        flag = lookup[name].get("flags", f"--{name}")
        if isinstance(flag, list):
            flag = flag[0]  # only the first flag is used
        values = value if isinstance(value, list) else [value]
        args.extend([flag] + [str(v) for v in values])
    return args


# parameter adders ---

def add_params_to_generic(parser, params):
    """
    Adds entry-point style parameter to either `ArgumentParser`, `DictParser` or
    `EntryPointParameters`, whichever ``parser`` is an instance of.

    Raises:
        TypeError: If ``parser`` is of none of these types.
    """
    # checked in this order, first match wins
    adders = (
        (EntryPointParameters, _add_params_to_entrypoint_parameters),
        (ArgumentParser, _add_params_to_argument_parser),
        (DictParser, _add_params_to_dict_parser),
    )
    for parser_type, adder in adders:
        if isinstance(parser, parser_type):
            return adder(parser, params)
    raise TypeError("Parser not recognised.")


def _add_params_to_entrypoint_parameters(entry_parser, params):
    """Adds a deep copy of each parameter to ``entry_parser`` via ``add_parameter``."""
    for spec in dict2list_param(copy.deepcopy(params)):
        entry_parser.add_parameter(**spec)
    return entry_parser


def _add_params_to_dict_parser(dict_parser, params):
    """
    Adds the parameters to ``dict_parser``, working on a deep copy of ``params``.

    Parameters with ``nargs`` become lists, with their former type as ``subtype``. The actions
    ``store_true`` and ``store_false`` become booleans defaulting to ``False`` and ``True``
    respectively. Flags are dropped.

    Raises:
        ParameterError: If a parameter defines any other action.
    """
    for spec in dict2list_param(copy.deepcopy(params)):
        if "nargs" in spec:
            spec["subtype"] = spec.get("type", None)
            spec["type"] = list

        if "action" in spec:
            action = spec["action"]
            if action not in ("store_true", "store_false"):
                raise ParameterError(f"Action '{action:s}' not allowed in EntryPoint")
            spec["type"] = bool
            # 7th character is the 't' of 'store_true' or the 'f' of 'store_false'
            spec["default"] = action[6] != "t"
            del spec["action"]

        spec.pop("flags", None)

        name = spec.pop("name")
        dict_parser.add_parameter(name, **spec)
    return dict_parser


def _add_params_to_argument_parser(arg_parser, params):
    """
    Adds the parameters to ``arg_parser``, working on a deep copy of ``params``.
    The name of a parameter is used as ``dest``, its flags (if any) as positional arguments
    of ``add_argument``.
    """
    for spec in dict2list_param(copy.deepcopy(params)):
        spec["dest"] = spec.pop("name", None)
        flags = spec.pop("flags", None)
        if flags is None:
            flags = ()
        elif isinstance(flags, str):
            flags = (flags,)  # a single flag given as string
        arg_parser.add_argument(*flags, **spec)
    return arg_parser

# ---


def split_arguments(args, *param_list):
    """
    Distribute the arguments among the given sets of parameters.

    Args:
        args: Input arguments, either as list of strings or dict.
        param_list: list of sets of entry-point parameters (either dict, or list).

    Returns:
        A list of dictionaries containing the arguments for each set of parameters, plus one more
        entry for unknown parameters. If the input was a list of argument-strings, the parameters
        will already be parsed.

    .. warning:: Unless you know what you are doing, run this function only on
                 remaining-arguments from entry point parsing, not on the actual arguments

    .. warning:: Adds each argument only once, to the set of params who claim it first!

    .. warning:: If ``args`` is a dictionary, the claimed arguments are removed from it.
    """
    result = []

    if isinstance(args, list):
        # commandline strings: each set of parameters gets its own parser,
        # whatever it does not know is handed on to the next one
        for params in param_list:
            parser = _add_params_to_argument_parser(argparse.ArgumentParser(), params)
            known, args = parser.parse_known_args(args)
            result.append(DotDict(vars(known)))
        result.append(args)
        return result

    # should be a dictionary of arguments: pick them by name
    for params in param_list:
        names = param_names(params)
        claimed = [(key, args.pop(key)) for key in list(args.keys()) if key in names]
        result.append(DotDict(claimed))
    result.append(DotDict(args))
    return result


def param_names(params):
    """
    Names of the parameters: the keys, if ``params`` provides them (e.g. a dict),
    otherwise the ``name`` entry of each parameter in the sequence.
    """
    try:
        return params.keys()
    except AttributeError:
        return [entry["name"] for entry in params]


def create_parameter_help(module, param_fun=None):
    """
    Print the help of the parameters of a module, with the logging format changed to
    unformatted console output for the duration of the call.

    Args:
        module: Module providing a function that returns an object with a ``help()`` method.
        param_fun (str): Name of that function. If not given, ``get_params`` is used if the
                         module has it, otherwise ``_get_params``.

    Usage Example::

        import amplitude_detuning_analysis
        create_parameter_help(amplitude_detuning_analysis)
        create_parameter_help(amplitude_detuning_analysis, "_get_plot_params")
    """
    if param_fun is None:
        # Decide on the name up-front instead of catching AttributeError around the call:
        # an AttributeError raised *inside* the function or ``help()`` must not be
        # mistaken for a missing ``get_params``.
        param_fun = "get_params" if hasattr(module, "get_params") else "_get_params"

    with unformatted_console_logging():
        getattr(module, param_fun)().help()


def save_options_to_config(filepath, opt, unknown=None):
    """
    Writes the parsed options into a config file (section ``[DEFAULT]``), which can then be
    used as arguments.

    Args:
        filepath: path to write the config file to.
        opt: parsed known options.
        unknown: unknown options (only safe for non-commandline parameters). A dictionary is
                 written as further options, anything else as a single comment line.
    """
    def _format_entry(key, value):
        if value is None:
            text = ''  # defined as empty (see dict_parser._convert_config_items)
        elif isinstance(value, (str, Path)):
            # quoted; spaces in new line indicate continuation
            text = f'"{value}"'.replace("\n", "\n    ")
        else:
            text = value
        return f"{key} = {text}\n"

    chunks = ["[DEFAULT]\n"]
    chunks.extend(_format_entry(name, opt[name]) for name in opt)

    if unknown is not None and len(unknown):
        chunks.append("; Unknown options --------------------------\n")
        if isinstance(unknown, dict):
            chunks.extend(_format_entry(name, unknown[name]) for name in unknown)
        else:
            chunks.append(f"; {' '.join(unknown)}\n")

    with open(filepath, "w") as config_file:
        config_file.write("".join(chunks))