fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/references/management/commands/loadcfda.py

Summary

Maintainability
A
0 mins
Test Coverage
B
87%
import logging
import pandas as pd

from datetime import datetime
from datetime import timezone
from time import perf_counter

from django.core.management.base import BaseCommand

from usaspending_api.common.retrieve_file_from_uri import RetrieveFileFromUri
from usaspending_api.common.retrieve_file_from_uri import SCHEMA_HELP_TEXT
from usaspending_api.common.operations_reporter import OpsReporter
from usaspending_api.references.models import Cfda


# Logger used throughout this command.
logger = logging.getLogger("script")
# Run report for this job; Command.handle fills in duration/end_status and logs Reporter.json_dump().
# NOTE(review): iso_start_datetime is evaluated when this module is imported, not when handle()
# starts running — confirm that is the intended "start" time for the report.
Reporter = OpsReporter(iso_start_datetime=datetime.now(timezone.utc).isoformat(), job_name="loadcfda.py")


# Maps CSV header names, after normalization by clean_col_names(), to Cfda field names.
# - load_cfda_csv_into_pandas() requires every key to be present (ValueError otherwise), keeps only
#   these columns, and renames them to the values.
# - The renamed columns are later passed as `defaults` to Cfda.objects.update_or_create, so each
#   value must be a Cfda model field.
# - The parenthesized numbers are part of the source headers; presumably they are section numbers
#   in the CFDA program description (verify against the source file).
DATA_CLEANING_MAP = {
    "program_title": "program_title",
    "program_number": "program_number",
    "popular_name_(020)": "popular_name",
    "federal_agency_(030)": "federal_agency",
    "authorization_(040)": "authorization",
    "objectives_(050)": "objectives",
    "types_of_assistance_(060)": "types_of_assistance",
    "uses_and_use_restrictions_(070)": "uses_and_use_restrictions",
    "applicant_eligibility_(081)": "applicant_eligibility",
    "beneficiary_eligibility_(082)": "beneficiary_eligibility",
    "credentials/documentation_(083)": "credentials_documentation",
    "preapplication_coordination_(091)": "pre_application_coordination",
    "application_procedures_(092)": "application_procedures",
    "award_procedure_(093)": "award_procedure",
    "deadlines_(094)": "deadlines",
    "range_of_approval/disapproval_time_(095)": "range_of_approval_disapproval_time",
    "appeals_(096)": "appeals",
    "renewals_(097)": "renewals",
    "formula_and_matching_requirements_(101)": "formula_and_matching_requirements",
    "length_and_time_phasing_of_assistance_(102)": "length_and_time_phasing_of_assistance",
    "reports_(111)": "reports",
    "audits_(112)": "audits",
    "records_(113)": "records",
    "account_identification_(121)": "account_identification",
    "obligations_(122)": "obligations",
    "range_and_average_of_financial_assistance_(123)": "range_and_average_of_financial_assistance",
    "program_accomplishments_(130)": "program_accomplishments",
    # Double underscores: clean_col_names() turns both the comma and the space of ", " into "_".
    "regulations__guidelines__and_literature_(140)": "regulations_guidelines_and_literature",
    # Double underscore presumably comes from two separators in the raw header — verify.
    "regional_or__local_office_(151)": "regional_or_local_office",
    "headquarters_office_(152)": "headquarters_office",
    "website_address_(153)": "website_address",
    "related_programs_(160)": "related_programs",
    "examples_of_funded_projects_(170)": "examples_of_funded_projects",
    "criteria_for_selecting_proposals_(180)": "criteria_for_selecting_proposals",
    "url": "url",
    "recovery": "recovery",
    "omb_agency_code": "omb_agency_code",
    "omb_bureau_code": "omb_bureau_code",
    "published_date": "published_date",
    "archived_date": "archived_date",
}


class Command(BaseCommand):
    """Management command that syncs ``references_cfda`` with an external CFDA CSV file.

    The command exits with status 3 when the incoming data equals what is already stored, in
    which case nothing is loaded. After a load it finishes normally. In both cases the JSON run
    report is logged before returning.
    """

    help = "Load new CFDA data into references_cfda from the provided source CSV file"

    def add_arguments(self, parser):
        """Register the positional URI argument that points at the CFDA CSV."""
        parser.add_argument(
            "cfda-data-uri",
            type=str,
            help="A RFC URL to the CFDA data file. ({})".format(SCHEMA_HELP_TEXT),
        )

    def handle(self, *args, **options):
        """Load both data sets, compare them, upsert when they differ, then report."""
        logger.info("Loading data into pandas DataFrames")
        started_at = perf_counter()
        incoming = load_from_url(options["cfda-data-uri"])
        stored = load_cfda_table_into_pandas()

        logger.info("Remodeling DataFrames for comparison")
        incoming = fully_order_pandas_dataframe(incoming, "program_number")
        stored = fully_order_pandas_dataframe(stored, "program_number")

        logger.info("Comparing DataFrames")
        loaded = load_cfda(stored, incoming)
        exit_status = 0 if loaded else 3

        Reporter["duration"] = perf_counter() - started_at
        Reporter["end_status"] = exit_status

        logger.info(Reporter.json_dump())
        if exit_status:
            # Status 3 means load_cfda found no differences and skipped the load.
            raise SystemExit(exit_status)


def load_from_url(rfc_path_string):
    """Fetch the CFDA CSV at ``rfc_path_string`` and parse it into a DataFrame.

    The file object from ``RetrieveFileFromUri`` is used as a context manager, so it is
    released as soon as parsing finishes.
    """
    retriever = RetrieveFileFromUri(rfc_path_string)
    with retriever.get_file_object() as file_obj:
        parsed = load_cfda_csv_into_pandas(file_obj)
    return parsed


