italia/spid-cie-oidc-django

View on GitHub
spid_cie_oidc/entity/views.py

Summary

Maintainability
A
1 hr
Test Coverage
D
66%
import logging
 
from django.conf import settings
from djagger.decorators import schema
from django.http import Http404
from django.http import HttpResponse
from django.http import JsonResponse
 
from spid_cie_oidc.entity.jwtse import create_jws
from spid_cie_oidc.entity.schemas.resolve_endpoint import (
ResolveRequest, ResolveResponse, ResolveErrorResponse
)
from spid_cie_oidc.entity.trust_chain_operations import get_or_create_trust_chain
 
from .models import (
FederationEntityConfiguration,
FederationHistoricalKey,
TrustChain,
StaffToken
)
from .settings import HTTPC_PARAMS
from .statements import OIDCFED_FEDERATION_WELLKNOWN_URL
from .utils import iat_now
 
logger = logging.getLogger(__name__)
 
 
def get_subs_from_wellknown(request, wkuri :str):
sub = request.build_absolute_uri().split(wkuri)[0]
 
sub_values = []
if sub[-1] == "/":
sub_values.extend((sub, sub[:-1]))
elif sub[-1] != "/":
sub_values.extend((sub, sub + "/"))
return sub_values
 
 
def entity_configuration(request):
"""
OIDC Federation Entity Configuration at
.well-known/openid-federation
"""
 
_sub_values = get_subs_from_wellknown(request, OIDCFED_FEDERATION_WELLKNOWN_URL)
 
conf = FederationEntityConfiguration.objects.filter(
# TODO: check for reverse proxy and forwarders ...
sub__in=_sub_values,
is_active=True,
).first()
if not conf: # pragma: no cover
raise Http404()
 
if request.GET.get("format") == "json": # pragma: no cover
return JsonResponse(conf.entity_configuration_as_dict, safe=False)
else:
return HttpResponse(
conf.entity_configuration_as_jws, content_type="application/entity-statement+jwt"
)
 
 
@schema(
methods=['GET'],
get_request_schema = {
"application/x-www-form-urlencoded": ResolveRequest
},
get_response_schema = {
"400": ResolveErrorResponse,
"404": ResolveErrorResponse,
"200": ResolveResponse
},
tags = ['Federation API']
)
Function `resolve_entity_statement` has a Cognitive Complexity of 11 (exceeds 5 allowed). Consider refactoring.
def resolve_entity_statement(request, format: str = "jose"):
"""
resolves the final metadata of its descendants
 
In this implementation we only returns a preexisting
Metadata if it's valid
we avoid any possibility to trigger a new Metadata discovery if
"""
if not all((request.GET.get("sub", None), request.GET.get("anchor", None))):
raise Http404("sub and anchor parameters are REQUIRED.")
 
iss = FederationEntityConfiguration.objects.filter(is_active=True).first()
 
_q = dict(
sub=request.GET["sub"],
trust_anchor__sub=request.GET["anchor"],
is_active=True
)
 
# gets the cached one
entity = TrustChain.objects.filter(**_q).first()
 
# only with privileged actors with staff token can triggers a new trust chain
staff_token_head = request.headers.get("Authorization", None)
if staff_token_head:
staff_token = StaffToken.objects.filter(
token = staff_token_head
).first()
if staff_token.is_valid:
try:
# a staff token get a fresh trust chain on each call
entity = get_or_create_trust_chain(
httpc_params=HTTPC_PARAMS,
required_trust_marks = getattr(
settings, "OIDCFED_REQUIRED_TRUST_MARKS", []
),
subject=_q["sub"],
trust_anchor=_q["trust_anchor__sub"],
force = True
)
except Exception as e:
logger.error(
f"Failed privileged Trust Chain creation for {_q['sub']}: {e}"
)
 
if not entity:
raise Http404("entity not found.")
 
res = {
"iss": iss.sub,
"sub": request.GET["sub"],
# "aud": [],
"iat": entity.iat_as_timestamp,
"exp": entity.exp_as_timestamp,
"trust_marks": entity.trust_marks,
"metadata": entity.metadata,
"trust_chain": entity.chain
}
 
if request.GET.get("format") == "json" or format == "json":
return JsonResponse(res, safe=False)
else:
return HttpResponse(
create_jws(res, iss.jwks_fed[0]),
content_type="application/jose",
)
 
 
def openid_jwks(request, metadata_type:str, resource_type:str):
"""
resource_tytpe = set(jwks_uri, jwks.jose)
"""
_sub = request.build_absolute_uri().rsplit(resource_type)[0]
_lookup = _sub.replace(f"/{metadata_type}/", "")
conf = FederationEntityConfiguration.objects.filter(
# TODO: check for reverse proxy and forwarders ...
sub=_lookup,
is_active=True,
).first()
if not conf: # pragma: no cover
raise Http404()
content = conf.entity_configuration_as_dict['metadata'].get(
metadata_type, {}
).get("jwks", {})
 
if not content:
raise Http404()
 
if resource_type == 'jwks.json':
return JsonResponse(content, safe=False)
elif resource_type == 'jwks.jose':
return HttpResponse(
create_jws(content, conf.jwks_fed[0]),
content_type="application/jose"
)
else:
raise Http404()
 
 
def openid_connect_jwks_uri(request, metadata_type, resource_type="jwks.json"):
"""
OpenID Connect default jwks_uri
"""
return openid_jwks(request, metadata_type, resource_type)
 
 
def openid_connect_signed_jwks_uri(request, metadata_type, resource_type="jwks.jose"):
"""
OpenID Federation signed_jwks_uri
"""
return openid_jwks(request, metadata_type, resource_type)
 
 
def historical_keys(request):
"""
OIDC Federation Entity Configuration at
.well-known/openid-federation
"""
 
sub_values = get_subs_from_wellknown(request, '.well-known/openid-federation-historical-jwks')
keys = FederationHistoricalKey.objects.filter(entity__sub__in=sub_values)
 
entity = keys.first().entity if keys.first() else None
 
reg = {
"iss": entity.sub if entity else sub_values[0],
"iat": iat_now(),
"keys": []
}
 
for i in keys:
reg["keys"].append(i.as_dict)
 
if request.GET.get("format") == "json": # pragma: no cover
return JsonResponse(reg, safe=False)
else:
res = create_jws(
reg,
entity.jwks_fed[0],
typ="jwk-set+jwt",
# **kwargs,
)
 
return HttpResponse(
res, content_type="application/jwk-set+jwt"
)