reimandlab/Visualistion-Framework-for-Genome-Mutations

View on GitHub
website/helpers/filters/manager.py

Summary

Maintainability
C
1 day
Test Coverage
import re
from collections import namedtuple, defaultdict

from sqlalchemy import and_

from helpers.utilities import is_iterable_but_not_str


def quote_if_needed(value):
    if type(value) is not str:
        return value
    q_char = FilterManager.quote_char
    if FilterManager.sub_value_separator in value:
        if not (value.startswith(q_char) and value.endswith(q_char)):
            return q_char + value + q_char
    return value


def split_with_quotation(value):
    from csv import reader

    for v in reader(
        [value],
        delimiter=FilterManager.sub_value_separator,
        quotechar=FilterManager.quote_char
    ):
        return v


def unqoute(value):
    q_char = FilterManager.quote_char
    if value.startswith(q_char) and value.endswith(q_char):
        return value[1:-1]
    return value


def joined_query(query, required_joins, limit_to=None):
    already_joined = set()
    for joins in required_joins:
        for join in joins:
            if limit_to and join not in limit_to:
                continue
            if join not in already_joined:
                from sqlalchemy.exc import InvalidRequestError
                try:
                    query = query.join(join)
                    already_joined.add(join)
                except InvalidRequestError:
                    pass
    return query


