# websecmap/scanners/admin.py
from django.contrib import admin
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from import_export.admin import ImportExportModelAdmin
from jet.admin import CompactInline
from jet.filters import RelatedFieldAjaxListFilter
from websecmap.scanners import models
from websecmap.scanners.models import State
from websecmap.scanners.proxy import check_proxy
from websecmap.scanners.scanner.internet_nl_v2_websecmap import progress_running_scan, recover_and_retry
class EndpointGenericScanInline(CompactInline):
model = models.EndpointGenericScan
extra = 0
show_change_link = True
ordering = ["-rating_determined_on"]
@admin.register(models.UrlIp)
class UrlIpAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = ("url", "ip", "rdns_name", "discovered_on", "is_unused_since")
search_fields = ("url__url", "ip", "rdns_name", "discovered_on", "is_unused_since")
list_filter = ["url", "ip", "rdns_name", "discovered_on", "is_unused_since"][::-1]
fields = ("url", "ip", "rdns_name", "discovered_on", "is_unused", "is_unused_since", "is_unused_reason")
readonly_fields = ["discovered_on"]
@admin.register(models.Endpoint)
class EndpointAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = (
"id",
"url",
"visit",
"discovered_on",
"ip_version",
"port",
"protocol",
"is_dead",
"is_dead_since",
"is_dead_reason",
)
search_fields = ("url__url", "ip_version", "port", "protocol", "is_dead", "is_dead_since", "is_dead_reason")
list_filter = [
"url__organization__country",
"url__organization__type__name",
"ip_version",
"port",
"protocol",
"is_dead",
"is_dead_reason",
"discovered_on",
][::-1]
fieldsets = (
(None, {"fields": ("url", "ip_version", "protocol", "port", "discovered_on")}),
(
"dead endpoint management",
{
"fields": ("is_dead", "is_dead_since", "is_dead_reason"),
},
),
)
readonly_fields = ["discovered_on"]
    @staticmethod
    def visit(inst):
        if not inst or not inst.url:
            return "- no url, how?"
        # Let format_html do the escaping instead of interpolating into the markup ourselves.
        return format_html("<a href='{}://{}:{}/' target='_blank'>Visit</a>", inst.protocol, inst.url.url, inst.port)
inlines = [EndpointGenericScanInline]
save_as = True # Save as new is nice for duplicating endpoints.
@admin.register(models.TlsQualysScratchpad)
class TlsQualysScratchpadAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = ("domain", "at_when")
search_fields = ("domain", "at_when")
list_filter = ["domain", "at_when"][::-1]
fields = ("domain", "data")
readonly_fields = ["at_when"]
@admin.register(models.Screenshot)
class ScreenshotAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = ("endpoint", "created_on", "filename")
search_fields = ("endpoint__url__url", "created_on", "filename")
list_filter = ["endpoint", "created_on", "filename"][::-1]
fields = ("endpoint", "image", "created_on", "filename", "width_pixels", "height_pixels")
readonly_fields = ["created_on", "image"]
@admin.register(models.EndpointGenericScan)
class EndpointGenericScanAdmin(ImportExportModelAdmin, admin.ModelAdmin):
def get_queryset(self, request):
        qs = super().get_queryset(request)
        # Displaying the endpoint requires url (and organization) data for every row. Prefetching
        # it here avoids a query per row and makes the page load roughly 10x faster.
qs = qs.prefetch_related(
"endpoint", "endpoint__url", "endpoint__url__organization", "endpoint__url__organization__type"
)
return qs
list_display = (
"endpoint",
"type",
"rating",
"last_scan_moment",
"rating_determined_on",
"comply_or_explain_is_explained",
"explain",
"is_the_latest_scan",
)
search_fields = ("endpoint__url__url", "type", "rating", "explanation", "last_scan_moment", "rating_determined_on")
list_filter = [
"endpoint__url__organization__country",
"endpoint__url__organization__type__name",
"endpoint__url__is_dead",
("endpoint", RelatedFieldAjaxListFilter),
"type",
"rating",
"last_scan_moment",
"rating_determined_on",
"endpoint__protocol",
"endpoint__port",
"endpoint__ip_version",
"endpoint__discovered_on",
"endpoint__is_dead",
"comply_or_explain_is_explained",
"is_the_latest_scan",
][::-1]
fieldsets = (
(
None,
{
"fields": (
"endpoint",
"type",
"rating",
"explanation",
"evidence",
"last_scan_moment",
"rating_determined_on",
"is_the_latest_scan",
)
},
),
(
"comply or explain",
{
"fields": (
"comply_or_explain_is_explained",
"comply_or_explain_explanation_valid_until",
"comply_or_explain_explanation",
"comply_or_explain_explained_by",
"comply_or_explain_explained_on",
"comply_or_explain_case_handled_by",
"comply_or_explain_case_additional_notes",
),
},
),
)
    def explain(self, obj):
        return format_html("<a href='./{}/change/#/tab/module_1/'>Explain</a>", obj.pk)
readonly_fields = ["last_scan_moment"]
@admin.register(models.UrlGenericScan)
class UrlGenericScanAdmin(ImportExportModelAdmin, admin.ModelAdmin):
def get_queryset(self, request):
        qs = super().get_queryset(request)
qs = qs.prefetch_related("url__organization", "url__organization__type")
return qs
list_display = (
"url",
"type",
"rating",
"explanation",
"last_scan_moment",
"rating_determined_on",
"comply_or_explain_is_explained",
"explain",
"is_the_latest_scan",
"short_evidence",
)
search_fields = ("url__url", "type", "rating", "explanation", "last_scan_moment", "rating_determined_on")
list_filter = [
"url__organization__country",
"url__organization__type__name",
"type",
"rating",
"last_scan_moment",
"rating_determined_on",
"comply_or_explain_is_explained",
"is_the_latest_scan",
][::-1]
@staticmethod
def short_evidence(obj):
return obj.evidence[0:60]
fieldsets = (
(
None,
{
"fields": (
"url",
"type",
"rating",
"explanation",
"evidence",
"last_scan_moment",
"rating_determined_on",
"is_the_latest_scan",
)
},
),
(
"comply or explain",
{
"fields": (
"comply_or_explain_is_explained",
"comply_or_explain_explanation_valid_until",
"comply_or_explain_explanation",
"comply_or_explain_explained_by",
"comply_or_explain_explained_on",
"comply_or_explain_case_handled_by",
"comply_or_explain_case_additional_notes",
),
},
),
)
    def explain(self, obj):
        return format_html("<a href='./{}/change/#/tab/module_1/'>Explain</a>", obj.pk)
readonly_fields = ["last_scan_moment", "rating_determined_on"]
@admin.register(models.EndpointGenericScanScratchpad)
class EndpointGenericScanScratchpadAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = ("type", "domain", "at_when", "data")
search_fields = ("type", "domain", "at_when", "data")
list_filter = ["type", "domain", "at_when", "data"][::-1]
fields = ("type", "domain", "at_when", "data")
@admin.register(models.InternetNLV2Scan)
class InternetNLV2ScanAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = (
"id",
"type",
"scan_id",
"online",
"state",
"state_message",
"last_state_check",
"last_state_change",
"domains",
)
search_fields = ("subject_urls__url", "scan_id")
list_filter = ("state", "state_message", "last_state_check", "last_state_change", "type")
fields = (
"type",
"scan_id",
"state",
"state_message",
"last_state_check",
"last_state_change",
"metadata",
"retrieved_scan_report",
"subject_urls",
)
# todo: subject_urls inline
readonly_fields = ["subject_urls", "metadata", "retrieved_scan_report"]
@staticmethod
def domains(obj):
return obj.subject_urls.count()
@staticmethod
def online(obj):
from constance import config
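        # Builds a direct link to this scan on the API, with the configured API credentials
        # embedded in the URL (http basic auth style) taken from constance.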
        return mark_safe(  # nosec: rendering raw html in the admin is not ideal, but this link is fully under admin control
f"<a href='{config.INTERNET_NL_API_USERNAME}:{config.INTERNET_NL_API_PASSWORD}"
f"@{config.INTERNET_NL_API_URL}/requests/{obj.scan_id}' target='_blank'>online</a>"
)
actions = []
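    # Custom admin actions: each handler below dispatches its work asynchronously via apply_async
    # and is registered by appending its name to `actions`.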
def attempt_rollback(self, request, queryset):
for scan in queryset:
recover_and_retry.apply_async([scan.pk])
self.message_user(request, "Rolling back asynchronously. May take a while.")
attempt_rollback.short_description = "Attempt rollback (async)"
actions.append("attempt_rollback")
def progress_scan(self, request, queryset):
for scan in queryset:
tasks = progress_running_scan(scan.pk)
tasks.apply_async()
self.message_user(request, "Attempting to progress scans (async).")
progress_scan.short_description = "Progress scan (async)"
actions.append("progress_scan")
@admin.register(models.InternetNLV2StateLog)
class InternetNLV2StateLogAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = ("pk", "scan", "state", "state_message", "last_state_check", "at_when")
search_fields = ("scan__scan_id",)
list_filter = ("state", "state_message", "last_state_check", "at_when")
fields = ("scan", "state", "state_message", "last_state_check", "at_when")
@admin.register(models.ScanProxy)
class ScanProxyAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = (
"id",
"protocol",
"address",
"check_result",
"check_result_date",
"currently_used_in_tls_qualys_scan",
"last_claim_at",
"manually_disabled",
"is_dead",
"request_speed_in_ms",
"qualys_capacity_current",
"qualys_capacity_max",
"qualys_capacity_this_client",
)
search_fields = ("address",)
list_filter = (
"protocol",
"is_dead",
"is_dead_since",
"check_result",
"manually_disabled",
"qualys_capacity_current",
)
fields = (
"protocol",
"address",
"currently_used_in_tls_qualys_scan",
"is_dead",
"is_dead_since",
"is_dead_reason",
"manually_disabled",
"check_result",
"check_result_date",
"request_speed_in_ms",
"qualys_capacity_current",
"qualys_capacity_max",
"qualys_capacity_this_client",
"last_claim_at",
)
actions = []
def check_qualys_proxy(self, request, queryset):
for proxy in queryset:
check_proxy.apply_async([proxy.as_dict()])
self.message_user(request, "Proxing checked asynchronously. May take some time before results come in.")
check_qualys_proxy.short_description = "Check proxy"
actions.append("check_qualys_proxy")
def release_proxy(self, request, queryset):
for proxy in queryset:
proxy.out_of_resource_counter = 0
proxy.currently_used_in_tls_qualys_scan = False
proxy.save()
self.message_user(request, "Proxies released.")
release_proxy.short_description = "Release proxy"
actions.append("release_proxy")
def reset_proxy(self, request, queryset):
for proxy in queryset:
proxy.is_dead = False
proxy.out_of_resource_counter = 0
proxy.currently_used_in_tls_qualys_scan = False
proxy.save()
self.message_user(request, "Proxies reset.")
reset_proxy.short_description = "Reset proxy"
actions.append("reset_proxy")
def disable_proxy(self, request, queryset):
for proxy in queryset:
proxy.manually_disabled = True
proxy.save()
self.message_user(request, "Proxies disabled.")
disable_proxy.short_description = "Disable proxy"
actions.append("disable_proxy")
def enable_proxy(self, request, queryset):
for proxy in queryset:
proxy.manually_disabled = False
proxy.save()
self.message_user(request, "Proxies enabled.")
enable_proxy.short_description = "Enable proxy"
actions.append("enable_proxy")
@admin.register(models.PlannedScan)
class PlannedScanAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = (
"url",
"activity",
"scanner",
"state",
"last_state_change_at",
"requested_at_when",
"finished_at_when",
)
search_fields = ("url__url", "activity", "scanner", "state")
list_filter = ["activity", "scanner", "state", "last_state_change_at", "requested_at_when", "finished_at_when"][
::-1
]
fields = ("url", "activity", "scanner", "state", "last_state_change_at", "requested_at_when", "finished_at_when")
actions = []
def set_state_requested(self, request, queryset):
for plannedscan in queryset:
plannedscan.state = State.requested.value
plannedscan.save()
self.message_user(request, "Set to requested.")
set_state_requested.short_description = "Set to Requested"
actions.append("set_state_requested")
def set_state_finished(self, request, queryset):
for plannedscan in queryset:
plannedscan.state = State.finished.value
plannedscan.save()
self.message_user(request, "Set to finished.")
set_state_finished.short_description = "Set to Finished"
actions.append("set_state_finished")
@admin.register(models.PlannedScanStatistic)
class PlannedScanStatisticAdmin(ImportExportModelAdmin, admin.ModelAdmin):
list_display = ("at_when", "data")
list_filter = ["at_when"][::-1]
fields = ("at_when", "data")