spid_cie_oidc/entity/admin.py
import logging from django.contrib import adminfrom django.utils.safestring import mark_safefrom django.contrib import messages # from prettyjson import PrettyJSONWidget from django.conf import settingsfrom .jwtse import unpad_jwt_payloadfrom .statements import EntityConfiguration, get_entity_configurations, get_entity_statementsfrom .models import ( FederationEntityConfiguration, FederationHistoricalKey, FetchedEntityStatement, TrustChain, StaffToken)from spid_cie_oidc.entity.trust_chain_operations import get_or_create_trust_chain from . settings import HTTPC_PARAMS logger = logging.getLogger(__name__) @admin.register(FederationHistoricalKey)class FederationHistoricalKey(admin.ModelAdmin): readonly_fields = ('jwk', "as_json") list_display = ("entity", "kid", "inactive_from") list_filter = ("created", "modified", "inactive_from") search_fields = ("entity",) @admin.register(FederationEntityConfiguration)class FederationEntityConfigurationAdmin(admin.ModelAdmin): # formfield_overrides = { # JSONField: { # "widget": PrettyJSONWidget( # attrs={"initial": "parsed", "disabled": True} # ) # } # } @admin.action(description='update trust marks')Function `update_trust_marks` has a Cognitive Complexity of 22 (exceeds 5 allowed). Consider refactoring. def update_trust_marks(modeladmin, request, queryset): # pragma: no cover """ fetch trust marks from all the authorities """ trust_marks = {} for obj in queryset: jwts = get_entity_configurations(obj.authority_hints, HTTPC_PARAMS) for jwt in jwts: try: ec = EntityConfiguration(jwt, httpc_params=HTTPC_PARAMS) except Exception as e: _msg = f"Failed getting Entity Configuration for {jwt}: {e}" logger.warning(_msg) messages.error(_msg) continue try: # get superior fetch url fetch_api_url = ec.payload["metadata"]["federation_entity"][ "federation_fetch_endpoint" ] except KeyError: _msg = ( "Missing federation_fetch_endpoint in " f"federation_entity metadata for {obj.sub} by {ec.sub}." ) logger.warning(_msg) messages.error(_msg) continue _url = f"{fetch_api_url}?sub={obj.sub}" try: logger.info(f"Getting entity statements from {_url}") _jwts = get_entity_statements([_url], HTTPC_PARAMS) payload = unpad_jwt_payload(_jwts[0]) for i in payload.get("trust_marks", []): trust_marks[i['id']] = i['trust_mark'] except Exception as e: _msg = f"Error getting entity statements from {_url}: {e}" logger.warning(_msg) messages.error(_msg) continue positions = {} count = 0 for i in obj.trust_marks: positions[i['id']] = count count += 1 if positions: for k,v in trust_marks.items(): if positions.get(k, None): obj.trust_marks[positions[k]] = {k:v} else: obj.trust_marks.append({k:v}) else: obj.trust_marks = [ {"id":k, "trust_mark":v} for k,v in trust_marks.items() ] obj.save() messages.success( request, f"Trust mark reloaded succesfully: {', '.join(trust_marks.keys())}" ) list_display = ( "sub", "type", "kids", "is_active", "created", ) list_filter = ("created", "modified", "is_active") # search_fields = ('command__name',) readonly_fields = ( "created", "modified", "entity_configuration_as_json", "pems_as_html", "kids", "type", ) actions = [update_trust_marks] def pems_as_html(self, obj): res = "" data = dict() for k, v in obj.pems_as_dict.items(): data[k] = {} for i in ("public", "private"): data[k][i] = v[i].replace("\n", "<br>") res += ( f"<b>{k}</b><br><br>" f"{data[k]['public']}<br>" f"{data[k]['private']}<br><hr>" ) return mark_safe(res) # nosec @admin.register(TrustChain)class TrustChainAdmin(admin.ModelAdmin): @admin.action(description='reload trust chain') def update_trust_chain(modeladmin, request, queryset): # pragma: no cover for tc in queryset: sub = tc.sub ta = tc.trust_anchor.sub try : get_or_create_trust_chain( subject=sub, trust_anchor=ta, httpc_params=settings.HTTPC_PARAMS, required_trust_marks=getattr( settings, "OIDCFED_REQUIRED_TRUST_MARKS", [], ), force=True ) messages.success( request, f"Trust chain successfully reloaded for {sub}" ) except Exception as e: messages.error(request, f"Failed to update {sub} due to: {e}") continue list_display = ("sub", "exp", "modified", "is_valid") list_filter = ("exp", "modified", "is_active") search_fields = ("sub",) readonly_fields = ( "created", "modified", "parties_involved", "status", "log", "chain", "iat", ) actions = [update_trust_chain] @admin.register(FetchedEntityStatement)class FetchedEntityStatementAdmin(admin.ModelAdmin): list_display = ("sub", "iss", "exp", "iat", "created", "modified") list_filter = ("created", "modified", "exp", "iat") search_fields = ("sub", "iss") readonly_fields = ("sub", "statement", "created", "modified", "iat", "exp", "iss") @admin.register(StaffToken)class StaffTokenAdmin(admin.ModelAdmin): list_display = ("user", "expire_at", "is_valid") list_filter = ("created", "modified", "expire_at") search_fields = ("token", ) readonly_fields = ("is_valid",) raw_id_fields = ('user',)