failmap/admin

View on GitHub
websecmap/map/logic/rss_feeds.py

Summary

Maintainability
A
3 hrs
Test Coverage
from datetime import datetime

import pytz
from constance import config
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.contrib.syndication.views import Feed
from django.core.exceptions import ObjectDoesNotExist
from django.utils.translation import ugettext as _

from websecmap.map.logic.map_defaults import remark
from websecmap.organizations.models import Organization
from websecmap.reporting.severity import get_severity
from websecmap.scanners import ENDPOINT_SCAN_TYPES, URL_SCAN_TYPES
from websecmap.scanners.models import EndpointGenericScan, UrlGenericScan


class UpdatesOnOrganizationFeed(Feed):

    link = "/data/updates_on_organization_feed/"
    description = "Update feed."

    def title(self, organization_id):
        try:
            organization = Organization.objects.all().filter(pk=organization_id).get()
        except ObjectDoesNotExist:
            return "Organization Updates"

        return "%s Updates" % organization.name

    # it seems weird to do this.
    def get_object(self, request, *args, **kwargs):
        return kwargs.get("organization_id", 0)

    # second parameter via magic
    def items(self, organization_id):
        return latest_updates(organization_id).get("scans", [])

    def item_title(self, item):
        rating = (
            _("Perfect")
            if not any([item["high"], item["medium"], item["low"]])
            else _("High")
            if item["high"]
            else _("Medium")
            if item["medium"]
            else _("Low")
        )

        badge = (
            "✅"
            if not any([item["high"], item["medium"], item["low"]])
            else "🔴"
            if item["high"]
            else "🔶"
            if item["medium"]
            else "🍋"
        )

        return "%s %s - %s: %s" % (badge, rating, item["url"], item["service"])

    def item_description(self, item):
        return "%s: %s" % (_(item["scan_type"]), _(item.get("explanation", "")))

    def item_pubdate(self, item):
        return item["rating_determined_on"]

    # item_link is only needed if NewsItem has no get_absolute_url method.
    # unique links are required to properly display a feed.
    def item_link(self, item):
        return "%s/#report-%s/%s/%s/%s" % (
            config.PROJECT_WEBSITE,
            item["organization_id"],
            item["url"],
            item["service"],
            item["rating_determined_on"],
        )


class LatestScanFeed(Feed):
    """
    Setting a parameter such as self.scan_type in the get_object will cause concurrency problems.

    The manual is lacking how to get variables to the item_title and such functions: only to "items" it is somewhat
    clear. This is probably because i don't know enough python. Why would this extra parameter work at the "items"
    functions but not anywhere else? (signature issues).
    """

    description = "Overview of the latest scans."

    # magic
    def get_object(self, request, *args, **kwargs):
        # print("args: %s" % kwargs['scan_type'])
        return kwargs.get("scan_type", "")

    def title(self, scan_type: str = ""):
        if scan_type:
            return "%s Scan Updates" % scan_type
        else:
            return "Vulnerabilities Feed"

    def link(self, scan_type: str = ""):
        if scan_type:
            return "/data/feed/%s" % scan_type
        else:
            return "/data/feed/"

    # second parameter via magic
    def items(self, scan_type):
        # print(scan_type)
        if scan_type in ENDPOINT_SCAN_TYPES:
            return EndpointGenericScan.objects.filter(type=scan_type).order_by("-last_scan_moment")[0:30]

        if scan_type in URL_SCAN_TYPES:
            return UrlGenericScan.objects.filter(type=scan_type).order_by("-last_scan_moment")[0:30]

    def item_title(self, item):
        calculation = get_severity(item)
        if not calculation:
            return ""

        rating = (
            _("Perfect")
            if not any([calculation["high"], calculation["medium"], calculation["low"]])
            else _("High")
            if calculation["high"]
            else _("Medium")
            if calculation["medium"]
            else _("Low")
        )

        badge = (
            "✅"
            if not any([calculation["high"], calculation["medium"], calculation["low"]])
            else "🔴"
            if calculation["high"]
            else "🔶"
            if calculation["medium"]
            else "🍋"
        )

        if item.type in URL_SCAN_TYPES:
            # url generic scan:
            return "%s %s - %s" % (badge, rating, item.url.url)
        else:
            # endpoint scan
            return "%s %s - %s" % (badge, rating, item.endpoint.url.url)

    def item_description(self, item):
        calculation = get_severity(item)
        return _(calculation.get("explanation", ""))

    def item_pubdate(self, item):
        return item.last_scan_moment

    # item_link is only needed if NewsItem has no get_absolute_url method.
    def item_link(self, item):
        if item.type in URL_SCAN_TYPES:
            # url generic scan:
            return "%s/#updates/%s/%s" % (config.PROJECT_WEBSITE, item.last_scan_moment, item.url.url)
        else:
            # endpoint scan
            return "%s/#updates/%s/%s" % (config.PROJECT_WEBSITE, item.last_scan_moment, item.endpoint.url.url)


