csmcallister/fed-a11y

feda11y/models.py

Summary

Maintainability: A (0 mins)
Test Coverage: A (100%)
"""Data-access helpers for the feda11y Flask app.

Reads pre-generated JSON fixtures from the app's static folder and provides
filtering and CSV-export helpers for accessibility scan results.
"""
import csv
from io import StringIO
import os

import ujson


def get_data(current_app):
    with open(os.path.join(current_app.static_folder, 'data.json'), 'r') as f:
        data = ujson.load(f)
    return data
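
# Assumed shape of a data.json record, inferred from the keys read below
# (values here are illustrative, not real data):
#   {
#       'agency': 'Example Agency',
#       'organization': 'Example Org',
#       'domain': 'example.gov',
#       'subdomain': 'www.example.gov',
#       'routeableUrl': 'https://www.example.gov',
#       'scanDate': '2021-01-01',
#       'issues': '{"issues": [{"context": "...", "code": "...", "message": "..."}]}'
#   }
# Note that 'issues' is a JSON-encoded string, not a nested object.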


def get_hist(current_app):
    with open(os.path.join(current_app.static_folder, 'hist.json'), 'r') as f:
        hist = ujson.load(f)
    return hist
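
# hist.json is assumed to be a chronological list of single-key dicts, each
# keyed by a scan date (see get_last_scan_date below); the values themselves
# are not inspected by this module.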


def get_index_data(current_app):
    """Assemble the aggregate stats and scan history for the index view."""
    data = get_data(current_app)
    hist = get_hist(current_app)
    
    n_domains_with_issues, n_domains_without_issues = count_issues(data)
    n_domains = n_domains_without_issues + n_domains_with_issues
    # Convert to a percentage before rounding; rounding the ratio first and
    # then multiplying by 100 reintroduces floating-point noise (e.g. yielding
    # 56.999... instead of 57). Guard against an empty dataset as well.
    percent_inaccessible = round(
        n_domains_with_issues / n_domains * 100) if n_domains else 0
    n_domains = f'{n_domains:,}'
    data = dict(
        n_domains_with_issues=n_domains_with_issues, 
        n_domains_without_issues=n_domains_without_issues,
        percent_inaccessible=percent_inaccessible,
        n_domains=n_domains,
        hist=hist
    )

    return data


def count_issues(data):
    """Count the domains with and without accessibility issues."""
    n_domains_with_issues = 0
    n_domains_without_issues = 0
    for domain in data:
        issues = ujson.loads(domain.get('issues', '{}')).get('issues')
        if issues:
            n_domains_with_issues += 1
        else:
            n_domains_without_issues += 1
    return n_domains_with_issues, n_domains_without_issues


def get_last_scan_date(current_app):
    hist = get_hist(current_app)
    # Each entry is a single-key dict keyed by scan date; the list is assumed
    # chronological, so the last entry holds the most recent date.
    last_scan_date = list(hist[-1].keys())[0]
    return last_scan_date


def subdomain_filter(data, agency, org, domain, subdomain):
    filtered_data = []
    for row in data:
        # Default to '' so a record with a missing field can't raise
        # AttributeError on .lower().
        agency_match = row.get('agency', '').lower().strip() == agency
        org_match = row.get('organization', '').lower().strip() == org
        domain_match = domain in row.get('routeableUrl', '').lower().strip()
        subdomain_match = row.get('subdomain', '').lower().strip() == subdomain
        if all([agency_match, org_match, domain_match, subdomain_match]):
            filtered_data.append(row)
    
    return filtered_data


def domain_filter(data, agency, org, domain):
    filtered_data = []
    for row in data:
        agency_match = row.get('agency', '').lower().strip() == agency
        org_match = row.get('organization', '').lower().strip() == org
        domain_match = domain in row.get('routeableUrl', '').lower().strip()
        if all([agency_match, org_match, domain_match]):
            filtered_data.append(row)
    
    return filtered_data


def org_filter(data, agency, org):
    filtered_data = []
    for row in data:
        agency_match = row.get('agency', '').lower().strip() == agency
        org_match = row.get('organization', '').lower().strip() == org
        if all([agency_match, org_match]):
            filtered_data.append(row)
    
    return filtered_data


def agency_filter(data, agency):
    return [r for r in data if r.get('agency', '').lower().strip() == agency]


def data_filter(data, **kwargs):
    """Filter scan records by any combination of agency, org, domain,
    and subdomain (all matched case-insensitively)."""
    agency = kwargs.get('agency', '').lower().strip()
    org = kwargs.get('org', '').lower().strip()
    domain = kwargs.get('domain', '').lower().strip()
    subdomain = kwargs.get('subdomain', '').lower().strip()

    if not any([agency, org, domain, subdomain]):
        return data
    
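    # Cascade from the most to the least specific filter; each narrower
    # filter also requires the broader fields above it to match.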
    if subdomain:
        filtered_data = subdomain_filter(data, agency, org, domain, subdomain)
    elif domain:
        filtered_data = domain_filter(data, agency, org, domain)
    elif org:
        filtered_data = org_filter(data, agency, org)
    elif agency:
        filtered_data = agency_filter(data, agency)

    return filtered_data
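
# Example call (hypothetical values):
#   data_filter(data, agency='General Services Administration')
# lowercases the kwarg and falls through to agency_filter, since no org,
# domain, or subdomain was supplied.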
    

def data_to_csv(data):
    """Flatten scan records into CSV: one row per issue, or a single
    empty-issue row for a domain with no issues."""
    parsed_items = []
    for item in data:
        parsed_item_rows = []
        parsed_item = dict(
            agency=item.get('agency', ''),
            organization=item.get('organization', ''),
            domain=item.get('domain', ''),
            subdomain=item.get('subdomain', ''),
            routeableUrl=item.get('routeableUrl', ''),
            scanDate=item.get('scanDate', '')
        )
            
        issues = ujson.loads(item.get('issues', '{}')).get('issues', [])
        for issue in issues:
            _parsed_item = parsed_item.copy()
            _parsed_item['context'] = issue.get('context', '')
            _parsed_item['code'] = issue.get('code', '')
            _parsed_item['message'] = issue.get('message', '')
            parsed_item_rows.append(_parsed_item)
        if not issues:
            parsed_item.update(dict(context='', code='', message=''))
            parsed_item_rows.append(parsed_item)
        parsed_items.extend(parsed_item_rows)
    
    # Every row shares the same keys; fall back to the full column list when
    # there are no records so the CSV still gets a header row.
    headers = sorted(parsed_items[0].keys()) if parsed_items else [
        'agency', 'code', 'context', 'domain', 'message',
        'organization', 'routeableUrl', 'scanDate', 'subdomain'
    ]
    csv_output = StringIO(newline='')
    dict_writer = csv.DictWriter(
        csv_output,
        headers,
        quoting=csv.QUOTE_NONNUMERIC
    )
    dict_writer.writeheader()
    dict_writer.writerows(parsed_items)
    csv_output.seek(0)
    
    return csv_output
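

if __name__ == '__main__':
    # Minimal smoke test that runs without Flask. The record below is a
    # hypothetical example: the field names mirror what the helpers above
    # expect, but every value is made up.
    sample = [dict(
        agency='Example Agency',
        organization='Example Org',
        domain='example.gov',
        subdomain='www.example.gov',
        routeableUrl='https://www.example.gov',
        scanDate='2021-01-01',
        issues=ujson.dumps({'issues': [{
            'context': '<img src="logo.png">',
            'code': 'WCAG2AA.Principle1.Guideline1_1.1_1_1.H37',
            'message': 'Img element requires an alt attribute.'
        }]})
    )]
    print(count_issues(sample))                                # (1, 0)
    print(len(data_filter(sample, agency='example agency')))   # 1
    print(data_to_csv(sample).getvalue())                      # header + one issue row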