italia/spid-cie-oidc-django

View on GitHub
spid_cie_oidc/authority/views.py

Summary

Maintainability
A
2 hrs
Test Coverage
B
87%
import logging
import math
import urllib.parse
 
from djagger.decorators import schema
from django.conf import settings
from django.core.paginator import Paginator
from django.http import (
Http404,
HttpResponse,
JsonResponse,
QueryDict
)
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
 
from spid_cie_oidc.authority.models import (
FederationDescendant,
FederationEntityAssignedProfile
)
from spid_cie_oidc.authority.settings import MAX_ENTRIES_PAGE
from spid_cie_oidc.entity.jwtse import (
unpad_jwt_head, unpad_jwt_payload
)
from spid_cie_oidc.entity.models import get_first_self_trust_anchor
from spid_cie_oidc.entity.utils import iat_now
 
from . schemas.fetch_endpoint_request import FetchRequest, FedAPIErrorResponse, FetchResponse
from . schemas.list_endpoint import ListRequest, ListResponse
from . schemas.advanced_entity_list_endpoint import AdvancedEntityListRequest, AdvancedEntityListResponse
from . schemas.trust_mark_status_endpoint import TrustMarkRequest, TrustMarkResponse
 
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
 
 
@schema(
    methods=['GET'],
    get_request_schema = {
        "application/x-www-form-urlencoded": FetchRequest
    },
    get_response_schema = {
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": FetchResponse
    },
    tags = ['Federation API']
)
def fetch(request):
    """
    All entities that are expected to publish entity statements
    about other entities MUST expose a Fetch endpoint.

    Fetching entity statements is performed to collect entity statements
    one by one to gather trust chains.

    To fetch an entity statement, an entity needs to know the identifier
    of the entity to ask (the issuer), the fetch endpoint of that entity
    and the identifier of the entity that you want the statement to be about (the subject).

    Query parameters read from ``request.GET``:

    * ``sub``: subject of the requested statement. When it is missing or
      empty, the entity configuration of the first self trust anchor is
      returned instead.
    * ``iss``: optional issuer used to select the trust anchor that issues
      the statement; when missing, the first self trust anchor is used.
    * ``aud``: optional value forwarded to the statement builder
      (an empty list when missing).
    * ``format``: when equal to ``"json"`` the response is a JSON document,
      otherwise a signed JWT served as ``application/entity-statement+jwt``.

    Returns a ``JsonResponse`` or ``HttpResponse``. A JSON error with
    status 404 is returned when no trust anchor is available.

    Raises ``Http404`` when no active descendant matches ``sub``.
    """
    as_json = request.GET.get("format") == "json"
    jwt_content_type = "application/entity-statement+jwt"

    if not request.GET.get("sub"):
        conf = get_first_self_trust_anchor()
        # NOTE(review): the helper is assumed to return None when no trust
        # anchor is configured - confirm. Without this guard the attribute
        # access below would fail with an unhandled server error.
        if conf is None:
            return JsonResponse({"error": "Missing trust anchor"}, status=404)
        if as_json:
            return JsonResponse(conf.entity_configuration_as_dict, safe=False)
        return HttpResponse(
            conf.entity_configuration_as_jws,
            content_type=jwt_content_type
        )

    sub = FederationDescendant.objects.filter(
        sub=request.GET["sub"], is_active=True
    ).first()
    if not sub:
        raise Http404()

    # The issuer is only needed to build a subordinate statement, so it is
    # resolved here rather than before the entity-configuration branch.
    if request.GET.get("iss"):
        iss = get_first_self_trust_anchor(sub=request.GET["iss"])
    else:
        iss = get_first_self_trust_anchor()
    if iss is None:
        # Same error payload used by advanced_entity_listing for this case.
        return JsonResponse({"error": "Missing trust anchor"}, status=404)

    aud = request.GET.get("aud", [])
    if as_json:
        return JsonResponse(
            sub.entity_statement_as_dict(iss.sub, aud), safe=False
        )
    return HttpResponse(
        sub.entity_statement_as_jws(iss.sub, aud),
        content_type=jwt_content_type,
    )
 
 
