failmap/admin

View on GitHub
websecmap/map/admin.py

Summary

Maintainability
C
1 day
Test Coverage
import logging
from copy import deepcopy
from datetime import datetime
from typing import Dict, Any

import pytz
from adminsortable2.admin import SortableAdminMixin
from celery import group
from django.contrib import admin
from django.db import transaction
from django.urls import reverse
from django.utils.html import format_html
from import_export.admin import ImportExportModelAdmin

from websecmap.app.models import Job
from websecmap.celery import PRIO_HIGH, app
from websecmap.map import models
from websecmap.map.logic.openstreetmap import import_from_scratch, update_coordinates
from websecmap.map.models import HighLevelStatistic, MapDataCache, VulnerabilityStatistic
from websecmap.map.report import compose_task

log = logging.getLogger(__package__)


@admin.register(models.AdministrativeRegion)
class AdministrativeRegionAdmin(ImportExportModelAdmin, admin.ModelAdmin):

    list_display = (
        "country",
        "organization_type",
        "admin_level",
        "import_start_date",
        "imported",
        "import_message",
        "resampling_resolution",
    )
    search_fields = ["country", "organization_type__name", "admin_level"]
    list_filter = ["country", "organization_type", "admin_level", "imported"][::-1]
    fields = (
        "country",
        "organization_type",
        "admin_level",
        "import_start_date",
        "imported",
        "import_message",
        "resampling_resolution",
    )

    readonly_fields = ["import_start_date", "imported", "import_message"]

    actions = []

    def import_region(self, request, queryset):
        tasks = []

        for region in queryset:
            organization_filter = {"country": region.country, "type": region.organization_type.pk}

            # you can't add tasks for reporting, as the data is not in the database to create the tasks yet.
            tasks.append(
                start_import.si(region.pk)
                | import_from_scratch.si([str(region.country)], [region.organization_type.name])
                | add_configuration.si(str(region.country), region.organization_type.pk)
                | set_imported.si(region.pk)
                | report_country.si(organization_filter)
                | prepare_map.si(str(region.country), region.organization_type.pk)
            )

        task_name = "%s (%s) " % ("Import region", ",".join(map(str, list(queryset))))
        task = group(tasks)

        job = Job.create(task, task_name, request, priority=PRIO_HIGH)
        link = reverse("admin:app_job_change", args=(job.id,))
        self.message_user(request, 'Job created, <a href="%s">%s</a>' % (link, task_name))

    import_region.short_description = "🛃  Import region"
    actions.append(import_region)

    # ah, hmm... abstractions...
    # something something abstractions
    def update_coordinates(self, request, queryset):
        tasks = []

        for region in queryset:
            organization_filter = {"country": region.country, "type": region.organization_type.pk}
            tasks.append(
                start_import.si(region.pk)
                | update_coordinates.si([str(region.country)], [region.organization_type.name])
                | set_imported.si(region.pk)
                | report_country.si(organization_filter)
                | prepare_map.si(str(region.country), region.organization_type.pk)
            )

        task_name = "%s (%s) " % ("Update region", ",".join(map(str, list(queryset))))
        task = group(tasks)

        job = Job.create(task, task_name, request, priority=PRIO_HIGH)
        link = reverse("admin:app_job_change", args=(job.id,))
        self.message_user(request, 'Job created, <a href="%s">%s</a>' % (link, task_name))

    update_coordinates.short_description = "🛂  Update region"
    actions.append(update_coordinates)


@app.task(queue="storage")
def start_import(region_id: int):
    region = models.AdministrativeRegion.objects.all().filter(pk=region_id).first()
    if not region:
        return

    region.import_start_date = datetime.now(pytz.utc)
    region.save(update_fields=["import_start_date"])


@app.task(queue="storage")
def set_imported(region_id: int):
    region = models.AdministrativeRegion.objects.all().filter(pk=region_id).first()
    if not region:
        return
    region.imported = True
    region.save(update_fields=["imported"])


@app.task(queue="storage")
def prepare_map(country: str, organization_type: int):
    # when the reports are created, we still probably have some map cache, as the admin probably looked
    # at that page before...
    MapDataCache.objects.all().filter(organization_type__name=organization_type, country=country).delete()


@app.task(queue="storage")
def report_country(organization_filter: Dict[str, Any]):
    tasks = compose_task(organizations_filter=organization_filter)
    tasks.apply_async()


