openpathsampling/openpathsampling-cli

View on GitHub
paths_cli/param_core.py

Summary

Maintainability
A
3 hrs
Test Coverage
import click
import os


class AbstractParameter(object):
    """Abstract wrapper for click parameters.

    Forbids use setting ``required``, because we do that for each command
    individually.

    Parameters
    ----------
    args :
        args to pass to click parameters
    kwargs :
        kwargs to pass to click parameters
    """
    def __init__(self, *args, **kwargs):
        self.args = args
        if 'required' in kwargs:
            raise ValueError("Can't set required status now")
        self.kwargs = kwargs

    def clicked(self, required=False):
        raise NotImplementedError()


# we'll use tests of the -h option in the .travis.yml to ensure that the
# .clicked methods work
class Option(AbstractParameter):
    """Wrapper for click.option decorators"""
    def clicked(self, required=False):  # no-cov
        """Create the click decorator"""
        return click.option(*self.args, **self.kwargs, required=required)


class Argument(AbstractParameter):
    """Wrapper for click.argument decorators"""
    def clicked(self, required=False):  # no-cov
        """Create the click decorator"""
        return click.argument(*self.args, **self.kwargs, required=required)


class AbstractLoader(object):
    """Abstract object for getting relevant OPS object from the CLI.

    Parameters
    ----------
    param : :class:`.AbstractParameter`
        the Option or Argument wrapping a click decorator
    """
    def __init__(self, param):
        self.param = param

    @property
    def clicked(self):  # no-cov
        """Create the click decorator"""
        return self.param.clicked

    def get(self, *args, **kwargs):
        """Get the desired OPS object, based on the CLI input"""
        raise NotImplementedError()


class StorageLoader(AbstractLoader):
    """Open an OPS storage file

    Parameters
    ----------
    param : :class:`.AbstractParameter`
        the Option or Argument wrapping a click decorator
    mode : 'r', 'w', or 'a'
        the mode for the file
    """
    has_simstore_patch = False
    def __init__(self, param, mode):
        super(StorageLoader, self).__init__(param)
        self.mode = mode

    @staticmethod
    def _is_simstore(name):
        return name.endswith(".db") or name.endswith(".sql")

    def _workaround(self, name):
        # this is messed up... for some reason, storage doesn't create a new
        # file in append mode. That may be a bug
        import openpathsampling as paths
        needs_workaround = (
            self.mode == 'a'
            and not os.path.exists(name)
            and not self._is_simstore(name)
        )
        if needs_workaround:
            st = paths.Storage(name, mode='w')
            st.close()

    def get(self, name):
        if self._is_simstore(name):
            import openpathsampling as paths
            from openpathsampling.experimental.storage import \
                    Storage, monkey_patch_all

            if not self.has_simstore_patch:
                paths = monkey_patch_all(paths)
                paths.InterfaceSet.simstore = True
                StorageLoader.has_simstore_patch = True

            from openpathsampling.experimental.simstore import \
                SQLStorageBackend
            backend = SQLStorageBackend(name, mode=self.mode)
            storage = Storage.from_backend(backend)
        else:
            from openpathsampling import Storage
            self._workaround(name)
            storage = Storage(name, self.mode)
        return storage


class OPSStorageLoadNames(AbstractLoader):
    """Simple loader that expects its input to be a name or index.

    Parameters
    ----------
    param : :class:`.AbstractParameter`
        the Option or Argument wrapping a click decorator
    store : Str
        the name of the store to search
    """
    def __init__(self, param, store):
        super(OPSStorageLoadNames, self).__init__(param)
        self.store = store

    def get(self, storage, names):
        """Get the names from the storage

        Parameters
        ----------
        storage : :class:`openpathsampling.Storage`
            storage file to search in
        names : List[Str]
            names or numbers (as string) to use as keys to load from
            storage

        Returns
        -------
        List[Any] :
            the desired objects
        """
        int_corrected = []
        for name in names:
            try:
                name = int(name)
            except ValueError:
                pass
            int_corrected.append(name)

        return [getattr(storage, self.store)[name]
                for name in int_corrected]


class Getter(object):
    """Abstract strategy for getting things from storage

    Parameters
    ----------
    store_name : Str
        the name of the storage to search
    """
    def __init__(self, store_name):
        self.store_name = store_name

    def _get(self, storage, name):
        store = getattr(storage, self.store_name)
        try:
            return store[name]
        except:
            return None


class GetByName(Getter):
    """Strategy using the CLI input as name for a stored item"""
    def __call__(self, storage, name):
        return self._get(storage, name)


class GetByNumber(Getter):
    """Strategy using the CLI input as numeric index of the stored item"""
    def __call__(self, storage, name):
        try:
            num = int(name)
        except:
            return None

        return self._get(storage, num)