@schema(
    methods=['GET'],
    get_request_schema = {
        "application/x-www-form-urlencoded": ListRequest
    },
    get_response_schema = {
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": ListResponse
    },
    tags = ['Federation API']
)
def entity_list(request):
    """
    Return the subjects of the descendants that have an assigned profile.

    When the ``entity_type`` query parameter is present and non-empty, only
    assignments whose profile category equals that value (used exactly as
    received) are considered; otherwise every assignment is considered.

    The response is a JSON array of unique ``sub`` values. Uniqueness is
    obtained through a ``set``, so the order of the array is not defined.
    """
    entity_type = request.GET.get("entity_type", "")
    filters = {}
    if entity_type:
        filters["profile__profile_category"] = entity_type

    assignments = FederationEntityAssignedProfile.objects.filter(**filters)
    subjects = assignments.values_list("descendant__sub", flat=True)
    unique_subjects = set(subjects)
    return JsonResponse(list(unique_subjects), safe=False)
 
 
# TODO - add the schema
# @schema(
# methods=['GET'],
# get_request_schema = {
# "application/x-www-form-urlencoded": ListRequest
# },
# get_response_schema = {
# "400": FedAPIErrorResponse,
# "404": FedAPIErrorResponse,
# "200": ListResponse
# },
# tags = ['Federation API']
# )
def trust_marked_list(request):
    """
    Return the subjects of the descendants that have an assigned profile.

    When the ``trust_mark_id`` query parameter is present and non-empty,
    only assignments whose profile id equals that value (used exactly as
    received) are considered; otherwise every assignment is considered.

    The response is a JSON array of unique ``sub`` values. Uniqueness is
    obtained through a ``set``, so the order of the array is not defined.
    """
    trust_mark_id = request.GET.get("trust_mark_id", "")
    filters = {}
    if trust_mark_id:
        filters["profile__profile_id"] = trust_mark_id

    assignments = FederationEntityAssignedProfile.objects.filter(**filters)
    subjects = assignments.values_list("descendant__sub", flat=True)
    unique_subjects = set(subjects)
    return JsonResponse(list(unique_subjects), safe=False)
 
 
@schema(
    methods=['GET'],
    get_request_schema = {
        "application/x-www-form-urlencoded": AdvancedEntityListRequest
    },
    get_response_schema = {
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": AdvancedEntityListResponse
    },
    tags = ['Federation API']
)
def advanced_entity_listing(request):
    """
    Return a paginated listing of the active descendants, most recently
    modified first, each with its subordinate statement.

    Query parameters read from ``request.GET``:

    * ``page``: requested page. Invalid or out-of-range values are resolved
      by ``Paginator.get_page`` (first or last page) instead of failing.

    The page size is ``settings.MAX_ENTRIES_PAGE`` when defined, otherwise
    the ``MAX_ENTRIES_PAGE`` default of the authority application.

    The JSON response contains ``iss``, ``iat``, the ``entities`` of the
    requested page only, the resolved ``page`` number, ``total_pages``,
    ``total_entries`` and the relative ``next_page_path`` /
    ``prev_page_path`` (empty strings when there is no such page).
    Descendants whose statement cannot be built are logged and left out,
    and are not counted in ``total_entries``.

    A JSON error with status 404 is returned when the trust anchor cannot
    be resolved.
    """
    # Resolve the issuer first: without it no response can be produced, so
    # there is no point in building every subordinate statement beforehand.
    try:
        iss = get_first_self_trust_anchor().sub
    except Exception:
        return JsonResponse(
            {
                "error": "Missing trust anchor",
            },
            status = 404
        )

    descendants = FederationDescendant.objects.filter(
        is_active = True,
    ).order_by("-modified")

    # NOTE(review): a statement is built for every active descendant, not
    # only for the requested page; paginating the queryset first would be
    # cheaper but would change how skipped descendants affect page sizes.
    entities_list = []
    for descendant in descendants:
        try:
            statement = descendant.entity_statement_as_jws()
        except AttributeError as e:
            logger.warning(
                "Subordinate %s missing authority hint: %s", descendant, e
            )
            continue

        entities_list.append(
            {
                descendant.sub : {
                    "iat" : int(descendant.modified.timestamp()),
                    "subordinate_statement": statement
                }
            }
        )

    per_page = getattr(settings, 'MAX_ENTRIES_PAGE', MAX_ENTRIES_PAGE)
    paginator = Paginator(entities_list, per_page)
    current_page = paginator.get_page(request.GET.get("page", 1))

    listing_path = reverse("oidcfed_advanced_entity_listing")

    next_page_path = ""
    if current_page.has_next():
        query = urllib.parse.urlencode(
            {"page": current_page.next_page_number()}
        )
        next_page_path = f"{listing_path}?{query}"

    prev_page_path = ""
    if current_page.has_previous():
        query = urllib.parse.urlencode(
            {"page": current_page.previous_page_number()}
        )
        prev_page_path = f"{listing_path}?{query}"

    # Totals come from the paginator so that they agree with the entries
    # that are actually served and with the effective page size.
    total_entries = paginator.count
    res = {
        "iss" : iss,
        "iat" : iat_now(),
        "entities" : list(current_page.object_list),
        "page" : current_page.number,
        "total_pages" : math.ceil(total_entries / per_page),
        "total_entries" : total_entries,
        "next_page_path": next_page_path,
        "prev_page_path": prev_page_path,
    }
    return JsonResponse(res, safe=False)
 
 
