18F/domain-scan

View on GitHub
scanners/pagedata.py

Summary

Maintainability
D
1 day
Test Coverage
import logging
import os
import re
import resource
import time
import urllib.request
from urllib.parse import urlparse

import ijson
import requests

###
# Very simple scanner that gets some basic info from a list of pages on a domain.


# Set a default number of workers for a particular scan type.
# Overridden by a --workers flag. XXX not actually overridden?
workers = 30


user_agent = os.environ.get("PAGEDATA_USER_AGENT", "18F/domain-scan/pagedata.py")


# This is the list of pages that we will be checking.
pages = [
    "/",
    "/code.json",
    "/coronavirus",
    "/data.json",
    "/data",
    "/developer",
    "/digitalstrategy",
    "/open",
    "/privacy",
    "/robots.txt",
    "/sitemap.xml",
    "/cj",
    "/digitalstrategy/datacenteroptimizationstrategicplan.json",
    "/digitalstrategy/FITARAmilestones.json",
    "/digitalstrategy/governanceboards.json",
    "/digitalstrategy/costsavings.json",
    "/digitalstrategy/bureaudirectory.json",
    "/redirecttest-foo-bar-baz",
]


# Optional one-time initialization for all scans.
# If defined, any data returned will be passed to every scan instance and used
# to update the environment dict for that instance
# Will halt scan execution if it returns False or raises an exception.
#
# Run locally.
def init(environment: dict, options: dict) -> dict:
    logging.debug("Init function.")
    return {'pages': pages}


# Required scan function. This is the meat of the scanner, where things
# that use the network or are otherwise expensive would go.
#
# Runs locally or in the cloud (Lambda).
def scan(domain: str, environment: dict, options: dict) -> dict:
    logging.debug("Scan function called with options: %s" % options)

    results = {}

    # Perform the "task".
    for page in environment['pages']:
        url = "https://" + domain + page
        results[page] = {}
        results[page]['opendata_conforms_to'] = ''
        results[page]['codegov_measurementtype'] = ''
        results[page]['json_items'] = str(0)

        headers = {
            'User-Agent': user_agent,
        }

        # try the query and store the responsecode
        try:
            response = requests.head(url, allow_redirects=True, timeout=4, headers=headers)
            results[page]['responsecode'] = str(response.status_code)
        except Exception:
            logging.debug("could not get data from %s%s", domain, page)
            results[page]['responsecode'] = '-1'

        # if it's supposed to be json, try parsing it as a stream
        if page.endswith('.json'):
            counter = 0
            try:
                req = urllib.request.Request(url, headers=headers)
                with urllib.request.urlopen(req, timeout=5) as jsondata:
                    try:
                        parser = ijson.parse(jsondata)
                        for prefix, event, value in parser:
                            # As a catchall, indicate how many items are in the json doc
                            if event == 'string':
                                counter = counter + 1

                            # see if there is a 'conformsTo' field, which indicates that it might
                            # be open-data compliant.
                            if prefix.endswith('.conformsTo') or prefix.endswith('.conformsto'):
                                results[page]['opendata_conforms_to'] = ' '.join([value, results[page]['opendata_conforms_to']])

                            # see if there is a 'measurementType' field, which indicates that it might
                            # be code.gov compliant.  Taken from https://code.gov/about/compliance/inventory-code
                            if prefix.endswith('.measurementType') or prefix.endswith('.measurementtype'):
                                results[page]['codegov_measurementtype'] = ' '.join([value, results[page]['codegov_measurementtype']])
                            if prefix.endswith('measurementType.method') or prefix.endswith('measurementtype.method'):
                                results[page]['codegov_measurementtype'] = ' '.join([value, results[page]['codegov_measurementtype']])
                            if prefix.endswith('measurementType.ifOther') or prefix.endswith('measurementtype.ifOther'):
                                results[page]['codegov_measurementtype'] = ' '.join([value, results[page]['codegov_measurementtype']])

                        results[page]['json_items'] = str(counter)
                        logging.debug('memory usage after parsing json for %s: %d', url, resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
                    except Exception:
                        logging.debug('error parsing json for %s', url)
            except Exception:
                logging.debug('could not open %s', url)

        # Get the content-type
        try:
            results[page]['content_type'] = str(response.headers['Content-Type'])
        except Exception:
            results[page]['content_type'] = ''

        # get the content-length
        try:
            results[page]['content_length'] = str(response.headers['Content-Length'])
        except Exception:
            # sometimes cloudfront seems to have errors or cache misses, so let's try again
            try:
                # sleep a bit to let it have time to cache the page
                time.sleep(0.01)
                newresponse = requests.head(url, allow_redirects=True, timeout=4)
                results[page]['content_length'] = str(newresponse.headers['Content-Length'])
            except Exception:
                results[page]['content_length'] = ''

        # This is the final url that we ended up at, in case of redirects.
        try:
            results[page]['final_url_in_same_domain'] = False
            results[page]['final_url'] = response.url
            if urlparse(response.url).hostname.endswith(domain):
                results[page]['final_url_in_same_domain'] = True
        except Exception:
            results[page]['final_url'] = ''

        # get the page if it's the /data page so that we can scrape it
        if page == '/data':
            try:
                response = requests.get(url, allow_redirects=True, timeout=5, headers=headers)

                # check for "chief data officer"
                try:
                    res = re.findall(r'chief data officer', response.text, flags=re.IGNORECASE)
                    if res:
                        results[page]['contains_chiefdataofficer'] = True
                    else:
                        results[page]['contains_chiefdataofficer'] = False
                except Exception:
                    results[page]['contains_chiefdataofficer'] = False

                # check for "Charter"
                try:
                    res = re.findall(r'Charter', response.text, flags=re.IGNORECASE)
                    if res:
                        results[page]['contains_charter'] = True
                    else:
                        results[page]['contains_charter'] = False
                except Exception:
                    results[page]['contains_charter'] = False
            except Exception:
                logging.debug("got error while scraping %s", domain)

        logging.debug('memory usage after page %s: %d', url, resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

    logging.debug('memory usage for pagedata %s: %d', "https://" + domain, resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    logging.warning("pagedata %s Complete!", domain)

    return results


# Required CSV row conversion function. Usually one row, can be more.
#
# Run locally.
def to_rows(data):
    row = []
    for page in headers:
        row.extend([data[page]])
    return [row]


# CSV headers for each row of data. Referenced locally.
headers = pages