@app.task(queue="storage")
def add_configuration(country: str, organization_type: int):

    if models.Configuration.objects.all().filter(country=country, organization_type=organization_type).exists():
        log.debug("This configuration already exists, skipping.")
        return

    # has earlier configuration of this country? Then add it after that country.
    tmp = models.Configuration.objects.all().filter(country=country).order_by("-display_order").first()
    if tmp:
        new_number = tmp.display_order + 1

        moveup = models.Configuration.objects.all().filter(display_order__gte=new_number).order_by("-display_order")
        for config in moveup:
            config.display_order += 1
            config.save()
    else:
        # otherwise add it to the end of the list.
        tmp = models.Configuration.objects.all().order_by("-display_order").first()

        if tmp:
            new_number = tmp.display_order + 1
        else:
            new_number = 1

    a = models.Configuration(
        country=country,
        organization_type=organization_type,
        is_the_default_option=False,
        is_displayed=False,
        is_scanned=False,
        is_reported=True,
        display_order=new_number,
    )
    a.save()


@admin.register(models.Configuration)
# ImportExportModelAdmin and SortableAdminMixin don't work together.
# as sortableadmin contains a bug causing double ID's, we're prefering importexportmodeladmin. Makes ordering
# awful to handle. But we can manage by changing that to buttons that sort of work.
class ConfigurationAdmin(
    ImportExportModelAdmin,
    admin.ModelAdmin,
    SortableAdminMixin,
):

    list_display = (
        "display_order",
        "country",
        "organization_type",
        "is_displayed",
        "is_the_default_option",
        "is_scanned",
        "is_reported",
    )
    search_fields = [
        "country",
        "organization_type",
    ]
    list_filter = [
        "country",
        "organization_type",
        "is_displayed",
        "is_the_default_option",
        "is_scanned",
        "is_reported",
    ][::-1]
    fields = (
        "display_order",
        "country",
        "organization_type",
        "is_displayed",
        "is_the_default_option",
        "is_scanned",
        "is_reported",
    )

    def save_model(self, request, obj, form, change):
        # allows to place a new item at the right position, moving certain items up.
        # if the display_order is zero, autoplace the item.

        if obj.display_order:
            super().save_model(request, obj, form, change)

        else:
            country_exists = (
                models.Configuration.objects.all().filter(country=obj.country).order_by("-display_order").first()
            )

            if country_exists:
                # if there is something form this country, place it where it belongs and move the rest +=1
                new_number = country_exists.display_order + 1

                moveup = (
                    models.Configuration.objects.all().filter(display_order__gte=new_number).order_by("-display_order")
                )
                for config in moveup:
                    config.display_order += 1
                    config.save()

                obj.display_order = country_exists.display_order + 1
                super().save_model(request, obj, form, change)

            else:
                # if there is nothing from this country, then add it at the end.
                tmp = models.Configuration.objects.all().order_by("-display_order").first()

                if tmp:
                    obj.display_order = tmp.display_order + 1
                else:
                    obj.display_order = 1

                super().save_model(request, obj, form, change)

    actions = []

    def display(self, request, queryset):

        for configuration in queryset:
            configuration.is_displayed = True
            configuration.save()

    display.short_description = "☀️ Display"
    actions.append(display)

    def hide(self, request, queryset):

        for configuration in queryset:
            configuration.is_displayed = False
            configuration.save()

    hide.short_description = "🌑 Hide"
    actions.append(hide)

    def allow_scanning(self, request, queryset):

        for configuration in queryset:
            configuration.is_scanned = True
            configuration.save()

    allow_scanning.short_description = "❤️  Allow scanning"
    actions.append(allow_scanning)

    def stop_scanning(self, request, queryset):

        for configuration in queryset:
            configuration.is_scanned = False
            configuration.save()

    stop_scanning.short_description = "💔  Stop scanning"
    actions.append(stop_scanning)

    def allow_reporting(self, request, queryset):

        for configuration in queryset:
            configuration.is_reported = True
            configuration.save()

    allow_reporting.short_description = "📄️  Allow Reporting"
    actions.append(allow_reporting)

    def stop_reporting(self, request, queryset):

        for configuration in queryset:
            configuration.is_reported = False
            configuration.save()

    stop_reporting.short_description = "📄  Stop Reporting"
    actions.append(stop_reporting)

    def create_report(self, request, queryset):

        for configuration in queryset:

            organization_filter = {"country": configuration.country, "type": configuration.organization_type}

            log.debug(organization_filter)
            task = compose_task(organizations_filter=organization_filter)
            task.apply_async()

        self.message_user(request, "Reports are being generated in the background.")

    create_report.short_description = "📄  Report"
    actions.append(create_report)

    def set_default(self, request, queryset):

        for configuration in queryset:

            models.Configuration.objects.all().update(is_the_default_option=False)

            configuration.is_the_default_option = True
            configuration.save()

    set_default.short_description = "😀  Set default"
    actions.append(set_default)

    def remove_default(self, request, queryset):

        for configuration in queryset:
            configuration.is_the_default_option = False
            configuration.save()

    remove_default.short_description = "😭  Remove default"
    actions.append(remove_default)

    def reorder(self, request, queryset):

        first_order = None

        for configuration in queryset:

            # set the first order, and keep counting from that.
            if not first_order:
                if not int(configuration.display_order):
                    first_order = 0
                    configuration.display_order = 0
                else:
                    first_order = configuration.display_order
            else:
                # second and more
                first_order += 1
                configuration.display_order = first_order

            configuration.save()

    reorder.short_description = "Reorder"
    actions.append(reorder)

    @transaction.atomic
    def move_up(self, request, queryset):

        for configuration in queryset:
            next_config = (
                models.Configuration.objects.all()
                .order_by("-display_order")
                .filter(display_order__lt=configuration.display_order)
                .first()
            )
            if not next_config:
                log.debug("No next one")
                continue

            log.debug("Moving up: %s to %s" % (configuration.display_order, next_config.display_order))
            tmp = deepcopy(next_config.display_order)
            next_config.display_order = configuration.display_order
            configuration.display_order = tmp

            next_config.save(update_fields=["display_order"])
            configuration.save(update_fields=["display_order"])

    move_up.short_description = "Move Up"
    actions.append(move_up)

    @transaction.atomic
    def move_down(self, request, queryset):

        for configuration in reversed(queryset):
            previous_config = (
                models.Configuration.objects.all()
                .order_by("display_order")
                .filter(display_order__gt=configuration.display_order)
                .first()
            )
            if not previous_config:
                log.debug("No previous one")
                continue

            log.debug("Moving down: %s to %s" % (configuration.display_order, previous_config.display_order))
            tmp = deepcopy(previous_config.display_order)
            previous_config.display_order = configuration.display_order
            configuration.display_order = tmp

            previous_config.save(update_fields=["display_order"])
            configuration.save(update_fields=["display_order"])

    move_down.short_description = "Move Down"
    actions.append(move_down)