def latest_updates(organization_id):
    """

    :param request:
    :param organization_id: the id will always be "correct", whereas name will have all kinds of terribleness:
    multiple organizations that have the same name in different branches, organizations with generic names etc.
    Finding an organization by name is tricky. Therefore ID.

    We're not filtering any further: given this might result in turning a blind eye to low or medium vulnerabilities.
    :return:
    """

    try:
        # todo: check that the organization is displayed on the map
        organization = Organization.objects.all().filter(pk=organization_id).get()
    except ObjectDoesNotExist:
        return {}

    dataset = {
        "scans": [],
        "render_date": datetime.now(pytz.utc).isoformat(),
        "remark": remark,
    }

    # semi-union, given not all columns are the same. (not python/django-esque solution)
    generic_endpoint_scans = list(
        EndpointGenericScan.objects.filter(
            endpoint__url__organization=organization, type__in=ENDPOINT_SCAN_TYPES
        ).order_by("-rating_determined_on")[0:60]
    )
    url_endpoint_scans = list(
        UrlGenericScan.objects.filter(url__organization=organization, type__in=URL_SCAN_TYPES).order_by(
            "-rating_determined_on"
        )[0:60]
    )

    scans = generic_endpoint_scans + url_endpoint_scans

    scans = sorted(scans, key=lambda k: getattr(k, "rating_determined_on", datetime.now(pytz.utc)), reverse=True)

    for scan in scans:
        scan_type = scan.type
        calculation = get_severity(scan)
        if scan_type in URL_SCAN_TYPES:
            # url scans
            dataset["scans"].append(
                {
                    "organization": organization.name,
                    "organization_id": organization.pk,
                    "url": scan.url.url,
                    "service": "%s" % scan.url.url,
                    "protocol": scan_type,
                    "port": "",
                    "ip_version": "",
                    "scan_type": scan_type,
                    "explanation": calculation.get("explanation", ""),  # sometimes you dont get one.
                    "high": calculation.get("high", 0),
                    "medium": calculation.get("medium", 0),
                    "low": calculation.get("low", 0),
                    "rating_determined_on_humanized": naturaltime(scan.rating_determined_on),
                    "rating_determined_on": scan.rating_determined_on,
                    "last_scan_humanized": naturaltime(scan.last_scan_moment),
                    "last_scan_moment": scan.last_scan_moment.isoformat(),
                }
            )

        else:
            # endpoint scans
            dataset["scans"].append(
                {
                    "organization": organization.name,
                    "organization_id": organization.pk,
                    "url": scan.endpoint.url.url,
                    "service": "%s/%s (IPv%s)" % (scan.endpoint.protocol, scan.endpoint.port, scan.endpoint.ip_version),
                    "protocol": scan.endpoint.protocol,
                    "port": scan.endpoint.port,
                    "ip_version": scan.endpoint.ip_version,
                    "scan_type": scan_type,
                    "explanation": calculation.get("explanation", ""),  # sometimes you dont get one.
                    "high": calculation.get("high", 0),
                    "medium": calculation.get("medium", 0),
                    "low": calculation.get("low", 0),
                    "rating_determined_on_humanized": naturaltime(scan.rating_determined_on),
                    "rating_determined_on": scan.rating_determined_on,
                    "last_scan_humanized": naturaltime(scan.last_scan_moment),
                    "last_scan_moment": scan.last_scan_moment.isoformat(),
                }
            )

    return dataset