cogniteev/docido-python-sdk

View on GitHub
docido_sdk/toolbox/edsl.py

Summary

Maintainability
A
3 hrs
Test Coverage
# encoding: utf-8
"""
Provides Python Embedded Domain Specific Languages.
"""
try:  # Python 3.3+: the ABCs live in `collections.abc`
    from collections.abc import Mapping, Sequence
except ImportError:  # Python 2
    from collections import Mapping, Sequence
from functools import reduce
import operator

# Public API of the module: only `kwargsql` is exported by a star-import.
__all__ = ['kwargsql']


class AnySequenceResult(Sequence):
    """Read-only sequence wrapper carrying a join operation.

    Having a dedicated type lets `isinstance` tell apart a list produced
    internally (values collected from every item of a sequence) from a
    plain list that belongs to the user data.
    """

    def __init__(self, data, join_operation):
        """
        :param data: the wrapped sequence of values
        :param join_operation: binary callable stored alongside the values
        """
        self.join_operation = join_operation
        self.__data = data

    def __getitem__(self, key):
        """Delegate indexing (and slicing) to the wrapped sequence."""
        return self.__data[key]

    def __len__(self):
        """Return the number of wrapped values."""
        return len(self.__data)


class kwargsql(object):
    """Query your Python objects with a `kwargs` syntax.

    Syntax looks like the Mongoengine syntax to query documents.

    Keys in the keyword argument specifies:

    - the attribute's location to test
    - the operation to perform, returning either `True` or `False`
      - first operand is the attribute value
      - second operand is the associated key value.

    For instance:
    - foo=42
    - foo__lt=43

    An extra `not` operator can be specified before the operation,
    for instance: foo__not__gt=0

    To access nested attributes, you can use the '__' separator,
    for instance "foo__bar". This syntax allows you to select:
        - an object attribute
        - value associated to a key in a `dict`.

    It is also possible to select an item of a list, by providing
    the position.

    An operator name (or `not`) is only interpreted as such when it
    follows at least one path element: an expression made of a single
    token is always a field name compared for equality.


    Example:

    >>> d = {
    ...    'foo': [
    ...        {'bar': 'pika'},
    ...        {'bar': 42},
    ...    ],
    ... }
    >>> kwargsql.and_(d, foo__1__bar__gt=41, foo__size=2)
    True

    Available operators are as follows:

    ne – not equal to
    lt – less than
    lte – less than or equal to
    gt – greater than
    gte – greater than or equal to
    not – negate a standard check, may be used before other operators
          (e.g. foo__not__gt=0)
    in – value is in list (a list of values should be provided)
    nin – value is not in list (a list of values should be provided)
    size – the size of the array, dict, or string is
    exists – value for field exists

    iexact – string field exactly matches value (case insensitive)
    contains – string field contains value
    icontains – string field contains value (case insensitive)
    startswith – string field starts with value
    istartswith – string field starts with value (case insensitive)
    endswith – string field ends with value
    iendswith – string field ends with value (case insensitive)

    isinstance – same as isinstance(field, value)
    issubclass – same as issubclass(field, value)

    any – applies the remaining kwargsql expression to every elements
          of a sequence. The result is `True` if the operation is `True`
          for at least one element.

    each – applies the remaining kwargsql expression to every elements
           of a sequence. The result is `True` if the operation is `True`
           for every element of the sequence.
    """
    # Binary predicates: first operand is the value found in the object,
    # second operand is the value given in the query.
    OPERATIONS = {
        'ne': operator.ne,
        'lt': operator.lt,
        'lte': operator.le,
        'gt': operator.gt,
        'gte': operator.ge,
        'in': lambda e, c: e in c,
        'nin': lambda e, c: e not in c,
        'size': lambda c, e: len(c) == e,
        'exists': lambda e, cond: e is not None if cond else e is None,
        'iexact': lambda s, e: s.lower() == e.lower(),
        'contains': lambda s, e: e in s,
        'icontains': lambda s, e: e.lower() in s.lower(),
        'startswith': lambda s, e: s.startswith(e),
        'istartswith': lambda s, e: s.lower().startswith(e.lower()),
        'endswith': lambda s, e: s.endswith(e),
        'iendswith': lambda s, e: s.lower().endswith(e.lower()),
        'isinstance': isinstance,
        'issubclass': issubclass,
    }

    # Path keywords fanning an expression out over every item of a
    # sequence, mapped to the operator used to join per-item results.
    SEQUENCE_OPERATIONS = dict(
        any=operator.or_,
        each=operator.and_,
    )

    @classmethod
    def and_(cls, obj, **kwargs):
        """Query an object

        :param obj:
          object to test

        :param kwargs: query specified in kwargssql

        :return:
          `True` if all `kwargs` expression are `True`, `False` otherwise.
        :rtype: bool
        """
        return cls.__eval_seqexp(obj, operator.and_, **kwargs)

    @classmethod
    def or_(cls, obj, **kwargs):
        """Query an object

        :param obj:
          object to test

        :param kwargs: query specified in kwargssql

        :return:
          `True` if at least one `kwargs` expression is `True`,
          `False` otherwise.
        :rtype: bool
        """
        return cls.__eval_seqexp(obj, operator.or_, **kwargs)

    @classmethod
    def xor(cls, obj, **kwargs):
        """Query an object.

        :param obj:
          object to test

        :param kwargs: query specified in kwargssql

        :return:
          result of folding every expression result with `operator.xor`,
          i.e. `True` if an odd number of `kwargs` expressions are `True`.
          With two expressions, this means exactly one of them is `True`.
        :rtype: bool
        """
        return cls.__eval_seqexp(obj, operator.xor, **kwargs)

    @classmethod
    def get(cls, obj, expr):
        """Parse a kwargsql string expression, and return
        the target value in given object.

        Not sure if really needed, except when using kwargsql
        expressions in YAML files for instance.

        :param obj:
          navigation starting point

        :param str expr:
          kwargsql expression.

        :return:
          object pointed out by the expression.
        """
        return cls.__resolve_path(obj, expr.split('__'))

    @classmethod
    def _get_operation(cls, opname):
        """Get operation from its name.
        You can override this class method to provide additional
        operations.

        :param str opname:
          operation name

        :return:
          binary operator if found, `None` otherwise
        :rtype: callable object
        """
        return cls.OPERATIONS.get(opname)

    @classmethod
    def __resolve_path(cls, obj, path):
        """Follow a kwargsql expression starting from a given object
        and return the deduced object.

        :param obj: the object to start from
        :param path: iterable of path elements to follow. It does not
                     contain the optional operation of a traditional
                     kwargsql expression. Empty elements are ignored.
        :return: the object found at the end of the path. A missing
                 attribute on a plain object resolves to `None`.
        :raises KeyError: if a mapping does not hold the requested key
        :raises IndexError: if a sequence position is out of range
        :raises ValueError: if a sequence is indexed with something
                            that is neither an integer nor a sequence
                            operation (`any`, `each`)
        :raises Exception: if the path has no non-empty element

        For instance, following ``['foo', 'bar']`` from
        ``dict(foo=dict(bar=42))`` yields ``42``.
        """
        # Build a real list: the path may be handed over as an iterator
        # (e.g. `reversed(...)`), and it is measured, indexed and sliced
        # below. `filter` would only give a list on Python 2.
        path = [step for step in path if step]
        if not path:
            raise Exception("Nothing to do")
        steps = len(path)
        i = 0
        while i < steps:
            # _get_obj_attr can supersede `i` because it might
            # evaluate the entire expression by itself.
            obj, i = cls._get_obj_attr(obj, path, i)
            i += 1
        return obj

    @classmethod
    def __eval_seqexp(cls, obj, op, **kwargs):
        """Evaluate every expression against `obj` and join the results.

        All expressions are evaluated before being joined: there is no
        short-circuit.

        :param obj: object to test
        :param op: binary operator used to join the expression results
        :param kwargs: query specified in kwargsql
        :raises TypeError: if no expression is given, as there is no
                           initial value to fall back on.
        """
        return reduce(
            op,
            [cls._eval_exp(obj, exp, value)
             for (exp, value) in kwargs.items()]
        )

    @classmethod
    def _get_obj_attr(cls, obj, path, pos):
        """Resolve one kwargsql expression for a given object and returns
        its result.

        :param obj: the object to evaluate
        :param list path: the list of all kwargsql expression, including
                          those previously evaluated.
        :param int pos: provides index of the expression to evaluate in the
                        `path` parameter.
        :return: a tuple made of the resolved object and the position of
                 the last path element that has been consumed.
        """
        field = path[pos]
        if isinstance(obj, (dict, Mapping)):
            return obj[field], pos
        elif isinstance(obj, (list, Sequence)):
            join_operation = cls.SEQUENCE_OPERATIONS.get(field)
            if join_operation is not None:
                # The remaining of the path is applied to every item
                # right now. The returned position is past the end of
                # the path so that the caller stops iterating.
                return (
                    AnySequenceResult(
                        cls._sequence_map(obj, path[pos + 1:]),
                        join_operation
                    ),
                    len(path) + 1,
                )
            else:
                return obj[int(field)], pos
        else:
            return getattr(obj, field, None), pos

    @classmethod
    def _sequence_map(cls, seq, path):
        """Apply a kwargsql expression to every item of a sequence,
        and returns it.

        Items for which the expression cannot be followed (`KeyError` or
        `IndexError`) are left out of the result.

        :param seq: the list to transform
        :param path: kwargsql expression to apply to every elements of
                     the given sequence.
        :return: the given sequence itself when there is nothing to
                 apply, a new list of resolved values otherwise.
        """
        if not any(path):
            # There is no further kwargsql expression
            return seq
        result = []
        for item in seq:
            try:
                result.append(cls.__resolve_path(item, path))
            except (KeyError, IndexError):
                pass
        return result

    @classmethod
    def _not(cls, op):
        """Wrap a callable so that its result is negated.

        :param op: callable to wrap
        :return: callable returning ``not op(...)``
        """
        def wrap(*args, **kwargs):
            return not(op(*args, **kwargs))
        return wrap

    @classmethod
    def _eval_exp(cls, obj, exp, value):
        """Evaluate one kwargsql expression against an object.

        :param obj: object to test
        :param str exp: kwargsql expression: a path optionally followed
                        by `not` and/or an operation name. The operation
                        defaults to equality.
        :param value: second operand given to the operation
        :return: result of the operation. When the path goes through
                 `any` or `each`, the operation is applied to every
                 collected value and the results are joined accordingly;
                 it is `False` if no value has been collected.
        """
        op = operator.eq
        # Tokens are handled in reverse order so that the optional
        # suffixes (operation, then `not`) come first.
        tokens = exp.split('__')[::-1]
        # A suffix needs a path before it: a lone token is a field name,
        # even when it reads like an operation (e.g. `size=2`).
        if len(tokens) > 1:
            _op = cls._get_operation(tokens[0])
            if _op is not None:
                # this is the operator
                op = _op
                tokens = tokens[1:]
        if len(tokens) > 1 and tokens[0] == 'not':
            op = cls._not(op)
            tokens = tokens[1:]
        try:
            computed = cls.__resolve_path(obj, reversed(tokens))
        except (KeyError, IndexError):
            # Unreachable location: the operation is given `None`
            computed = None
        if isinstance(computed, AnySequenceResult):
            data = [op(item, value) for item in computed]
            if data:
                return reduce(
                    computed.join_operation,
                    data,
                )
            return False
        return op(computed, value)