def load_cfda_csv_into_pandas(data_file_handle):
    """Parse a CFDA CSV into a DataFrame whose columns are Cfda field names.

    Parsing details:
    - Every value is read as a string (``dtype=str``) with NA detection disabled
      (``na_filter=False``), so empty cells stay ``""`` instead of becoming NaN.
    - The file is decoded as cp1252, and bytes that cannot be decoded are ignored.
    - Header labels are normalized with ``clean_col_names`` and must include every key of
      ``DATA_CLEANING_MAP``.
    - Columns not in the map are discarded; the rest are renamed to the map's values.
    - A constant ``data_source`` column set to ``"USA"`` is appended.

    Args:
        data_file_handle: a path or file-like object accepted by ``pandas.read_csv``.

    Returns:
        pandas.DataFrame containing the mapped columns plus ``data_source``.

    Raises:
        ValueError: if required columns are missing. Every missing column is listed in one
            message (comma-separated), so a bad file can be fixed in one pass.
    """
    df = pd.read_csv(data_file_handle, dtype=str, encoding="cp1252", encoding_errors="ignore", na_filter=False)
    df.rename(columns=clean_col_names, inplace=True)

    # Build the lookup set once rather than materializing list(df.columns) per required field.
    present_columns = set(df.columns)
    missing = [field for field in DATA_CLEANING_MAP if field not in present_columns]
    if missing:
        raise ValueError("{} is required for loading table".format(", ".join(missing)))

    df = df[list(DATA_CLEANING_MAP)]  # drop CSV columns that DATA_CLEANING_MAP does not list
    df = df.rename(columns=DATA_CLEANING_MAP)  # rename to the Cfda field names
    df["data_source"] = "USA"
    return df


def clean_col_names(field):
    """Normalize a CSV header label into the key format used by DATA_CLEANING_MAP.

    The label is converted to ``str``, lower-cased and stripped of surrounding whitespace.
    Every remaining space and comma becomes an underscore. For example,
    ``"Regulations, Guidelines, and Literature (140)"`` becomes
    ``"regulations__guidelines__and_literature_(140)"``.
    """
    normalized = str(field).lower().strip()
    # Map both " " and "," to "_" in a single pass.
    return normalized.translate(str.maketrans(" ,", "__"))


def load_cfda_table_into_pandas():
    """Return the current Cfda rows as a DataFrame built with ``dtype=str``.

    The ``id``, ``create_date`` and ``update_date`` columns are removed so the frame can be
    compared with the data parsed from the CSV. An empty table yields an empty DataFrame
    with no columns.
    """
    rows = list(Cfda.objects.values())
    if not rows:
        return pd.DataFrame()
    bookkeeping_columns = ["id", "create_date", "update_date"]
    return pd.DataFrame(rows, dtype=str).drop(columns=bookkeeping_columns)


def fully_order_pandas_dataframe(df, sort_column):
    """Return ``df`` in a canonical layout so two frames can be compared with ``DataFrame.equals``.

    The reordering:
    - sorts rows by ``sort_column`` using a stable sort;
    - puts columns in alphabetical order;
    - resets the index to ``0..n-1`` so it matches the new row order.

    The input frame is not modified.

    Args:
        df: frame to reorder.
        sort_column: name of the column to sort rows by.

    Returns:
        A new reordered DataFrame. When ``df.empty`` is true (no rows or no columns),
        ``df`` itself is returned unchanged.
    """
    if df.empty:
        # A column-less frame (e.g. from an empty table) has no sort_column to sort by.
        return df
    # Not in place: the caller's frame must not be reordered as a side effect.
    # mergesort is stable, so rows sharing a sort key keep a deterministic relative order.
    ordered = df.sort_values(sort_column, kind="mergesort")
    ordered = ordered[sorted(ordered.columns.tolist())]
    return ordered.reset_index(drop=True)


def load_cfda(original_df, new_df):
    """Upsert ``new_df`` into the Cfda table unless it equals ``original_df``.

    Comparison and writes:
    - The frames are compared with ``DataFrame.equals``, so callers should give both the
      same row/column ordering first.
    - Each row of ``new_df`` is written with ``Cfda.objects.update_or_create``, keyed on
      ``program_number``.
    - Rows that exist only in the database are not touched.
    - All writes run inside one database transaction. A failure part-way through rolls the
      whole load back instead of leaving the table partially updated.

    Args:
        original_df: current table contents.
        new_df: incoming data whose column names are Cfda field names.

    Returns:
        False if the frames are equal and nothing was written; True after a load.
        When a load happens, ``Reporter["new_record_count"]`` and
        ``Reporter["updated_record_count"]`` are set.
    """
    from django.db import transaction  # local import keeps this function self-contained

    if new_df.equals(original_df):
        logger.info("Skipping CFDA load, no new data")
        return False

    Reporter["new_record_count"], Reporter["updated_record_count"] = 0, 0

    logger.info("Inserting new CFDA data")
    with transaction.atomic():
        # to_dict("records") keeps the real column labels. itertuples() would silently rename
        # labels that are not valid Python identifiers, and it adds an "Index" field.
        for record in new_df.to_dict("records"):
            _, created = Cfda.objects.update_or_create(program_number=record["program_number"], defaults=record)
            if created:
                Reporter["new_record_count"] += 1
            else:
                Reporter["updated_record_count"] += 1
    logger.info("Completed data load")
    return True