@admin.register(models.MapHealthReport)
class MapHealthReportAdmin(ImportExportModelAdmin, admin.ModelAdmin):
    list_display = (
        "map_configuration",
        "at_when",
        "percentage_up_to_date",
        "percentage_out_of_date",
        "outdate_period_in_hours",
    )

    search_fields = [
        "map_configuration__country",
        "map_configuration__organization_type__name",
    ]

    list_filter = [
        "percentage_up_to_date",
        "percentage_out_of_date",
        "outdate_period_in_hours",
        "map_configuration__country",
        "map_configuration__organization_type__name",
        "at_when",
    ][::-1]
    fields = (
        "map_configuration",
        "at_when",
        "percentage_up_to_date",
        "percentage_out_of_date",
        "outdate_period_in_hours",
        "detailed_report",
    )


@admin.register(models.MapDataCache)
class MapDataCacheAdmin(ImportExportModelAdmin, admin.ModelAdmin):
    def get_queryset(self, request):
        # textfields are extremely slow in mysql. Prevent the text field.
        qs = super(MapDataCacheAdmin, self).get_queryset(request)
        qs = qs.defer("dataset")
        return qs

    list_display = ("pk", "country", "organization_type", "filters", "at_when")
    list_filter = ["country", "organization_type", "filters", "at_when"][::-1]
    search_fields = ["country", "organization_type", "filters"]

    readonly_fields = ["cached_on"]

    actions = []

    def instant_cache_clear(self, request, queryset):
        MapDataCache.objects.all().delete()

    instant_cache_clear.short_description = "Clear all caches (select 1)"
    actions.append(instant_cache_clear)


