anubis/views/state_view_mixin.py
# Copyright © 2016, Ugo Pozo
# 2016, Câmara Municipal de São Paulo
# state_view_mixin.py - a mixin for views that perform searches.
# This file is part of Anubis.
# Anubis is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Anubis is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This module contains the StateViewMixin, a mixin designed for creating API views
that perform searches on the database.
"""
from collections import OrderedDict
from functools import reduce
from rest_framework.response import Response
from rest_framework import serializers as rest_serializers
from rest_framework.decorators import api_view
from rest_framework.renderers import JSONRenderer
from django.conf.urls import url
from django.contrib.auth.models import User
from django.core.paginator import Paginator
from django.core.exceptions import ImproperlyConfigured
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext as _
from django.utils.cache import add_never_cache_headers
from django import forms
from anubis.aggregators import QuerySetAggregator, ListAggregator
from anubis.filters import ConversionFilter
from anubis.url import BooleanBuilder
from anubis.forms import FieldSerializer
class StateViewMixin:
"""A mixin that performs a search and adds the result to the view context.
This mixin reads from a supposedly pre-filled :attr:`kwargs` attribute,
performs a search on the database based on the supplied configuration and
search parameters provided, and attaches the result to the view context.
It's designed to be used as a mixin for either the Django's generic
:class:`ListView` or Django Rest Framework's generic :class:`ListAPIView`
- hence why it's a mixin and not a child class. Both of these classes will
fill the kwargs argument accordingly.
Specifically, :class:`StateViewMixin` should be explicitly used only with a
:class:`ListAPIView`. If you're building an HTML view, this class's child
:class:`AppViewMixin` will contain information that can prove to be
more useful.
:ivar is_paginated: Tells whether the search is to be paginated.
:vartype is_paginated: bool
:ivar is_multi_modeled: Tells whether the search acts in multiple models.
:vartype bool:
:ivar is_sortable: Tell whether the search can be sorted.
:vartype bool:
:ivar boolean_expression: Represents the requested search, if there is one.
:vartype Optional[rest_framework.serializers.ModelSerializer]:
Attributes:
base_url (str): Base URL for the application. Should either start with
a slash or with a protocol reference (i.e., "http://..."). It should
**not** end with a slash, though, and to reference the root URL you
should leave this attribute **empty**.
model: This attribute can either be a :class:`dict` for multi-model
searches or a direct reference to the Django model in single-model
searches. In a single-model setting, you can access the model class
from either `self.models['_default']` or `self.model`. In a
multi-model setting, `self.model` will point to the model of the
current search after `self.kwargs` is processed.
serializers: This attribute can either be a :class:`dict` for
multi-model searches or a 2-tuple of serializers, where the
first serializer applies to searches and the second to details. In
a multi-modeled enviroment, keys are to be the same as those in the
:attr:`model` attribute, and values are the aforementioned 2-tuple
of serializers.
default_model (Optional[str]): In non-search requests in multi-modeled
views there is no pre-selected model, which causes Django's
:method:`ListView.get_queryset` to go haywire. Use this property to
define a default model for searching, which also will be
pre-selected in the search interface. The value of this property is
the key of the default model inside the :attr:`model` dictionary.
model_parameter (str): The parameter *name* (from Django URL matching)
which contains the key of the model in the :class:`dict` on the
:attr:`model` attribute. Ignored in single-model searches.
expression_parameter (str): The parameter *name* (from Django URL
matching) which contains the boolean expression with which to build
a :class:`QuerySet` to search the database.
filters (Dict[str, anubis.filters.Filter]): Allowed filters to perform
searches. In a multi-model environment, there should be another
level of nesting, and the top-level keys refers to the model keys.
objects_per_page (Optional[int]): The number of objects per page of
search. If it's set to :const:`None`, turns pagination off.
page_parameter (str): The parameter *name* (from Django URL matching)
which contains the number of the page to retrieve. If
:attr:`objects_per_page` is set to :const:`None`, this attribute is
ignored.
user_serializer (Optional[rest_framework.serializers.Serializer]):
Serializer for the user model. Set this to :const:`None` if you
want to disable user related functionality.
details_parameter (str): The parameter *name* (from Django URL
matching) which contains the ID of the record being displayed.
The Anubis' search interface will display this as a modal dialog
above the search results (if there are any).
details_slug (str): A slug to put on URLs when retrieving details.
search_slug (str): A slug to put on URLs when performing searches.
sorting_options (List[str]): A list of possible sorting options. Pass
:const:`None` to disallow sorting.
default_filter (str): The default filter to perform a search if text
is entered on the filter selector area and a search is performed.
If :const:`None`, defaults to the first key of the first model.
allow_client_cache (boolean): Whether the answer should allow
client-side caching. Defaults to :const:`False`.
"""
base_url = ""
api_prefix = "api"
model = None
default_model = None
model_parameter = "model"
serializers = None
expression_parameter = "search"
filters = {}
default_filter = None
allow_client_cache = False
objects_per_page = None
page_parameter = "page"
details_parameter = "details"
details_slug = "details"
search_slug = "search"
sorting_options = None
sorting_parameter = "sorted_by"
sorting_default = None
sidebar_links = []
sidebar_links_title = "More links"
actions = None
group_by = None
class _UserSerializer(rest_serializers.ModelSerializer):
profile_link = rest_serializers.SerializerMethodField()
class Meta:
model = User
fields = ('username', 'first_name', 'last_name', 'email',
'profile_link')
def get_profile_link(self, obj):
return reverse('admin:auth_user_change', args=[obj.id])
user_serializer = _UserSerializer
@classmethod
def autocomplete_model(cls, model, filter_, describe=None):
model_name = model.__name__.lower()
if describe is None:
describe = str
@api_view(['GET'])
def view(request, needle):
queryset = filter_.filter_queryset(model.objects.all(), [needle])
return Response([[record.id, describe(record)]
for record in queryset])
url_string = (r"^{api_prefix}/anubis_autocomplete/{model_name}/"
r"(?P<needle>.*)").format(**{
"api_prefix": cls.api_prefix,
"model_name": model_name,
})
url_name = "anubis_autocomplete_{model_name}" \
.format(model_name=model_name)
return url(url_string, view, name=url_name)
@classmethod
def autocomplete_url(cls, app, model):
model_name = model.__name__.lower()
url_name = "{app}:anubis_autocomplete_{model_name}" \
.format(app=app, model_name=model_name)
return reverse(url_name, kwargs={'needle': ""})
@classmethod
def _is_multi_modeled(cls):
return not isinstance(cls.model, type)
@classmethod
def _is_paginated(cls):
return cls.objects_per_page is not None
@classmethod
def _is_sortable(cls):
return cls.sorting_options is not None
@classmethod
def _url_part_details_id(cls):
return r'(?P<{}>[^/,"]+)'.format(cls.details_parameter)
@classmethod
def _url_part_model(cls):
if cls._is_multi_modeled():
models = "|".join(dict(cls.model).keys())
return r"(?P<{}>{})/".format(cls.model_parameter, models)
else:
return ""
@classmethod
def _url_part_page(cls):
if cls._is_paginated():
return r"(?P<{}>\d+)/".format(cls.page_parameter)
else:
return ""
@classmethod
def _url_part_sort(cls):
if cls._is_sortable():
if cls._is_multi_modeled():
options = [option[0] \
for options in cls.sorting_options.values() \
for option in options]
else:
options = [option[0] for option in cls.sorting_options]
sorting = "|".join(set(options))
return r"(?P<{}>[+-]({}))/".format(cls.sorting_parameter, sorting)
else:
return ""
@classmethod
def _url_part_expr(cls):
return r"(?P<{}>.*)".format(cls.expression_parameter)
@classmethod
def url_search(cls, app_prefix=None, **kwargs):
if app_prefix is not None:
app_prefix = "{}_".format(app_prefix)
else:
app_prefix = ""
name = "{}api_search".format(app_prefix)
search_url = "^{api_prefix}/{search_slug}/{m}{p}{s}{expr}$".format(**{
"api_prefix": cls.api_prefix,
"search_slug": cls.search_slug,
"m": cls._url_part_model(),
"p": cls._url_part_page(),
"s": cls._url_part_sort(),
"expr": cls._url_part_expr()
})
return url(search_url, cls.as_view(**kwargs), name=name)
@classmethod
def url_details(cls, app_prefix=None, **kwargs):
if app_prefix is not None:
app_prefix = "{}_".format(app_prefix)
else:
app_prefix = ""
name = "{}api_details".format(app_prefix)
details_url = "^{api_prefix}/{details_slug}/{m}{id}$".format(**{
"api_prefix": cls.api_prefix,
"details_slug": cls.details_slug,
"m": cls._url_part_model(),
"id": cls._url_part_details_id()
})
return url(details_url, cls.as_view(**kwargs), name=name)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.is_paginated = self._is_paginated()
self.is_multi_modeled = self._is_multi_modeled()
self.is_sortable = self._is_sortable()
self.boolean_expression = None
self.action_result = None
self.pagination_data = None
self._sorting = {
"by": None,
"ascending": True
}
if self.is_multi_modeled:
self._model_order = [model for model, _ in self.model]
self._model_lookup = dict(self.model)
self.model = None
self._serializer_lookup = self.serializers
self.serializers = None
else:
self._model_order = ["_default"]
self._model_lookup = {'_default': self.model}
self._model_key = "_default"
def get(self, request, *args, **kwargs):
self._prepare_attributes()
response = super().get(request, *args, **kwargs)
return self.set_headers(response)
def post(self, *args, **kwargs):
self._prepare_attributes()
self.perform_actions()
response = super().get(*args, **kwargs)
return self.set_headers(response)
def set_headers(self, response):
if not self.allow_client_cache:
add_never_cache_headers(response)
return response
def perform_actions(self):
action_name = self.request.POST.get('action_name', None)
action = self.actions.get(action_name, None)
self.action_result = {
'success': False,
'result': None,
'error': None
}
if action is None:
self.action_result['error'] = "Action not found - {}" \
.format(action_name)
return
if 'permissions' in action.keys() and action['permissions'] is not None:
if not self.request.user.is_authenticated():
self.action_result['error'] = "Usuário não autenticado."
return
if not all([self.request.user.has_perm(p)
for p in action['permissions']]):
self.action_result['error'] = ("Usuário não possui as "
"permissões necessárias.")
return
if self.is_multi_modeled and not self._model_key in action['models']:
self.action_result['error'] = ("Wrong model.")
return
form = forms.Form(self.request.POST)
form.fields['action_name'] = forms.CharField(required=True)
form.fields['object_list'] = forms.CharField(required=True)
for key, field in action['fields'].items():
form.fields[key] = field
if not form.is_valid():
self.action_result['error'] = form.errors
return
if not hasattr(self, 'action_{}'.format(action_name)):
self.action_result['error'] = "Wrong method."
return
method = getattr(self, 'action_{}'.format(action_name))
try:
success, result = method(form)
except RuntimeError as error:
self.action_result['error'] = error.args[0]
return
if success:
self.action_result['success'] = True
self.action_result['result'] = result
else:
self.action_result['error'] = result
def _paginate_queryset(self, queryset):
"""A simplified paginator. It doesn't have to be as generic as Django's
and DRF's are.
Args:
queryset (django.db.model.QuerySet): The queryset to paginate.
Returns:
django.core.paginator.Page: The current page object.
Raises:
ValueError: If you can't bother configure your URL pattern to only
accept \\d+ in your "page" argument, you deserve an error.
"""
paginator = Paginator(queryset, self.objects_per_page)
page = int(self.kwargs.get(self.page_parameter, 1))
return paginator.page(page)
def list(self, request, *args, **kwargs):
"""Overrides DRF's implementation of listing. It's awful.
DRF's implementation of listing is utterly inextensible. Both in 2.x
and 3.x trunks (which we plan to support) it generates a serializer
within the :method:`list` method and promptly throws it away, leaving
us with only an useless :class:`Response` object, originated from the
serializer's :attr:`data` attribute. We get no metadata from the
serializer unless we do it ourselves, and that's what we're doing here.
Args:
request (django.http.request.HttpRequest): A request object, passed
from Django.
*args: Ordered arguments from the URL parsing algorithm.
**kwargs: Named arguments from the URL parsing algorithm.
Returns:
rest_framework.response.Response: A JSON response including the
whole of Anubis' state.
"""
return Response(self.get_full_state())
def get_full_state(self):
self.object_list = self.get_queryset()
state = self.get_state()
return state
def get_serialized_queryset(self, queryset):
if self.group_by is None:
return self.get_serializer(queryset, many=True).data
groupers = [getattr(self, "group_by_{}".format(g), None)
for g in self.group_by]
group_count = len(groupers)
if None in groupers:
return self.get_serializer(queryset, many=True).data
data = OrderedDict()
for obj in queryset:
groups = [grouper(obj) for grouper in groupers]
branch = data
for i, group in enumerate(groups):
if i == group_count - 1:
leaf = branch.setdefault(group, [])
else:
branch = branch.setdefault(group, OrderedDict())
leaf.append(obj)
data, _ = self._tree_serialize(data, 0)
return data
def _tree_serialize(self, node, depth):
if isinstance(node, OrderedDict):
serialized_node = []
for group, subgroup in node.items():
subgroup, leaf = self._tree_serialize(subgroup, depth + 1)
serialized_node.append({
"__groupName": group,
"__leaf": leaf,
"__depth": depth,
"__items": subgroup
})
return serialized_node, False
else:
return self.get_serializer(node, many=True).data, True
def get_queryset(self):
try:
original = super().get_queryset()
except AssertionError:
original = self.model.objects.all()
if self.boolean_expression is not None:
queryset = self.get_queryset_filter(original)
else:
queryset = original.none()
queryset = self.sort_queryset(queryset)
queryset = self.paginate_queryset(queryset)
return queryset
def sort_queryset(self, queryset):
if not self.is_sortable or self.boolean_expression is None:
return queryset
sort_key = self.kwargs[self.sorting_parameter]
ascending = not sort_key[0] == "-"
sort_key = sort_key[1:]
options = self.sorting_options if not self.is_multi_modeled \
else self.sorting_options[self._model_key]
assert sort_key in [i[0] for i in options], \
"Sorting by {} is not allowed.".format(sort_key)
self._sorting['by'] = sort_key
self._sorting['ascending'] = ascending
sort_method = "sort_by_{}".format(sort_key) \
if not self.is_multi_modeled \
else "sort_{}_by_{}".format(self._model_key, sort_key)
return getattr(self, sort_method)(queryset, ascending)
def paginate_queryset(self, queryset):
if not self.is_paginated or self.boolean_expression is None:
return queryset
page = self._paginate_queryset(queryset)
current_page = self.kwargs[self.page_parameter]
def get_from_and_to(page_number):
from_ = (page_number - 1) * self.objects_per_page + 1
to_ = min(page_number * self.objects_per_page,
page.paginator.count)
if page.paginator.count == 0:
from_ = 0
return (page_number, from_, to_)
all_pages = [get_from_and_to(num) for num in page.paginator.page_range]
self.pagination_data = {
"currentPage": current_page,
"allPages": all_pages,
"recordCount": page.paginator.count,
"nextPageNumber": page.next_page_number() \
if page.has_next() else None,
"previousPageNumber": page.previous_page_number() \
if page.has_previous() else None,
}
return page.object_list
def get_serializer_class(self):
search_serializer, _ = self.serializers
return search_serializer
def get_details_serializer_class(self):
_, details_serializer = self.serializers
return details_serializer
def _prepare_attributes(self):
self.model = self.get_model()
self.serializers = self.get_serializers()
self.boolean_expression = self.get_boolean_expression()
def get_model(self):
if not self.is_multi_modeled:
return self.model
self._model_key = self.kwargs.get(self.model_parameter,
self.default_model)
return self._model_lookup[self._model_key]
def get_serializers(self):
if not self.is_multi_modeled:
self._serializer_lookup = {"_default": self.serializers}
return self.serializers
return self._serializer_lookup[self._model_key]
def get_boolean_expression(self):
expression = self.kwargs.get(self.expression_parameter, None)
if expression is None or expression == "":
return None
expression = expression.strip().rstrip("/")
try:
boolean = BooleanBuilder(expression).build()
except ValueError:
error = ValueError(_(("Check your expression for a missing "
"connector, for instance.")))
error.name = lambda: _("Syntax Error")
raise error
return boolean
def get_filters(self):
if not self.is_multi_modeled:
return self.filters
def check_conversion(filter_name, filter_):
if not isinstance(filter_, type) or \
not issubclass(filter_, ConversionFilter):
return filter_
base_filter = None
other_models = [key for key in self._model_lookup.keys() \
if key != self._model_key]
for model_name in other_models:
candidate = self.filters[model_name][filter_name]
if not isinstance(candidate, type) or \
not issubclass(candidate, ConversionFilter):
base_filter = candidate
break
if base_filter is None:
raise ImproperlyConfigured(('No base filter for filter "{}'
'" applying to model "{}".').
format(filter_name,
self._model_key))
return filter_(base_filter)
filters = self.filters[self._model_key]
return {filter_name: check_conversion(filter_name, filter_) \
for filter_name, filter_ in filters.items()}
def get_queryset_filter(self, queryset):
"""Gets a queryset and use the current :attr:`boolean_expression` to
filter it.
Args:
queryset (django.db.models.QuerySet): A queryset to be filtered
by the current :attr:`boolean_expression`.
Returns:
django.db.models.QuerySet: The filtered queryset.
"""
aggregator = QuerySetAggregator(queryset, self.get_filters())
return self.boolean_expression.traverse(aggregator)
def get_state(self):
anubis_state = {
"searchResults": self.get_search_results(),
"details": self.get_details(),
}
return anubis_state
def get_details(self):
details_id = self.kwargs.get(self.details_parameter, None)
if details_id is not None:
try:
details_obj = self.model.objects.get(pk=details_id)
except self.model.DoesNotExist:
details_obj = None
else:
context = {"request": self.request}
serializer = self \
.get_details_serializer_class()(details_obj,
context=context)
details_obj = serializer.data
return {"object": details_obj, "model": self._model_key}
else:
return None
def get_search_results(self):
# TODO: textExpression should be computed from the parsed
# boolean_expression.
aggregator = ListAggregator(self.get_filters())
visible = self.boolean_expression is not None
expression = self.boolean_expression.traverse(aggregator) \
if visible else []
for i, unit in enumerate(expression):
unit.update({"index": i})
results = {
"position": len(expression),
"expression": expression,
"textExpression": self.kwargs.get(self.expression_parameter, ""),
"pagination": self.get_pagination(),
"actions": self.get_actions() if visible else {},
"visible": visible,
"model": self._model_key,
"results": self.get_serialized_queryset(self.object_list),
"sorting": self.get_sorting(),
"selection": []
}
if self.action_result is not None:
results["actionResult"] = self.action_result
return results
def get_pagination(self):
return self.pagination_data
def get_actions(self):
if self.actions is None:
return {}
if self.user_serializer is None:
return {}
actions = {key: dict(a) for key, a in self.actions.items()
if (not self.is_multi_modeled or
self._model_key in a['models']) and (
a.get('permissions', None) is None or
self.request.user.is_authenticated() and all([
self.request.user.has_perm(p)
for p in a['permissions']])
)
}
for key in list(actions.keys()):
action = actions[key]
action['fields'] = {k: self.render_field(f)
for k, f in action['fields'].items()}
return actions
def get_sorting(self):
if self.is_sortable:
options = self.sorting_options if self.is_multi_modeled else {
"_default": self.sorting_options
}
default = self.sorting_default if self.is_multi_modeled else {
"_default": self.sorting_default
}
else:
options = None
default = None
return {
"available": options,
"current": self._sorting,
"default": default
}
def get_final_response(self, original):
return self.get_context_data()
def render_field(self, field):
return FieldSerializer(field).data