@schema(
    methods=['GET', 'POST'],
    get_request_schema = {
        "application/x-www-form-urlencoded": TrustMarkRequest
    },
    get_response_schema = {
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": TrustMarkResponse
    },
    tags = ['Federation API']
)
@csrf_exempt
def trust_mark_status(request):
    """
    Tell whether a trust mark is currently assigned to an active descendant.

    The subject and the trust mark identifier are taken either from:

    * the ``trust_mark`` parameter, whose payload must carry ``sub`` and
      ``id``; when present it takes precedence over the other parameters, or
    * the ``sub`` parameter together with ``trust_mark_id`` (or ``id`` as a
      fallback).

    Every parameter is looked up in the POST data first and then in the
    query string.

    Returns a ``JsonResponse``: ``{"active": true}`` when an assignment
    exists for an active descendant, ``{"active": false}`` otherwise
    (including missing parameters or a trust mark that cannot be decoded).
    Methods other than GET and POST get a JSON error with status 400.
    """
    # Reject unsupported methods before touching the request parameters.
    if request.method not in ['GET', 'POST']:
        return JsonResponse({"error": "Method not allowed"}, status=400)

    failed_data = {"active": False}

    def _param(name):
        # POST data wins over the query string; empty values fall through.
        return request.POST.get(name) or request.GET.get(name, None)

    sub = _param("sub")
    _id = _param("trust_mark_id") or _param("id")
    trust_mark = _param("trust_mark")

    if trust_mark:
        try:
            # The header is decoded only to check that it can be parsed.
            # NOTE(review): nothing here verifies the trust mark signature;
            # presumably these helpers only decode the JWT - confirm.
            unpad_jwt_head(trust_mark)
            payload = unpad_jwt_payload(trust_mark)
            sub = payload["sub"]
            _id = payload["id"]
        except Exception:
            # Any malformed or incomplete trust mark is reported as inactive.
            return JsonResponse(failed_data)
    elif not (sub and _id):
        return JsonResponse(failed_data)

    # exists() asks the database for a boolean instead of loading every
    # matching row just to test whether the result is empty.
    is_active = FederationEntityAssignedProfile.objects.filter(
        descendant__sub=sub, profile__profile_id=_id, descendant__is_active=True
    ).exists()
    return JsonResponse({"active": is_active})