# spid_cie_oidc/authority/views.py
import logging
import math
import urllib.parse

from djagger.decorators import schema
from django.conf import settings
from django.core.paginator import Paginator
from django.http import (
    Http404,
    HttpResponse,
    JsonResponse,
    QueryDict
)
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt

from spid_cie_oidc.authority.models import (
    FederationDescendant,
    FederationEntityAssignedProfile
)
from spid_cie_oidc.authority.settings import MAX_ENTRIES_PAGE
from spid_cie_oidc.entity.jwtse import (
    unpad_jwt_head,
    unpad_jwt_payload
)
from spid_cie_oidc.entity.models import get_first_self_trust_anchor
from spid_cie_oidc.entity.utils import iat_now

from .schemas.fetch_endpoint_request import FetchRequest, FedAPIErrorResponse, FetchResponse
from .schemas.list_endpoint import ListRequest, ListResponse
from .schemas.advanced_entity_list_endpoint import AdvancedEntityListRequest, AdvancedEntityListResponse
from .schemas.trust_mark_status_endpoint import TrustMarkRequest, TrustMarkResponse

logger = logging.getLogger(__name__)


def _descendant_subs_response(lookup):
    """Return a JSON list of the distinct descendant subjects matching *lookup*.

    *lookup* is a dict of ORM filter keyword arguments applied to
    ``FederationEntityAssignedProfile``; an empty dict selects every
    assigned profile. The order of the resulting list is not defined,
    because duplicates are removed through a ``set``.
    """
    entries = FederationEntityAssignedProfile.objects.filter(
        **lookup
    ).values_list("descendant__sub", flat=True)
    return JsonResponse(list(set(entries)), safe=False)


def _advanced_listing_page_path(page_number):
    """Return the relative URL of page *page_number* of the advanced listing."""
    query = urllib.parse.urlencode({"page": page_number})
    return f'{reverse("oidcfed_advanced_entity_listing")}?{query}'


def _request_param(request, *names):
    """Return the first non-empty value among *names*, or ``None``.

    For each name, in order, the POST body is consulted before the query
    string.
    """
    for name in names:
        value = request.POST.get(name) or request.GET.get(name)
        if value:
            return value
    return None


@schema(
    methods=['GET'],
    get_request_schema={
        "application/x-www-form-urlencoded": FetchRequest
    },
    get_response_schema={
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": FetchResponse
    },
    tags=['Federation API']
)
def fetch(request):
    """
    All entities that are expected to publish entity statements
    about other entities MUST expose a Fetch endpoint.

    Fetching entity statements is performed to collect entity statements
    one by one to gather trust chains.

    To fetch an entity statement, an entity needs to know the identifier
    of the entity to ask (the issuer), the fetch endpoint of that entity
    and the identifier of the entity that you want the statement to be
    about (the subject).

    Query parameters read by this view:

    * ``sub``: subject of the requested statement. When absent, the
      entity configuration of the default trust anchor is returned.
    * ``iss``: optional issuer used to select the trust anchor.
    * ``aud``: optional audience forwarded to the statement builder.
    * ``format``: ``json`` returns the statement as JSON, anything else
      returns the signed JWS (``application/entity-statement+jwt``).

    Raises ``Http404`` when no trust anchor is available or when no
    active descendant matches ``sub``.
    """
    as_json = request.GET.get("format") == "json"

    if not request.GET.get("sub"):
        # Without a subject the request is about this authority itself;
        # the "iss" parameter is not used on this path.
        conf = get_first_self_trust_anchor()
        if conf is None:
            # The helper's contract is not visible from this module; this
            # guard mirrors the "Missing trust anchor" handling of the
            # advanced listing and is a no-op if it never returns None.
            raise Http404()
        if as_json:
            return JsonResponse(conf.entity_configuration_as_dict, safe=False)
        return HttpResponse(
            conf.entity_configuration_as_jws,
            content_type="application/entity-statement+jwt"
        )

    if request.GET.get("iss"):
        iss = get_first_self_trust_anchor(sub=request.GET["iss"])
    else:
        iss = get_first_self_trust_anchor()
    if iss is None:
        # Answer 404 instead of failing on ``iss.sub`` below.
        raise Http404()

    sub = FederationDescendant.objects.filter(
        sub=request.GET["sub"], is_active=True
    ).first()
    if not sub:
        raise Http404()

    aud = request.GET.get("aud", [])
    if as_json:
        return JsonResponse(
            sub.entity_statement_as_dict(iss.sub, aud), safe=False
        )
    return HttpResponse(
        sub.entity_statement_as_jws(iss.sub, aud),
        content_type="application/entity-statement+jwt",
    )


@schema(
    methods=['GET'],
    get_request_schema={
        "application/x-www-form-urlencoded": ListRequest
    },
    get_response_schema={
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": ListResponse
    },
    tags=['Federation API']
)
def entity_list(request):
    """
    Return the JSON list of descendant subjects with an assigned profile.

    When the ``entity_type`` query parameter is non-empty, only the
    profiles whose category equals it are considered.
    """
    entity_type = request.GET.get("entity_type")
    lookup = {"profile__profile_category": entity_type} if entity_type else {}
    return _descendant_subs_response(lookup)


