18F/domain-scan

View on GitHub
scanners/a11y.py

Summary

Maintainability
C
1 day
Test Coverage
import json
import logging
import os
import requests
import yaml

from utils import utils


# Parallelism hint for the domain-scan runner: max concurrent a11y scans.
workers = 3
# Path to the pa11y CLI binary; overridable via the PA11Y_PATH env var.
pa11y = os.environ.get("PA11Y_PATH", "pa11y")

# Populated by init() from --a11y_redirects: maps domain -> entry dict
# (entries appear to carry 'blacklist' and/or 'redirect' keys; see
# get_url_to_scan).
redirects = {}
# Populated by init() from --a11y_config: filesystem path to the pa11y
# JSON config ("" means no config flag is passed to pa11y).
config = ""


def init(environment, options):
    """Parse scanner options before any domains are scanned.

    Loads the optional --a11y_redirects YAML file into the module-level
    `redirects` dict and records the optional --a11y_config JSON path in
    the module-level `config` string. Either option may be a local path
    or an http(s) URL (URLs are downloaded into the cache directory).

    Returns True on success, False on any validation or download error.
    """
    global redirects
    global config
    cache_dir = options.get("_", {}).get("cache_dir", "./cache")

    redirects_file = options.get("a11y_redirects")
    config_file = options.get("a11y_config")

    # Parse redirects
    if redirects_file:
        if not redirects_file.endswith(".yml"):
            logging.error("--a11y_redirects should be a YML file")
            return False

        # if remote, try to download
        if redirects_file.startswith("http:") or redirects_file.startswith("https:"):
            redirects_path = os.path.join(cache_dir, "a11y_redirects.yml")

            try:
                response = requests.get(redirects_file)
                utils.write(response.text, redirects_path)
            except Exception:
                # Best-effort download: any network/filesystem failure is
                # reported and aborts init (was a bare `except:`).
                logging.error("--a11y_redirects URL not downloaded successfully.")
                return False

        # Otherwise, read it off the disk
        else:
            redirects_path = redirects_file

            if (not os.path.exists(redirects_path)):
                logging.error("--a11y_redirects file not found.")
                return False

        with open(redirects_path, 'r') as f:
            # safe_load: the redirects file is plain data (possibly fetched
            # from a URL), so never allow arbitrary-object YAML tags.
            redirects = yaml.safe_load(f)

    # Get config
    if config_file:
        if not config_file.endswith(".json"):
            logging.error("--a11y_config should be a json file")
            return False

        # if remote, try to download
        if config_file.startswith("http:") or config_file.startswith("https:"):
            config_path = os.path.join(cache_dir, "a11y_config.json")

            try:
                response = requests.get(config_file)
                utils.write(response.text, config_path)
            except Exception:
                logging.error("--a11y_config URL not downloaded successfully.")
                return False

        # Otherwise, use the file on disk. (Bug fix: the original never
        # assigned config_path for local files, raising UnboundLocalError.)
        else:
            config_path = config_file

            if not os.path.exists(config_path):
                logging.error("--a11y_config file not found.")
                return False

        config = config_path
    return True


# Decide per-domain whether to scan, and at which URL.
def init_domain(domain, environment, options):
    """Skip domains pshtt marks as redirects/dead; apply redirect table.

    Returns False to skip the domain, or {'url': ...} with the (possibly
    redirected) URL the scan should target.
    """
    cache_dir = options.get("_", {}).get("cache_dir", "./cache")

    # pshtt data lets us avoid scanning domains that redirect or are dead.
    skip = (utils.domain_is_redirect(domain, cache_dir=cache_dir)
            or utils.domain_not_live(domain, cache_dir=cache_dir))
    if skip:
        logging.debug("\tSkipping a11y scan based on pshtt data.")
        return False

    # The redirects table may rewrite the URL or blacklist the domain.
    target = get_url_to_scan(domain)
    if not target:
        logging.debug("\tSkipping a11y scan based on redirect/blacklist data.")
        return False

    return {'url': target}


# Shell out to pa11y and run the scan.
def scan(domain, environment, options):
    """Scan the (possibly redirected) URL and return it with its errors."""
    target = environment.get("url", domain)
    return {
        'url': target,
        'errors': run_a11y_scan(target),
    }


def to_rows(data):
    """Flatten one scan result into CSV rows, one row per reported error.

    Each row is [url, typeCode, code, message, context, selector], in the
    same order as the module-level `headers` list. Returns [] when the
    result carries no errors.
    """
    url = data['url']
    # Comprehension replaces the manual append loop (same order, same fields).
    return [
        [url, e['typeCode'], e['code'], e['message'], e['context'], e['selector']]
        for e in data['errors']
    ]


# CSV column headers; order matches the per-error rows built by to_rows()
# ("redirectedTo" is the position to_rows fills with the scanned URL).
headers = [
    "redirectedTo",
    "typeCode",
    "code",
    "message",
    "context",
    "selector"
]


def run_a11y_scan(domain):
    """Invoke the pa11y CLI against *domain* and return its parsed errors.

    Returns the list parsed from pa11y's JSON output, or a single
    all-empty placeholder record when pa11y produced no output (so the
    domain still appears in the results).
    """
    cmd = [
        pa11y, domain,
        "--reporter", "json",
        "--level", "none",
        "--timeout", "300000",
    ]
    if config:
        cmd.extend(["--config", config])

    raw = utils.scan(cmd)

    if raw and raw != '[]\n':
        return json.loads(raw)

    # No output (or an empty result set): emit one placeholder entry.
    return [{
        'typeCode': '',
        'code': '',
        'message': '',
        'context': '',
        'selector': '',
        'type': ''
    }]


def get_url_to_scan(domain):
    """Map a domain through the redirects table to the URL to scan.

    Returns None when the domain is blacklisted (caller skips the scan),
    the configured redirect target when one exists, or the domain
    unchanged when it has no entry at all.
    """
    entry = redirects.get(domain)

    # No entry: leave the domain alone.
    if entry is None:
        return domain

    # blacklist forces the domain to be skipped.
    if entry.get('blacklist', False):
        return None

    # Otherwise the scan is redirected to the new location. Using .get
    # here means a malformed entry (neither key) skips the domain instead
    # of raising KeyError as the original subscript access did.
    return entry.get('redirect')