# websecmap/map/logic/latest.py
from collections import defaultdict
from django.contrib.humanize.templatetags.humanize import naturaltime
from django.db.models import Count, Q
from django.utils import timezone
from websecmap.map.logic.map_defaults import get_country, get_organization_type, remark
from websecmap.map.report import PUBLISHED_ENDPOINT_SCAN_TYPES, PUBLISHED_URL_SCAN_TYPES
from websecmap.reporting.severity import get_severity
from websecmap.scanners.models import EndpointGenericScan, UrlGenericScan
def _severity_fields(scan):
    """Return the severity and timing fields shared by endpoint and url scan entries."""
    calculation = get_severity(scan)
    return {
        "explanation": calculation.get("explanation", ""),
        "high": calculation.get("high", 0),
        "medium": calculation.get("medium", 0),
        "low": calculation.get("low", 0),
        "last_scan_humanized": naturaltime(scan.last_scan_moment),
        "last_scan_moment": scan.last_scan_moment.isoformat(),
    }


def _endpoint_scan_entry(scan):
    """Build the output dict for a single EndpointGenericScan."""
    severity = _severity_fields(scan)
    endpoint = scan.endpoint
    return {
        "url": endpoint.url.url,
        "service": f"{endpoint.protocol}/{endpoint.port} (IPv{endpoint.ip_version})",
        "protocol": endpoint.protocol,
        "port": endpoint.port,
        "ip_version": endpoint.ip_version,
        **severity,
    }


def _url_scan_entry(scan, scan_type):
    """Build the output dict for a single UrlGenericScan."""
    severity = _severity_fields(scan)
    return {
        "url": scan.url.url,
        "service": f"{scan.url.url}",
        # Url scans have no endpoint, so the scan type stands in for the
        # protocol and port / ip version are shown as placeholders.
        "protocol": scan_type,
        "port": "-",
        "ip_version": "-",
        **severity,
    }


def get_all_latest_scans(country, organization_type):
    """Collect the most recently rated scans per published scan type.

    For every type in PUBLISHED_ENDPOINT_SCAN_TYPES and
    PUBLISHED_URL_SCAN_TYPES, at most six scans flagged as the latest scan
    are selected, newest ``rating_determined_on`` first. A scan is only
    included when it is linked to at least one non-dead, resolvable url of a
    non-dead organization matching the requested country and organization
    type.

    Args:
        country: Passed to ``get_country``; the result is compared against
            the organization's ``country``.
        organization_type: Passed to ``get_organization_type``; the result is
            compared against the organization's ``type_id``.

    Returns:
        A dict with:
            "scans": mapping of scan type to a list of per-scan dicts (url,
                service, protocol, port, ip_version, explanation, high,
                medium, low, last_scan_humanized, last_scan_moment),
            "render_date": ISO formatted timestamp of when this was built,
            "remark": the shared ``remark`` value from map_defaults.
    """
    dataset = {
        "scans": defaultdict(list),
        "render_date": timezone.now().isoformat(),
        "remark": remark,
    }

    filtered_organization_type = get_organization_type(organization_type)
    filtered_country = get_country(country)

    # Really get the latest, without double results that apply for multiple organizations.
    # Do not show anything that is dead, on any level.
    # The filtered Count collapses the url -> organization join to one row per
    # scan; the count is >1 when a url is shared by several matching
    # organizations, so "at least one" is the right test.
    for scan_type in PUBLISHED_ENDPOINT_SCAN_TYPES:
        endpoint_scans = (
            EndpointGenericScan.objects.filter(
                type=scan_type,
                is_the_latest_scan=True,
            )
            .annotate(
                n_urls=Count(
                    "endpoint",
                    filter=Q(
                        endpoint__is_dead=False,
                        endpoint__url__not_resolvable=False,
                        endpoint__url__is_dead=False,
                        endpoint__url__organization__is_dead=False,
                        endpoint__url__organization__country=filtered_country,
                        endpoint__url__organization__type_id=filtered_organization_type,
                    ),
                )
            )
            .filter(n_urls__gte=1)
            .order_by("-rating_determined_on")[0:6]
        )

        for scan in endpoint_scans:
            dataset["scans"][scan_type].append(_endpoint_scan_entry(scan))

    for scan_type in PUBLISHED_URL_SCAN_TYPES:
        url_scans = (
            UrlGenericScan.objects.filter(
                type=scan_type,
                is_the_latest_scan=True,
            )
            .annotate(
                n_urls=Count(
                    "url",
                    filter=Q(
                        url__organization__is_dead=False,
                        url__organization__country=filtered_country,
                        url__organization__type_id=filtered_organization_type,
                        url__is_dead=False,
                        url__not_resolvable=False,
                    ),
                )
            )
            # Same rule as the endpoint query above: an exact match on 1 would
            # drop urls that belong to more than one matching organization.
            .filter(n_urls__gte=1)
            .order_by("-rating_determined_on")[0:6]
        )

        for scan in url_scans:
            dataset["scans"][scan_type].append(_url_scan_entry(scan, scan_type))

    return dataset