websecmap/scanners/scanner/autoexplain_trust_microsoft.py
import logging
import ssl
from datetime import datetime, timedelta
from typing import List
from celery import group
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID
from django.db.models import Q
from websecmap.celery import app
from websecmap.organizations.models import Url
from websecmap.scanners import plannedscan
from websecmap.scanners.autoexplain import add_bot_explanation
from websecmap.scanners.models import EndpointGenericScan, Endpoint
from websecmap.scanners.scanner import unique_and_random, finish_those_that_wont_be_scanned
log = logging.getLogger(__package__)
SCANNER = "autoexplain_trust_microsoft"
applicable_subdomains = {
"lyncdiscover": ["*.online.lync.com", "meet.lync.com", "*.infra.lync.com", "sched.lync.com", "*.lync.com"],
"sip": ["sipfed.online.lync.com", "*.online.lync.com", "*.infra.lync.com", "*.lync.com"],
"enterpriseenrollment": [
"manage.microsoft.com",
"admin.manage.microsoft.com",
"EnterpriseEnrollment-s.manage.microsoft.com",
"r.manage.microsoft.com",
"p.manage.microsoft.com",
"i.manage.microsoft.com",
"a.manage.microsoft.com",
],
"enterpriseregistration": ["*.enterpriseregistration.windows.net", "enterpriseregistration.windows.net"],
"msoid": [
"*.accesscontrol.windows.net",
"*.accesscontrol.windows-ppe.net",
"*.b2clogin.com",
"*.cpim.windows.net",
"*.microsoftaik.azure.net",
"*.microsoftaik-int.azure-int.net",
"*.windows-ppe.net",
"aadg.windows.net",
"aadgv6.ppe.windows.net",
"aadgv6.windows.net",
"account.live.com",
"account.live-int.com",
"api.password.ccsctp.com",
"api.passwordreset.microsoftonline.com",
"autologon.microsoftazuread-sso.com",
"becws.ccsctp.com",
"clientconfig.microsoftonline-p.net",
"clientconfig.microsoftonline-p-int.net",
"companymanager.ccsctp.com",
"companymanager.microsoftonline.com",
"cpim.windows.net",
"device.login.microsoftonline.com",
"device.login.windows-ppe.net",
"directoryproxy.ppe.windows.net",
"directoryproxy.windows.net",
"graph.ppe.windows.net",
"graph.windows.net",
"graphstore.windows.net",
"login.live.com",
"login.live-int.com",
"login.microsoft.com",
"login.microsoftonline.com",
"login.microsoftonline-p.com",
"login.microsoftonline-pst.com",
"login.microsoft-ppe.com",
"login.windows.net",
"logincert.microsoftonline.com",
"logincert.microsoftonline-int.com",
"login-us.microsoftonline.com",
"microsoftaik.azure.net",
"microsoftaik-int.azure-int.net",
"nexus.microsoftonline-p.com",
"nexus.microsoftonline-p-int.com",
"pas.windows.net",
"pas.windows-ppe.net",
"password.ccsctp.com",
"passwordreset.activedirectory.windowsazure.us",
"passwordreset.microsoftonline.com",
"provisioning.microsoftonline.com",
"signup.live.com",
"signup.live-int.com",
"sts.windows.net",
"xml.login.live.com",
"xml.login.live-int.com",
"*.login.microsoftonline.com",
"login.microsoftonline-int.com",
"accesscontrol.aadtst3.windows-int.net",
"*.accesscontrol.aadtst3.windows-int.net",
"api.login.microsoftonline.com",
"*.r.login.microsoftonline.com",
"*.r.login.microsoft.com",
"*.login.microsoft.com",
],
}
query = EndpointGenericScan.objects.all().filter(
type="tls_qualys_certificate_trusted",
is_the_latest_scan=True,
comply_or_explain_is_explained=False,
endpoint__protocol="https",
endpoint__is_dead=False,
rating="not trusted",
)
@app.task(queue="storage")
def plan_scan():
# todo: no url.is_dead = False?
scans = query.filter(
endpoint__url__in=get_relevant_microsoft_domains_from_database(),
)
urls = [_scan.endpoint.url for _scan in scans]
plannedscan.request(activity="scan", scanner=SCANNER, urls=unique_and_random(urls))
@app.task(queue="storage")
def compose_planned_scan_task(**kwargs):
urls = plannedscan.pickup(activity="scan", scanner=SCANNER, amount=kwargs.get("amount", 25))
return compose_scan_task(urls)
def compose_scan_task(urls):
"""
Adds explanations to microsoft specific infrastructure, which in itself is secure, but will be reported as being
not trusted. These are limited to a set of subdomains and a specific test.
We _really_ do not like exceptions like these, yet, trust is managed per device. And if the device is configured
for a certain domain: it is trusted internally, which is good enough for us.
This infra is used a lot by the dutch government, so instead of managing hundreds of exceptions by hand.
"""
# Only check this on the latest scans, do not alter existing explanations.
scans = query.filter(endpoint__url__in=urls).only("id", "endpoint__url__id")
finish_those_that_wont_be_scanned(SCANNER, scans, urls)
tasks = [
scan.si(scan_id=endpoint_generic_scan.pk)
| plannedscan.finish.si("scan", SCANNER, endpoint_generic_scan.endpoint.url.pk)
for endpoint_generic_scan in list(set(scans))
]
return group(tasks)
# todo: this needs to be split out onto several workers, now it can hinder the storage queue.
@app.task(queue="storage")
def scan(scan_id):
epgs = EndpointGenericScan.objects.all().filter(id=scan_id).first()
if not epgs:
return
certificate = retrieve_certificate(url=epgs.endpoint.url.url, port=epgs.endpoint.port)
matches_exception_policy = certificate_matches_microsoft_exception_policy(
certificate, epgs, applicable_subdomains, trusted_organization="Microsoft Corporation"
)
if not matches_exception_policy:
return
# when all checks pass, and indeed the SSL_ERROR_BAD_CERT_DOMAIN was found, the finding is explained
log.debug(f"Scan {epgs} fits all criteria to be auto explained for incorrect cert usage.")
add_bot_explanation(epgs, "trusted_on_local_device_with_custom_trust_policy", timedelta(days=365 * 10))
autoexplain_trust_microsoft_and_include_their_webserver_headers(epgs)
def get_relevant_microsoft_domains_from_database() -> List[Url]:
# Warning: only returns the url id inside the url object due to optimization.
# Fix #294: A subdomain can be sub-sub-sub domain. So perform a few more queries and get be sure that
# all subdomains are accounted for.
possible_urls = []
for subdomain in applicable_subdomains.keys():
possible_urls += list(
Url.objects.all()
.filter(Q(computed_subdomain__startswith=f"{subdomain}.") | Q(computed_subdomain=f"{subdomain}"))
.filter(is_dead=False, not_resolvable=False)
.only("id")
)
return possible_urls
def retrieve_certificate(url: str, port: int = 443) -> [x509.Certificate, None]:
try:
pem_data = ssl.get_server_certificate((url, port))
except Exception:
# One gazillion network errors and transmission issues can occur here.
return
# load_pem_x509_certificate takes bytes, not string. Vague error happens otherwise, IDE does not type check here.
return x509.load_pem_x509_certificate(pem_data.encode(), default_backend())
def certificate_matches_microsoft_exception_policy(
certificate: x509.Certificate, scan, applicable_subdomains, trusted_organization
) -> bool:
"""
It's possible to fake all checks with a self signed certificate. The reason we _still_ do it like this, is that
it will be mean headlines when a government organization issues self signed certificates in the name of Microsoft.
That would be just too funny. It's possible to check the entire trust chain with cert_chain_resolver and similar
tools. At the moment news headlines outvalue better checks :)
:param certificate:
:param scan:
:param applicable_subdomains:
:param trusted_organization:
:return:
"""
if not certificate:
log.debug(f"Could not retrieve certificate for {scan.endpoint.url.url}.")
return False
if certificate.not_valid_before > datetime.now():
log.debug(
f"Certificate for {scan.endpoint.url.url} is not valid before {certificate.not_valid_before}. "
f"Not trusted."
)
return False
if datetime.now() > certificate.not_valid_after:
log.debug(f"Certificate for {scan.endpoint.url.url} has expired {certificate.not_valid_after}. Not trusted.")
return False
# Likely subdomain:
# lyncdiscover.site.example.com
# lyncdiscover.example.com
if "." in scan.endpoint.url.computed_subdomain:
fragments = scan.endpoint.url.computed_subdomain.split(".")
microsoft_service = fragments[0]
else:
microsoft_service = scan.endpoint.url.computed_subdomain
# check if the issuer matches
# <Name(C=US,ST=Washington,L=Redmond,O=Microsoft Corporation,OU=Microsoft IT,CN=Microsoft IT TLS CA 5)>
_names = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
names = [name.value for name in _names]
if names[0] not in applicable_subdomains[microsoft_service]:
log.debug(f"Certificate for {scan.endpoint.url.url} not in accepted names, value: {names}. Not trusted.")
return False
# check if the common name or alt name of the certificate matches the subdomain
_names = certificate.issuer.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)
names = [name.value for name in _names]
if trusted_organization not in names:
log.debug(
f"Certificate for {scan.endpoint.url.url} not handed out by {trusted_organization} but by "
f"{names}. Not trusted."
)
return False
return True
def autoexplain_trust_microsoft_and_include_their_webserver_headers(a_scan):
# Find a neighboring http endpoint, also there the headers are not relevant as the same security
# protocol is used. DNS can only switch on addresses and ip-versions. But not protocol and port.
http_endpoint = Endpoint.objects.all().filter(
# We don't care about what port is applied here.
protocol="http",
ip_version=a_scan.endpoint.ip_version,
url=a_scan.endpoint.url,
is_dead=False,
url__is_dead=False,
url__not_resolvable=False,
)
relevant_endpoints = list(set(http_endpoint))
for endpoint in relevant_endpoints + [a_scan.endpoint]:
intended_for_devices = "service_intended_for_devices_not_browsers"
# Also retrieve all http security headers, they are never correct. The only thing that is actually
# tested is the encryption quality here.
header_scans = [
"http_security_header_strict_transport_security",
"http_security_header_x_content_type_options",
"http_security_header_x_frame_options",
"http_security_header_x_xss_protection",
]
for header_scan in header_scans:
latest_scan = get_latest_endpoint_scan(endpoint=endpoint, scan_type=header_scan)
if not latest_scan:
continue
if not latest_scan.comply_or_explain_explanation == intended_for_devices:
add_bot_explanation(latest_scan, intended_for_devices, timedelta(days=365 * 10))
def get_latest_endpoint_scan(endpoint, scan_type):
return (
EndpointGenericScan.objects.all()
.filter(endpoint=endpoint, is_the_latest_scan=True, type=scan_type, comply_or_explain_is_explained=False)
.first()
)