fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/etl/elasticsearch_loader_helpers/transform_data.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
import logging
from time import perf_counter
from typing import Callable, Dict, List, Optional

from django.conf import settings

from usaspending_api.etl.elasticsearch_loader_helpers import aggregate_key_functions as funcs
from usaspending_api.etl.elasticsearch_loader_helpers.utilities import (
    TaskSpec,
    convert_json_array_to_list_of_str,
    convert_json_data_to_dict,
    format_log,
)

logger = logging.getLogger("script")


def transform_award_data(worker: TaskSpec, records: List[dict]) -> List[dict]:
    """Apply the award-specific transformation settings to ``records``.

    Declares which columns need converting, which aggregate keys to add and
    which columns to discard, then delegates the per-record work to
    ``transform_data`` with ``settings.ES_ROUTING_FIELD`` as the routing field.

    Args:
        worker: Task description; passed through to ``transform_data``.
        records: Award rows to transform.

    Returns:
        Whatever ``transform_data`` returns for these records.
    """

    def copy_of(column: str) -> Callable:
        # Builder for an aggregate key whose value is taken verbatim from ``column``
        return lambda row: row[column]

    json_converters = {
        "covid_spending_by_defc": convert_json_data_to_dict,
        "iija_spending_by_defc": convert_json_data_to_dict,
    }

    # Insertion order is the order in which the keys get written onto each record
    agg_key_builders = {
        "funding_subtier_agency_agg_key": copy_of("funding_subtier_agency_code"),
        "funding_toptier_agency_agg_key": copy_of("funding_toptier_agency_code"),
        "pop_congressional_agg_key": copy_of("pop_congressional_code"),
        "pop_congressional_cur_agg_key": copy_of("pop_congressional_code_current"),
        "pop_county_agg_key": copy_of("pop_county_code"),
        "pop_state_agg_key": copy_of("pop_state_code"),
        "recipient_agg_key": funcs.award_recipient_agg_key,
        "recipient_location_congressional_agg_key": copy_of("recipient_location_congressional_code"),
        "recipient_location_congressional_cur_agg_key": copy_of("recipient_location_congressional_code_current"),
        "recipient_location_county_agg_key": copy_of("recipient_location_county_code"),
        "recipient_location_state_agg_key": copy_of("recipient_location_state_code"),
    }

    # Columns removed from every record once the aggregate keys have been added
    unneeded_columns = [
        "recipient_levels",
        "funding_toptier_agency_id",
        "funding_subtier_agency_id",
        "recipient_location_state_name",
        "recipient_location_state_fips",
        "recipient_location_state_population",
        "recipient_location_county_population",
        "recipient_location_congressional_population",
        "pop_state_name",
        "pop_state_fips",
        "pop_state_population",
        "pop_county_population",
        "pop_congressional_population",
    ]

    return transform_data(
        worker,
        records,
        json_converters,
        agg_key_builders,
        unneeded_columns,
        settings.ES_ROUTING_FIELD,
    )


def transform_transaction_data(worker: TaskSpec, records: List[dict]) -> List[dict]:
    """Apply the transaction-specific transformation settings to ``records``.

    Declares which columns need converting, which aggregate keys to add and
    which columns to discard, then delegates the per-record work to
    ``transform_data`` with ``settings.ES_ROUTING_FIELD`` as the routing field.

    Args:
        worker: Task description; passed through to ``transform_data``.
        records: Transaction rows to transform.

    Returns:
        Whatever ``transform_data`` returns for these records.
    """

    def copy_of(column: str) -> Callable:
        # Builder for an aggregate key whose value is taken verbatim from ``column``
        return lambda row: row[column]

    field_converters = {
        "federal_accounts": convert_json_array_to_list_of_str,
    }

    # Insertion order is the order in which the keys get written onto each record
    agg_key_builders = {
        "recipient_agg_key": funcs.transaction_recipient_agg_key,
        "awarding_subtier_agency_agg_key": copy_of("awarding_sub_tier_agency_c"),
        "awarding_toptier_agency_agg_key": copy_of("awarding_agency_code"),
        "funding_subtier_agency_agg_key": copy_of("funding_sub_tier_agency_co"),
        "funding_toptier_agency_agg_key": copy_of("funding_agency_code"),
        "naics_agg_key": copy_of("naics_code"),
        "psc_agg_key": copy_of("product_or_service_code"),
        "pop_country_agg_key": copy_of("pop_country_code"),
        "pop_state_agg_key": copy_of("pop_state_code"),
        "pop_county_agg_key": funcs.pop_county_agg_key,
        "pop_congressional_agg_key": funcs.pop_congressional_agg_key,
        "pop_congressional_cur_agg_key": funcs.pop_congressional_cur_agg_key,
        "recipient_location_state_agg_key": copy_of("recipient_location_state_code"),
        "recipient_location_congressional_agg_key": funcs.recipient_location_congressional_agg_key,
        "recipient_location_congressional_cur_agg_key": funcs.recipient_location_congressional_cur_agg_key,
        "recipient_location_county_agg_key": funcs.recipient_location_county_agg_key,
        "recipient_location_country_agg_key": copy_of("recipient_location_country_code"),
    }

    # Columns removed from every record once the aggregate keys have been added
    unneeded_columns = [
        "pop_state_name",
        "pop_state_fips",
        "pop_state_population",
        "pop_county_population",
        "pop_congressional_population",
        "recipient_location_state_name",
        "recipient_location_state_fips",
        "recipient_location_state_population",
        "recipient_location_county_population",
        "recipient_location_congressional_population",
        "recipient_levels",
        "funding_toptier_agency_id",
    ]

    return transform_data(
        worker,
        records,
        field_converters,
        agg_key_builders,
        unneeded_columns,
        settings.ES_ROUTING_FIELD,
    )