class FilterManager:
    """Main class used to parse & apply filters' data specified by request.

    All allowed filters have to be registered during initialization."""

    # An example of separators use:
    # mutation.frequency:gt:0.2;mutation.cancer:in:KIRC,COAD
    filters_separator = ';'
    field_separator = ':'
    sub_value_separator = ','
    quote_char = '\''

    # a shorthand for structure to update filters
    UpdateTuple = namedtuple('UpdateTuple', ['id', 'comparator', 'value'])

    def __init__(self, filters):
        # let each filter know where to seek data about other filters
        # (so one filter can relay on state of other filters,
        # eg. be active conditionally, if another filter is set).

        for filter_ in filters:
            filter_.manager = self

        # store filters as dict of filter_id -> filter for quick access
        self.filters = {
            filter_.id: filter_
            for filter_ in filters
        }

    def prepare_filters(self, target=None):

        to_apply_manually = []
        query_filters = []
        all_required_joins = []

        for the_filter in self._filters_to_apply_to(target):

            if the_filter.has_sqlalchemy:

                the_target = target if target else the_filter.targets[0]

                as_sqlalchemy, required_joins = the_filter.as_sqlalchemy(the_target)
                if as_sqlalchemy is not None:
                    query_filters.append(as_sqlalchemy)
                    all_required_joins.append(required_joins)

            else:
                to_apply_manually.append(the_filter)

        return query_filters, to_apply_manually, all_required_joins

    def build_query(self, target, custom_filter=None, query_modifier=None):
        """There are two strategies of using filter manager:

            - you can get results from database and walk through
              every element in the list, or
            - you can build a query and move some job to the database;
              not always it is possible though.
        """
        query_filters, to_apply_manually, required_joins = self.prepare_filters(target)

        query_filters_sum = and_(*query_filters)

        if custom_filter:
            query_filters_sum = custom_filter(query_filters_sum)

        query = joined_query(target.query, required_joins)
        query = query.filter(query_filters_sum)

        if query_modifier:
            query = query_modifier(query)

        return query, to_apply_manually

    def query_all(self, target, custom_filter=None, query_modifier=None):
        """Retrieve all objects of type 'target' which
        match criteria of currently active filters.
        """

        query, to_apply_manually = self.build_query(target, custom_filter, query_modifier)

        return self.apply(query, to_apply_manually)

    def query_count(self, target, custom_filter=None, query_modifier=None):
        """Retrieve count of all objects of type 'target' which
        match criteria of currently active filters.
        """

        query, to_apply_manually = self.build_query(target, custom_filter, query_modifier)

        if not to_apply_manually:
            return query.with_entities(target.id).count()
        else:
            return len(self.apply(query, to_apply_manually))

    def apply(self, elements, filters_subset=None, itemgetter=None):
        """Apply all appropriate filters to given list of elements.

        Only filters targeting the same model and being currently active will
        be applied. The target model will be deduced from passed elements.
        """
        try:
            tester = elements[0]
            if itemgetter:
                tester = itemgetter(tester)

            target_type = type(tester)
        except IndexError:
            # the list was empty or was emptied before we were able to
            # investigate the type of targeted objects
            return []

        if filters_subset is not None:
            filters = filters_subset
        else:
            filters = self._filters_to_apply_to(target_type)

        for filter_ in filters:
            elements = filter_.apply(elements, itemgetter)

        return list(elements)

    def _filters_to_apply_to(self, target=None):
        """Return filters that are active and can be applied to given target.

        If a filter is set to be omitted by `skip_if_default=True`, it will be
        filtered if the current value of the filter is the save as the default.
        """
        return [
            filter_
            for filter_ in self._active_and_applicable_filters(target)
            if not (filter_.skip_if_default and set(filter_.default) == set(filter_.value))
        ]

    def _active_and_applicable_filters(self, target=None):
        """Return filters which are active & are applicable to given target.

        If no target is given, all active filters will be returned.
        """
        catch_all_active_filters = not target
        return [
            filter_
            for filter_ in self.filters.values()
            if (
                (catch_all_active_filters or target in filter_.targets)
                and
                filter_.is_active
            )
        ]

    def get_value(self, filter_id):
        """Return value of filter with specified identifier."""
        return self.filters[filter_id].value

    def update_from_request(self, request, raise_on_forbidden=True):
        """Set states of child filters to match those specified in request.

        The query part of request will be looked upon to get filter's data, in
        one of two available formats: modern or fallback.

        For details see _parse_request() method in this class.

        Returns:
            tuple: skipped, rejected

            skipped: updates skipped, from unrecognized filters
            rejected: updates skipped, from forbidden values (if raise_on_forbidden was False)
        """
        filter_updates = self._parse_request(request)

        skipped = []
        rejected = defaultdict(list)

        for update in filter_updates:
            if update.id not in self.filters:
                skipped.append(update)
                continue

            rejected_updates = self.filters[update.id].update(
                self._parse_value(update.value),
                self._parse_comparator(update.comparator),
                raise_on_forbidden=raise_on_forbidden
            )

            if rejected_updates:
                rejected[update.id].extend(rejected_updates)

        return skipped, rejected

    def _parse_fallback_query(self, args):
        """Parse query in fallback format."""

        re_value = re.compile(r'filter\[([\w.]+)\]')
        re_cmp = re.compile(r'filter\[([\w.]+)\]\[cmp\]')

        filters = defaultdict(lambda: defaultdict(list))

        for key, value in args.items():

            match = re_value.fullmatch(key)
            if match:
                name = match.group(1)
                filters[name]['value'] = value

            match = re_cmp.fullmatch(key)
            if match:
                name = match.group(1)
                filters[name]['cmp'] = value

        filters_list = [
            [
                filter_name,
                data.get('cmp', None),    # allow not specifying comparator -
                # if so, we will use default comparator.
                self._repr_value(data.get('value'))
            ]
            for filter_name, data in filters.items()
        ]
        return filters_list

    def _parse_request(self, request):
        """Extract and normalize filters' data from Flask's request object.

        Arguments will be extracted from request.args or request.form.

        If arguments contain 'clear_filters' command,
        all other arguments will be ignored.

        Two formats for specifying filters are available:
            modern request format:
                Model.filters=is_ptm:eq:True;Model.frequency:gt:2
            fallback format:
                filter[Model.is_ptm]=True&filter[Model.frequency]=2&\
                filter[Model.frequency][cmp]=gt&fallback=True

        where fallback format is what will be generated by browsers after
        posting HTML form and modern format is designed to be used with AJAX
        requests, to create readable REST interfaces and for internal usage.
        """
        if request.method == 'GET':
            args = request.args
        else:
            args = request.form

        if args.get('clear_filters'):
            filters_list = self._parse_fallback_query({})
        elif args.get('fallback'):
            data = {
                key: value[0] if isinstance(value, list) and len(value) == 1 else value
                for key, value in args.to_dict(flat=False).items()
            }
            filters_list = self._parse_fallback_query(data)
        else:
            filters_list = self._parse_string(
                args.get('filters', '')
            )

        return [
            self.UpdateTuple(*filter_update)
            for filter_update in filters_list
        ]

    def _parse_string(self, filters_string):
        """Split modern query string into list describing filter's settings."""
        raw_filters = filters_string.split(self.filters_separator)

        # remove empty strings
        raw_filters = filter(bool, raw_filters)

        filters_list = [
            filter_update.split(self.field_separator, maxsplit=2)
            for filter_update in raw_filters
        ]
        return filters_list

    @staticmethod
    def _parse_comparator(comparator):
        if comparator == 'None':
            return None
        return comparator

    def _parse_value(self, value, do_not_split=False):
        """Safely parse value from string, without eval.

        For sub-values quotations will be respected.
        """

        if value == 'True':
            return True
        elif value == 'False':
            return False
        elif value == 'None':
            return None
        elif value.isdigit():
            return int(value)

        value = value.replace('+', ' ')

        splitted = split_with_quotation(value)

        if not do_not_split and len(splitted) > 1:
            return [self._parse_value(v, True) for v in splitted]

        value = unqoute(value)

        return value

    def _repr_value(self, value):
        """Return string representation of given value (of a filter)."""
        if is_iterable_but_not_str(value):
            return self.sub_value_separator.join([
                quote_if_needed(str(v))
                for v in value
            ])

        return str(value)

    def url_string(self, expanded=False):
        """String representation of filters from the set for use in URL address.

        Produced string is ready to be included as a query argument in Flask's
        `url_for` func. If no string has been produced, None will be returned.

        Args:
            expanded:
                should all active filters be included
                (also those with value set to default)
        """
        return self.filters_separator.join(
            [
                self.field_separator.join(
                    map(str, [
                        f.id,
                        f.comparator,
                        self._repr_value(f.value)
                    ])
                )
                for f in self.filters.values()
                if f.is_active and (
                    f.value != f.default
                    or expanded
                )
            ]
        ) or None   # if empty string has been built, return None to indicate
        # that there is really nothing interesting (keeps address clean when
        # result is being passed as a keyword arg to flask's url_for function)

    def reset(self):
        """Reset values of child filters to bring them into a neutral state."""
        for filter_ in self.filters.values():
            filter_._value = None

    def reformat_request_url(self, request, endpoint, *args, **kwargs):
        from flask import url_for
        from flask import redirect
        from flask import current_app

        if request.args.get('fallback'):

            scheme = current_app.config.get('PREFERRED_URL_SCHEME', 'http')

            url = url_for(endpoint, *args,  _external=True, _scheme=scheme, **kwargs)

            filters = self.url_string()

            args = dict(request.args)
            # filters might be empty if
            # (e.g. if all are pointing to default values)
            if filters:
                args['filters'] = [filters]

            query_string = '&'.join(
                [
                    arg + '=' + (
                        value[0]
                        if (isinstance(value, list) and len(value) == 1) else
                        value
                    )
                    for arg, value in args.items()
                    if not (arg.startswith('filter[') or arg == 'fallback')
                ]
            )
            # add other arguments
            if query_string:
                url += '?' + query_string

            return redirect(url)