Ptrskay3/PySprint

View on GitHub
pysprint/utils/decorators.py

Summary

Maintainability
A
0 mins
Test Coverage
import sys
import time
import threading
import re
from itertools import cycle
from functools import wraps, lru_cache
from copy import copy

import numpy as np

from pysprint.config import _get_config_value
from pysprint.utils.misc import run_from_ipython


__all__ = [
    '_progress',
    'inplacify',
    '_mutually_exclusive_args',
    '_lazy_property',
    'pprint_disp',
]


_inplace_doc = """\n\tinplace : bool, optional
            Whether to apply the operation on the dataset in an "inplace" manner.
            This means if inplace is True it will apply the changes directly on
            the current dataset and returns None. If inplace is False, it will
            leave the current object untouched, but returns a copy of it, and
            the operation will be performed on the copy. It's useful when
            chaining operations on a dataset.\n\n\t"""


def _has_parameter_section(method):
    try:
        return "Parameters" in method.__doc__
    except TypeError:
        return False


def _update_doc(method, doc):
    if _has_parameter_section(method):
        newdoc = _build_doc(method, doc)
        method.__doc__ = newdoc
    else:
        newdoc = "\n\tParameters\n\t----------" + _inplace_doc

        nodoc_head = (f"Docstring automatically created for {method.__name__}. "
                      "Parameter list may not be complete.\n")
        if method.__doc__ is not None:
            method.__doc__ = "".join([method.__doc__, newdoc])
        else:
            method.__doc__ = "".join([nodoc_head, newdoc])
        return


def _build_doc(method, param):
    patt = r"(\w+(?=\s*[-]{4,}[^/]))"  # finding sections
    split_doc = re.split(patt, method.__doc__)
    try:
        target = split_doc.index("Parameters") + 1
    except ValueError:
        return method.__doc__

    split_doc[target] = ''.join([split_doc[target].rstrip(), param])

    return ''.join(filter(None, split_doc))


def inplacify(method):
    """
    Decorator that allows a class function to be called
    as `inplace`. It will invalidate the parent
    object to have **only one** reference to the Dataset.
    """
    _update_doc(method, _inplace_doc)

    @wraps(method)
    def wrapper(self, *args, **kwds):
        inplace = kwds.pop("inplace", True)
        if inplace:
            method(self, *args, **kwds)
        else:
            new_ds = method(copy(self), *args, **kwds)

            # trigger a callback to ensure that relevant values
            # aren't dropped
            try:
                new_ds.callback(new_ds, new_ds.parent)
            except (TypeError, ValueError, AttributeError):
                pass

            # invalidate parent for the original obj.
            if hasattr(self, "parent") and self.parent is not None:
                self.parent._container.pop(self, None)
                self.parent = None
            return new_ds

    return wrapper


def _progress(func):
    active = threading.Lock()

    def spinning_pbar_printer():
        symbols = ['|', '/', '-', '\\', '\\']
        cursor = cycle(symbols)
        while active.locked():
            sys.stdout.write("\r")
            sys.stdout.write("Working... " + next(cursor))
            sys.stdout.flush()
            time.sleep(0.1)

    def wrapper(*args, **kwargs):
        t = threading.Thread(target=spinning_pbar_printer)
        active.acquire()
        t.start()
        try:
            res = func(*args, **kwargs)
        finally:
            active.release()
        return res

    return wrapper


# https://stackoverflow.com/a/54487188/11751294
def _mutually_exclusive_args(keyword, *keywords):
    """
    Decorator to restrict the user to specify exactly one of the given parameters.
    Often used for std and fwhm for Gaussian windows.
    """
    keywords = (keyword,) + keywords

    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            if sum(k in keywords for k in kwargs) != 1:
                raise TypeError(
                    "You must specify exactly one of {}.".format(" and ".join(keywords))
                )
            return func(*args, **kwargs)

        return inner

    return wrapper


def _lazy_property(f):
    return property(lru_cache()(f))


def pprint_disp(f):
    """
    Pretty print the dispersion results from returned arrays.
    """
    @wraps(f)
    def wrapping(*args, **kwargs):
        disp, disp_std, stri = f(*args, **kwargs)
        labels = ("GD", "GDD", "TOD", "FOD", "QOD", "SOD")
        disp = np.trim_zeros(disp, "b")
        disp_std = disp_std[: len(disp)]
        precision = _get_config_value("precision")
        for i, (label, disp_item, disp_std_item) in enumerate(
                zip(labels, disp, disp_std)
        ):
            if run_from_ipython():
                from IPython.display import display, Math # noqa

                display(
                    Math(f"{label} = {disp_item:.{precision}f} ± {disp_std_item:.{precision}f} fs^{i + 1}")
                )
            else:
                print(f"{label} = {disp_item:.{precision}f} ± {disp_std_item:.{precision}f} fs^{i + 1}")
        return disp, disp_std, stri

    return wrapping