def transform_covid19_faba_data(worker: TaskSpec, records: List[dict]) -> List[dict]:
    """Group the input rows into one document per distinct award key.

    Rows sharing the same ``financial_account_distinct_award_key`` are folded
    into a single document. The award-level columns are popped off each row and
    stored once on the document (values come from the first row seen for that
    key), and what remains of each row is appended to the document's
    ``financial_accounts_by_award`` list. Obligations are summed over every row;
    outlays are summed only over rows whose ``is_final_balances_for_fy`` is truthy.

    Args:
        worker: Task description; supplies ``name`` for logging and
            ``field_for_es_id``, the column whose value becomes ``_id``.
        records: Rows to group. Each row is mutated in place (award-level
            columns are removed).

    Returns:
        The grouped documents, in order of first appearance of their key.
    """
    logger.info(format_log("Transforming data", name=worker.name, action="Transform"))
    start = perf_counter()
    results = {}

    for faba in records:
        # Must be read before the pops below, in case the id column is one of them
        doc_id = faba[worker.field_for_es_id]

        # Award-level attributes live on the parent document, not on each nested row
        distinct_award_key = faba.pop("financial_account_distinct_award_key")
        award_id = faba.pop("award_id")
        award_type = faba.pop("type")
        generated_unique_award_id = faba.pop("generated_unique_award_id")
        total_loan_value = faba.pop("total_loan_value")

        # Any of these values may be present but None, hence the "or 0" fallbacks
        row_obligation = faba.get("transaction_obligated_amount") or 0
        gross_outlay = faba.get("gross_outlay_amount_by_award_cpe") or 0
        prepaid_refund = faba.get("ussgl487200_down_adj_pri_ppaid_undel_orders_oblig_refund_cpe") or 0
        paid_refund = faba.get("ussgl497200_down_adj_pri_paid_deliv_orders_oblig_refund_cpe") or 0
        row_outlay = gross_outlay + prepaid_refund + paid_refund

        document = results.get(distinct_award_key)
        if document is None:
            document = {
                "financial_account_distinct_award_key": distinct_award_key,
                "award_id": award_id,
                "type": award_type,
                "generated_unique_award_id": generated_unique_award_id,
                "total_loan_value": total_loan_value,
                "financial_accounts_by_award": [],
                "obligated_sum": 0,
                "outlay_sum": 0,
                "_id": doc_id,
            }
            results[distinct_award_key] = document

        document["obligated_sum"] += row_obligation
        if faba.get("is_final_balances_for_fy"):
            document["outlay_sum"] += row_outlay
        document["financial_accounts_by_award"].append(faba)

    # Only worth reporting when grouping actually collapsed some rows
    if len(results) != len(records):
        msg = f"Transformed {len(records)} database records into {len(results)} documents for ingest"
        logger.info(format_log(msg, name=worker.name, action="Transform"))

    msg = f"Transformation operation took {perf_counter() - start:.2f}s"
    logger.info(format_log(msg, name=worker.name, action="Transform"))

    # The grouping keys are already stored inside each document, so only the values are returned
    return list(results.values())


def transform_data(
    worker: TaskSpec,
    records: List[dict],
    converters: Dict[str, Callable],
    agg_key_creations: Dict[str, Callable],
    drop_fields: List[str],
    routing_field: Optional[str] = None,
) -> List[dict]:
    """Reshape each record in place into a document ready for Elasticsearch.

    For every record, in this order: run the field converters, add the
    aggregate keys, set the ``routing`` meta field (when ``routing_field`` is
    truthy), set ``_id``, and finally remove the ``drop_fields``.

    Args:
        worker: Task description; supplies ``name`` for logging and
            ``field_for_es_id``, the column whose value becomes ``_id``.
        records: Records to transform; each dict is mutated in place.
        converters: Maps a column name to a callable that receives the current
            column value and returns its replacement.
        agg_key_creations: Maps a new key name to a callable that receives the
            whole record and returns the value to store under that key.
        drop_fields: Columns to remove from every record.
        routing_field: Column whose value is copied into ``routing``; skipped
            when falsy.

    Returns:
        The same ``records`` list that was passed in.

    Raises:
        KeyError: If a record lacks a converter column, the routing or id
            column, or any of the ``drop_fields``.
    """
    logger.info(format_log("Transforming data", name=worker.name, action="Transform"))

    start = perf_counter()

    for document in records:
        for column, convert in converters.items():
            document[column] = convert(document[column])
        for agg_key, build_value in agg_key_creations.items():
            document[agg_key] = build_value(document)

        # Send every document with the same recipient to the same shard.
        # That keeps "top N" recipient category aggregation queries accurate and lets them
        # terminate early. Recipient is the highest-cardinality category, with over 2M unique
        # values to aggregate against, so this is needed for performance.
        # The ES helper pops "meta" fields such as "routing" off the data dict and uses them
        # in the action.
        if routing_field:
            document["routing"] = document[routing_field]

        # Setting the ES _id explicitly to the postgres PK value lets bulk index operations
        # behave as upserts without creating duplicate documents, but ONLY when no routing
        # meta field is supplied whose value differs from the doc _id. With explicit routing,
        # UPSERTs may create duplicates, so docs must be deleted before being UPSERTed.
        # (More info in streaming_post_to_es(...))
        document["_id"] = document[worker.field_for_es_id]

        # These columns only fed the aggregate keys and are not needed as standalone fields
        for column in drop_fields:
            document.pop(column)

    duration = perf_counter() - start
    msg = f"Transformation operation took {duration:.2f}s"
    logger.info(format_log(msg, name=worker.name, action="Transform"))
    return records