@admin.register(models.VulnerabilityStatistic)
class VulnerabilityStatisticAdmin(ImportExportModelAdmin, admin.ModelAdmin):
    list_display = (
        "country",
        "organization_type",
        "scan_type",
        "at_when",
        "high",
        "medium",
        "low",
        "urls",
        "ok_urls",
        "endpoints",
        "ok_endpoints",
        "ok",
    )
    list_filter = ["country", "organization_type", "scan_type", "at_when", "high", "medium", "low"][::-1]
    search_fields = ["country", "organization_type", "scan_type"]

    actions = []

    def instant_cache_clear(self, request, queryset):
        VulnerabilityStatistic.objects.all().delete()

    instant_cache_clear.short_description = "Clear all caches (select 1)"
    actions.append(instant_cache_clear)


@admin.register(models.HighLevelStatistic)
class HighLevelStatisticAdmin(ImportExportModelAdmin, admin.ModelAdmin):

    list_display = (
        "country",
        "organization_type",
        "at_when",
    )
    list_filter = ["country", "organization_type", "at_when"][::-1]
    search_fields = ["country", "organization_type"]

    actions = []

    def instant_cache_clear(self, request, queryset):
        HighLevelStatistic.objects.all().delete()

    instant_cache_clear.short_description = "Clear all caches (select 1)"
    actions.append(instant_cache_clear)


@admin.register(models.LandingPage)
class LandingPageAdmin(ImportExportModelAdmin, admin.ModelAdmin):

    list_display = ("map_configuration", "directory", "enabled")
    list_filter = ["map_configuration", "directory", "enabled"][::-1]
    search_fields = ["map_configuration", "directory", "enabled"]


@admin.register(models.OrganizationReport)
class OrganizationReportAdmin(ImportExportModelAdmin, admin.ModelAdmin):

    # do NOT load the calculation field, as that will be slow.
    # https://stackoverflow.com/questions/34774028/how-to-ignore-loading-huge-fields-in-django-admin-list-display
    def get_queryset(self, request):
        qs = super(OrganizationReportAdmin, self).get_queryset(request)

        # tell Django to not retrieve mpoly field from DB
        qs = qs.defer("calculation")
        return qs

    def inspect_organization(self, obj):
        return format_html(
            '<a href="../../organizations/organization/{id}/change">inspect organization</a>',
            id=format(obj.organization_id),
        )

    list_display = (
        "organization",
        "total_urls",
        "ok_urls",
        "total_endpoints",
        "ok_endpoints",
        "high",
        "medium",
        "low",
        "ok",
        "explained_high",
        "explained_medium",
        "explained_low",
        "at_when",
        "inspect_organization",
    )
    search_fields = ["organization__name", "at_when"]
    list_filter = ["organization", "organization__country", "organization__type__name", "at_when"][::-1]
    # fields = [field.name for field in OrganizationRating._meta.get_fields() if field.name != "id"][::-1]

    fields = (
        "organization",
        "total_urls",
        "total_endpoints",
        "high",
        "medium",
        "low",
        "ok",
        "high_urls",
        "medium_urls",
        "low_urls",
        "ok_urls",
        "high_endpoints",
        "medium_endpoints",
        "low_endpoints",
        "ok_endpoints",
        "total_url_issues",
        "url_issues_high",
        "url_issues_medium",
        "url_issues_low",
        "url_ok",
        "total_endpoint_issues",
        "endpoint_issues_high",
        "endpoint_issues_medium",
        "endpoint_issues_low",
        "endpoint_ok",
        "explained_high",
        "explained_medium",
        "explained_low",
        "explained_high_urls",
        "explained_medium_urls",
        "explained_low_urls",
        "explained_high_endpoints",
        "explained_medium_endpoints",
        "explained_low_endpoints",
        "explained_total_url_issues",
        "explained_url_issues_high",
        "explained_url_issues_medium",
        "explained_url_issues_low",
        "explained_total_endpoint_issues",
        "explained_endpoint_issues_high",
        "explained_endpoint_issues_medium",
        "explained_endpoint_issues_low",
        "at_when",
        "calculation",
    )

    ordering = ["-at_when"]

    save_as = True