reimandlab/Visualistion-Framework-for-Genome-Mutations

View on GitHub
website/helpers/views.py

Summary

Maintainability
B
5 hrs
Test Coverage
from copy import copy

from flask import jsonify
from flask import request
from sqlalchemy import and_
from sqlalchemy import asc
from sqlalchemy import desc
from sqlalchemy.exc import StatementError
from sqlalchemy.ext.associationproxy import AssociationProxy
from database import db, fast_count
from helpers.filters.manager import joined_query

# Maps the value of the `order` request argument onto the sqlalchemy
# function which wraps the sort key with the requested direction.
ordering_functions = dict(desc=desc, asc=asc)


def json_results_mapper(result):
    """Default results mapper: delegate to the `to_json` method of `result`."""
    serialize = result.to_json
    return serialize()


class ModelCounter:
    """Counting helper built around the `query` attribute of a model.

    Exposes `filter` and `count`, the two methods which the table view
    calls on a count query; counting is delegated to `fast_count`.
    """

    def __init__(self, model):
        # start from the model's base query; `filter` narrows it later on
        self.query = model.query

    def filter(self, *args, **kwargs):
        """Narrow the wrapped query in place; return self to allow chaining."""
        narrowed = self.query.filter(*args, **kwargs)
        self.query = narrowed
        return self

    def count(self):
        """Return the result of `fast_count` for the current query."""
        current_query = self.query
        return fast_count(current_query)


class AjaxTableView:
    """View returning data in JSON format, compatible with Bootstrap-Table.

    Functions returned be any of static methods are accepting `self` argument,
    hence it are suitable for binding with Flask-Classful class-based views.
    """

    @staticmethod
    def from_model(model, **kwargs):
        """Create TableView from an sqlalchemy model."""

        def prepare_for_sorting(query, sorted_field_name):
            sorted_field = getattr(model, sorted_field_name)

            if type(sorted_field) is AssociationProxy:
                remote_model = (
                    sorted_field.remote_attr.property.
                    parent.class_
                )
                query = query.join(remote_model, sorted_field.local_attr)
                sorted_field = sorted_field.remote_attr

            return query, sorted_field

        return AjaxTableView.from_query(
            # Workaround for FSA 2.3 which does something weird to Model.query
            # query=model.query,
            query=lambda filters, joins: joined_query(model.query, joins),
            prepare_for_sorting=prepare_for_sorting,
            count_query=lambda filters, joins: ModelCounter(model),
            **kwargs
        )

    @staticmethod
    def from_query(
        query, count_query=None,
        results_mapper=json_results_mapper, filters_class=None,
        search_filter=None, search_sort=None,
        prepare_for_sorting=None, **kwargs
    ):
        """Create TableView from an sqlalchemy query object.

        Args:
            query:
                the base query, which will be modified later
                to include limit, offset and so on. If callable,
                should accept two lists: of sqlalchemy filters and
                of joins which are required to use given filters.
                The required joins are generated by a filter manager
                (if optional argument `filter_class` is given).
            count_query:
                simplified query for counting (for faster pagination
                generation). If callable, should implement the same
                signature as `query`.
            results_mapper:
                function mapping results to JSON-serializable objects
            filters_class: filter manager class
            search_filter:
                function accepting a search phrase and returning
                an sqlalchemy filter relevant for this phrase
            search_sort:
                function accepting query, phrase and name of column
                used to sort results and returning modified query,
                and boolean indication if the query was modified.
            prepare_for_sorting:
                hook to modify query and sort key (sort column)

        Keyword Args:
            sort, search, order, offset and limit will be used
            to set the default values of table arguments.
        """

        default_args = {
            'sort': None,
            'search': None,
            'order': 'asc',
            'offset': 0,
            'limit': 25
        }

        default_args.update(kwargs)

        predefined_query = query
        predefined_count_query = count_query

        def ajax_table_view(self):

            args = copy(default_args)

            # update args from request
            for key, value in args.items():
                args[key] = request.args.get(key, value)

            ordering_function = ordering_functions.get(
                args['order'],
                lambda x: x
            )

            filters = []

            phrase = args['search']

            if phrase and search_filter:
                filters.append(search_filter(phrase))

            if filters_class:
                filters_manager = filters_class()
                divided_filters = filters_manager.prepare_filters()
                sql_filters, manual_filters, required_joins = divided_filters
                if manual_filters:
                    raise ValueError(
                        'From query can only apply filters implementing'
                        ' sqlalchemy interface'
                    )
            else:
                sql_filters = []
                required_joins = []

            if callable(predefined_query):
                query = predefined_query(sql_filters, required_joins)
            else:
                query = joined_query(predefined_query, required_joins)
                if filters_class:
                    filters += sql_filters

            sorted_by_search = False

            sort_key = args['sort']

            if sort_key and prepare_for_sorting:
                query, sort_key = prepare_for_sorting(query, sort_key)

            if phrase and search_sort:
                query, sorted_by_search = search_sort(query, phrase, sort_key, ordering_function)

            if not sorted_by_search and sort_key:
                query = query.order_by(
                    ordering_function(sort_key)
                )

            if callable(predefined_count_query):
                count_query = predefined_count_query(sql_filters, required_joins)
            elif predefined_count_query:
                count_query = joined_query(predefined_count_query, required_joins)
            else:
                count_query = query

            if filters:
                filters_conjunction = and_(*filters)
                query = query.filter(filters_conjunction)
                count_query = count_query.filter(filters_conjunction)

            try:
                count = count_query.count()
                query = query.limit(args['limit']).offset(args['offset'])
                elements = query
            except StatementError as e:
                db.session.rollback()
                print('Statement Error detected!', e)
                return jsonify({'message': 'Query error'})

            return jsonify({
                'total': count,
                'rows': [
                    results_mapper(element)
                    for element in elements
                ]
            })

        return ajax_table_view