class GetPredefinedName(Getter):
    """Strategy predefining name and store, allow default names"""
    def __init__(self, store_name, name):
        super().__init__(store_name=store_name)
        self.name = name

    def __call__(self, storage):
        return self._get(storage, self.name)


class GetOnly(Getter):
    """Strategy getting item from store if it is the only one"""
    def __call__(self, storage):
        store = getattr(storage, self.store_name)
        if len(store) == 1:
            return store[0]


class GetOnlyNamed(Getter):
    """Strategy selecting item from store if it is the only named item"""
    def __call__(self, storage):
        store = getattr(storage, self.store_name)
        named_things = [o for o in store if o.is_named]
        if len(named_things) == 1:
            return named_things[0]


class GetOnlySnapshot(Getter):
    """Strategy selecting only snapshot from a snapshot store"""
    def __init__(self, store_name="snapshots"):
        super().__init__(store_name)

    def _min_num_snapshots(self, storage):
        # For netcdfplus, we see 2 snapshots when there is only one
        # (reversed copy gets saved). For SimStore, we see only one.
        import openpathsampling as paths
        if isinstance(storage, paths.netcdfplus.NetCDFPlus):
            min_snaps = 2
        else:
            min_snaps = 1
        return min_snaps

    def __call__(self, storage):
        store = getattr(storage, self.store_name)
        min_snaps = self._min_num_snapshots(storage)
        if len(store) == min_snaps:
            return store[0]


def _try_strategies(strategies, storage, **kwargs):
    result = None
    for strategy in strategies:
        result = strategy(storage, **kwargs)
        if result is not None:
            return result


class OPSStorageLoadSingle(AbstractLoader):
    """Objects that expect to load a single object.

    These can sometimes include guesswork to figure out which object is
    desired. The details of how that guesswork is performed is determined
    by the strategy lists that are given.

    Parameters
    ----------
    param : :class:`.AbstractParameter`
        the Option or Argument wrapping a click decorator
    store : Str
        the name of the store to search
    value_strategies : List[Callable[(:class:`.Storage`, Str), Any]]
        The strategies to be used when the CLI provides a value for this
        parameter. Each should be a callable taking a storage and the string
        input from the CLI, and should return the desired object or None if
        it cannot be found.
    none_strategies : List[Callable[:class:`openpathsampling.Storage`, Any]]
        The strategies to be used when the CLI does not provide a value for
        this parameter. Each should be a callable taking a storage, and
        returning the desired object or None if it cannot be found.
    """
    def __init__(self, param, store, value_strategies=None,
                 none_strategies=None):
        super(OPSStorageLoadSingle, self).__init__(param)
        self.store = store
        if value_strategies is None:
            value_strategies = [GetByName(self.store),
                                GetByNumber(self.store)]
        self.value_strategies = value_strategies

        if none_strategies is None:
            none_strategies = [GetOnly(self.store),
                               GetOnlyNamed(self.store)]
        self.none_strategies = none_strategies

    def get(self, storage, name):
        """Load desired object from storage.

        Parameters
        ----------
        storage : openpathsampling.Storage
            the input storage to search
        name : Str or None
            string from CLI providing the identifier (name or index) for
            this object; None if not provided
        """
        if name is not None:
            result = _try_strategies(self.value_strategies, storage,
                                     name=name)
        else:
            result = _try_strategies(self.none_strategies, storage)

        if result is None:
            if name is None:
                msg = "Couldn't guess which item to use from " + self.store
            else:
                msg = "Couldn't find {name} in {store}".format(
                    name=name,
                    store=self.store
                )
            raise RuntimeError(msg)

        return result


class OPSStorageLoadMultiple(OPSStorageLoadSingle):
    """Objects that can guess a single object or have multiple specified.

    Parameters
    ----------
    param : :class:`.AbstractParameter`
        the Option or Argument wrapping a click decorator
    store : Str
        the name of the store to search
    value_strategies : List[Callable[(:class:`.Storage`, Str), Any]]
        The strategies to be used when the CLI provides a value for this
        parameter. Each should be a callable taking a storage and the string
        input from the CLI, and should return the desired object or None if
        it cannot be found.
    none_strategies : List[Callable[:class:`openpathsampling.Storage, Any]]
        The strategies to be used when the CLI does not provide a value for
        this parameter. Each should be a callable taking a storage, and
        returning the desired object or None if it cannot be found.
    """
    def get(self, storage, names):
        """Load desired objects from storage.

        Parameters
        ----------
        storage : openpathsampling.Storage
            the input storage to search
        names : List[Str] or None
            strings from CLI providing the identifier (name or index) for
            this object; None if not provided
        """
        if names == tuple():
            names = None

        if names is None or isinstance(names, (str, int)):
            listified = True
            names = [names]
        else:
            listified = False

        results = [super(OPSStorageLoadMultiple, self).get(storage, name)
                   for name in names]

        if listified:
            results = results[0]

        return results