18F/domain-scan

View on GitHub
gather

Summary

Maintainability
Test Coverage
#!/usr/bin/env python3

import os
import glob
import sys
import re
import csv
import requests
import logging
import importlib

from utils import utils

# some metadata about the scan itself
start_time = utils.local_now()
start_command = str.join(" ", sys.argv)

# Applied if --ignore-www is enabled.
strip_www = re.compile("^www\.")

# Applied to all domains.
strip_protocol = re.compile("^https?://")
strip_wildcard = re.compile("^(\*.)+")
strip_redacted = re.compile("^(\?\.)+")


def run(options=None, cache_dir="./cache", results_dir="./results"):

    sources = options["gatherers"]

    suffixes = options.get("suffix")
    suffix_pattern = utils.suffix_pattern(suffixes)

    # Clear out existing result CSVs, to avoid inconsistent data.
    for result in glob.glob("%s/*.csv" % results_dir):
        os.remove(result)

    # Opt in to include parent (second-level) domains.
    include_parents = options.get("include_parents", False)

    # Opt into stripping www. prefixes from hostnames, effectively
    # collapsing www.[host] and [host] into one record.
    ignore_www = options.get("ignore_www", False)

    # --parents should be a CSV whose first column is parent domains
    # that will act as a whitelist for which subdomains to gather.
    parents = get_parent_domains(options, cache_dir=cache_dir)

    # De-duping hostnames. This will cause the system to hold all
    # hostnames in memory at once, but oh well.
    hostnames_cache = {}

    for source in sources:
        extra = {}

        try:
            gatherer_module = importlib.import_module(
                "gatherers.%s" % source)
            gatherer = gatherer_module.Gatherer(suffixes, options, extra)
        except ImportError:
            # If it's not a registered module, allow it to be "hot registered"
            # as long as the user gave us a flag with that name that can be
            # used as the --url option to the URL module.
            if options.get(source):
                gatherer_module = importlib.import_module("gatherers.url")
                extra['name'] = source
                gatherer = gatherer_module.Gatherer(suffixes, options, extra)
            else:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                logging.error("[%s] Gatherer not found, or had an error during loading.\n\tERROR: %s\n\t%s" % (source, exc_type, exc_value))
                exit(1)

        # Iterate over each hostname.
        for domain in gatherer.gather():

            # Always apply the suffix filter to returned names.
            if not suffix_pattern.search(domain):
                continue

            # Strip off whitespace before pre-processing.
            domain = domain.strip()

            # Cut off protocols, if present.
            domain = strip_protocol.sub("", domain)

            # Cut naive wildcard prefixes out. (from certs)
            domain = strip_wildcard.sub("", domain)

            # Cut off any redaction markers from names. (from certs)
            domain = strip_redacted.sub("", domain)

            # Strip www. prefixes from hostnames, effectively
            # collapsing www.[host] and [host] into one record.
            if ignore_www:
                domain = strip_www.sub("", domain)

            # Strip off whitespace after pre-processing.
            domain = domain.strip()

            base = utils.base_domain_for(domain)

            # Unless --include-parents is specified, exclude them.
            if not include_parents:
                # Always ignore www prefixes for base domains.
                if (domain == base) or (domain == "www.%s" % base):
                    continue

            # Apply --parent domain whitelist, if present.
            if parents:
                if base not in parents:
                    continue

            # Use hostname cache to de-dupe, if seen before.
            if domain not in hostnames_cache:
                hostnames_cache[domain] = [source]
            elif source not in hostnames_cache[domain]:
                hostnames_cache[domain] += [source]

    # Now that we've gone through all sources and logged when each
    # domain appears in each one, go through cache and write
    # all of them to disk.

    # Assemble headers.
    headers = ["Domain", "Base Domain"]
    # Add headers dynamically for each source.
    headers += sources

    # Open CSV file.
    gathered_filename = "%s/%s.csv" % (results_dir, "gathered")
    gathered_file = open(gathered_filename, 'w', newline='')
    gathered_writer = csv.writer(gathered_file)
    gathered_writer.writerow(headers)

    # Write each hostname to disk, with all discovered sources.
    hostnames = list(hostnames_cache.keys())
    hostnames.sort()

    for hostname in hostnames:
        base = utils.base_domain_for(hostname)
        row = [hostname, base]
        for source in sources:
            row += [source in hostnames_cache[hostname]]
        gathered_writer.writerow(row)

    # Close CSV file.
    gathered_file.close()

    # If sort requested, sort in place by domain.
    if options.get("sort"):
        utils.sort_csv(gathered_filename)

    logging.warning("Results written to CSV.")

    # Save metadata.
    end_time = utils.local_now()
    metadata = {
        'start_time': utils.utc_timestamp(start_time),
        'end_time': utils.utc_timestamp(end_time),
        'command': start_command
    }
    utils.write(utils.json_for(metadata), "%s/meta.json" % results_dir)


# Read in parent domains from the first column of a given CSV.
def get_parent_domains(options, cache_dir="./cache"):
    parents = options.get("parents")
    if not parents:
        return None

    # If --parents is a URL, we want to download it now,
    # and then adjust the value to be the path of the cached download.
    if parents.startswith("http:") or parents.startswith("https:"):

        # Though it's saved in cache/, it will be downloaded every time.
        parents_path = os.path.join(cache_dir, "parents.csv")

        try:
            response = requests.get(parents)
            utils.write(response.text, parents_path)
        except:
            logging.error("Parent domains URL not downloaded successfully.")
            print(utils.format_last_exception())
            exit(1)

        parents = parents_path

    parent_domains = []
    with open(parents, encoding='utf-8', newline='') as csvfile:
        for row in csv.reader(csvfile):
            if (not row[0]) or (row[0].lower() == "domain") or (row[0].lower() == "domain name"):
                continue
            parent_domains.append(row[0].lower())

    return parent_domains


if __name__ == '__main__':
    options = utils.options_for_gather()
    utils.configure_logging(options)

    # Support --output flag for changing where cache/ and results/ go.
    cache_dir = utils.cache_dir(options)
    results_dir = utils.results_dir(options)
    utils.mkdir_p(cache_dir)
    utils.mkdir_p(results_dir)

    run(options, cache_dir=cache_dir, results_dir=results_dir)