# TODO - add the schema
# @schema(
#     methods=['GET'],
#     get_request_schema = {
#         "application/x-www-form-urlencoded": ListRequest
#     },
#     get_response_schema = {
#         "400": FedAPIErrorResponse,
#         "404": FedAPIErrorResponse,
#         "200": ListResponse
#     },
#     tags = ['Federation API']
# )
def trust_marked_list(request):
    """
    Return the JSON list of descendant subjects with an assigned profile.

    When the ``trust_mark_id`` query parameter is non-empty, only the
    profiles whose id equals it are considered.
    """
    trust_mark_id = request.GET.get("trust_mark_id")
    lookup = {"profile__profile_id": trust_mark_id} if trust_mark_id else {}
    return _descendant_subs_response(lookup)


@schema(
    methods=['GET'],
    get_request_schema={
        "application/x-www-form-urlencoded": AdvancedEntityListRequest
    },
    get_response_schema={
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": AdvancedEntityListResponse
    },
    tags=['Federation API']
)
def advanced_entity_listing(request):
    """
    Return one page of the active descendants with their signed statements.

    Descendants are ordered by most recent modification. A descendant
    whose statement cannot be built (``AttributeError``) is logged and
    left out of the listing. The page size comes from
    ``settings.MAX_ENTRIES_PAGE``, falling back to the package default.

    The ``page`` query parameter selects the page; ``Paginator.get_page``
    maps a missing or non-numeric value to the first page and an
    out-of-range one to the last page.

    Returns a 404 JSON error when no trust anchor is available.
    """
    # Resolve the issuer before doing any signing work: without it the
    # request is answered with an error anyway.
    try:
        iss = get_first_self_trust_anchor().sub
    except Exception:
        return JsonResponse(
            {
                "error": "Missing trust anchor",
            },
            status=404
        )

    descendants = FederationDescendant.objects.filter(
        is_active=True,
    ).order_by("-modified")

    entities_list = []
    for descendant in descendants:
        try:
            statement = descendant.entity_statement_as_jws()
        except AttributeError as e:
            logger.warning(
                "Subordinate %s missing authority hint: %s", descendant, e
            )
            continue
        entities_list.append(
            {
                descendant.sub: {
                    "iat": int(descendant.modified.timestamp()),
                    "subordinate_statement": statement,
                }
            }
        )

    max_entries = getattr(settings, 'MAX_ENTRIES_PAGE', MAX_ENTRIES_PAGE)
    paginator = Paginator(entities_list, max_entries)
    current = paginator.get_page(request.GET.get("page", 1))

    next_page_path = ""
    if current.has_next():
        next_page_path = _advanced_listing_page_path(
            current.next_page_number()
        )

    prev_page_path = ""
    if current.has_previous():
        prev_page_path = _advanced_listing_page_path(
            current.previous_page_number()
        )

    # Every figure below is taken from the paginator so that the payload
    # is self-consistent: only the requested page is returned, the page
    # number is the one actually served, and the totals count only the
    # entries that can be listed.
    total_entries = paginator.count
    res = {
        "iss": iss,
        "iat": iat_now(),
        "entities": list(current.object_list),
        "page": current.number,
        "total_pages": math.ceil(total_entries / max_entries),
        "total_entries": total_entries,
        "next_page_path": next_page_path,
        "prev_page_path": prev_page_path,
    }
    return JsonResponse(res, safe=False)


@schema(
    methods=['GET', 'POST'],
    get_request_schema={
        "application/x-www-form-urlencoded": TrustMarkRequest
    },
    get_response_schema={
        "400": FedAPIErrorResponse,
        "404": FedAPIErrorResponse,
        "200": TrustMarkResponse
    },
    tags=['Federation API']
)
@csrf_exempt
def trust_mark_status(request):
    """
    Tell whether a trust mark is currently assigned to an active subject.

    The subject and the trust mark id are read either from the payload of
    the ``trust_mark`` parameter (a JWT) or from the ``sub`` and
    ``trust_mark_id`` (alias ``id``) parameters. Each parameter is looked
    up in the POST body first and then in the query string.

    Returns ``{"active": true}`` when a matching profile is assigned to
    an active descendant and ``{"active": false}`` otherwise, including
    when the parameters are missing or the JWT cannot be decoded. Methods
    other than GET and POST receive a 400 JSON error.
    """
    if request.method not in ('GET', 'POST'):
        return JsonResponse({"error": "Method not allowed"}, status=400)

    failed_data = {"active": False}
    sub = _request_param(request, "sub")
    _id = _request_param(request, "trust_mark_id", "id")
    trust_mark = _request_param(request, "trust_mark")

    if trust_mark:
        # NOTE(review): the JWT appears to be only decoded here, not
        # signature-verified - confirm against the jwtse helpers.
        try:
            # The header is decoded only to reject malformed tokens.
            unpad_jwt_head(trust_mark)
            payload = unpad_jwt_payload(trust_mark)
            sub = payload["sub"]
            _id = payload["id"]
        except Exception:
            return JsonResponse(failed_data)
    elif not (sub and _id):
        return JsonResponse(failed_data)

    # exists() asks the database for a boolean instead of loading rows.
    is_active = FederationEntityAssignedProfile.objects.filter(
        descendant__sub=sub,
        profile__profile_id=_id,
        descendant__is_active=True
    ).exists()
    return JsonResponse({"active": is_active})