fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/etl/submission_loader_helpers/file_c.py

Summary

Maintainability
A
0 mins
Test Coverage
A
93%
import logging
import numpy as np
import pandas as pd
import re

from collections import deque, defaultdict
from datetime import datetime
from django.db import connections
from django.utils.functional import cached_property

from usaspending_api.awards.models import FinancialAccountsByAwards
from usaspending_api.common.helpers.dict_helpers import upper_case_dict_values
from usaspending_api.common.helpers.etl_helpers import update_c_to_d_linkages
from usaspending_api.etl.broker_etl_helpers import dictfetchall
from usaspending_api.etl.management.load_base import load_data_into_model
from usaspending_api.etl.submission_loader_helpers.bulk_create_manager import BulkCreateManager
from usaspending_api.etl.submission_loader_helpers.disaster_emergency_fund_codes import get_disaster_emergency_fund
from usaspending_api.etl.submission_loader_helpers.object_class import get_object_class_row
from usaspending_api.etl.submission_loader_helpers.program_activities import get_program_activity
from usaspending_api.etl.submission_loader_helpers.treasury_appropriation_account import (
    bulk_treasury_appropriation_account_tas_lookup,
    get_treasury_appropriation_account_tas_lookup,
)


logger = logging.getLogger("script")


class PublishedAwardFinancialIterator:
    def __init__(self, submission_attributes, chunk_size):
        self.submission_attributes = submission_attributes
        self.chunk_size = chunk_size

        # For managing state.
        self._last_id = None
        self._current_chunk = None

    def _retrieve_and_prepare_next_chunk(self):
        sql = f"""
            select  c.*
            {PublishedAwardFinancial.get_from_where(self.submission_attributes.submission_id)}
            {"" if self._last_id is None else f"and published_award_financial_id > {self._last_id}"}
            order   by c.published_award_financial_id
            limit   {self.chunk_size}
        """

        award_financial_frame = pd.read_sql(sql, connections["data_broker"])

        if award_financial_frame.size > 0:
            award_financial_frame["object_class"] = award_financial_frame.apply(get_object_class_row, axis=1)
            award_financial_frame["program_activity"] = award_financial_frame.apply(
                get_program_activity, axis=1, submission_attributes=self.submission_attributes
            )
            award_financial_frame = award_financial_frame.replace({np.nan: None})

            self._last_id = award_financial_frame["published_award_financial_id"].max()
            self._current_chunk = deque(award_financial_frame.to_dict(orient="records"))

        else:
            self._last_id = None
            self._current_chunk = None

    def __next__(self):
        if not self._current_chunk:
            self._retrieve_and_prepare_next_chunk()
            if not self._current_chunk:
                raise StopIteration
        return self._current_chunk.popleft()


class PublishedAwardFinancial:
    """ Abstract away the messy details of how we retrieve and prepare published_award_financial rows. """

    def __init__(self, submission_attributes, db_cursor, chunk_size):
        self.submission_attributes = submission_attributes
        self.db_cursor = db_cursor
        self.chunk_size = chunk_size

    @cached_property
    def count(self):
        sql = f"select count(*) {self.get_from_where(self.submission_attributes.submission_id)}"
        self.db_cursor.execute(sql)
        return self.db_cursor.fetchall()[0][0]

    @property
    def account_nums(self):
        sql = f"""
            select distinct c.account_num
            {self.get_from_where(self.submission_attributes.submission_id)}
            and c.account_num is not null
        """
        self.db_cursor.execute(sql)
        return dictfetchall(self.db_cursor)

    @staticmethod
    def get_from_where(submission_id):
        return f"""
            from    published_award_financial c
                    inner join submission s on s.submission_id = c.submission_id
            where   s.submission_id = {submission_id} and
                    (
                        COALESCE(c.transaction_obligated_amou, 0) != 0
                        or COALESCE(c.gross_outlay_amount_by_awa_cpe, 0) != 0
                        or COALESCE(c.ussgl487200_downward_adjus_cpe, 0) != 0
                        or COALESCE(c.ussgl497200_downward_adjus_cpe, 0) != 0
                    )
        """

    def __iter__(self):
        return PublishedAwardFinancialIterator(self.submission_attributes, self.chunk_size)


def get_file_c(submission_attributes, db_cursor, chunk_size):
    return PublishedAwardFinancial(submission_attributes, db_cursor, chunk_size)


def load_file_c(submission_attributes, db_cursor, published_award_financial, skip_c_to_d_linkage):
    """
    Process and load file C broker data.
    Note: this should run AFTER the D1 and D2 files are loaded because we try to join to those records to retrieve some
    additional information about the awarding sub-tier agency.
    """

    if published_award_financial.count == 0:
        logger.warning("No File C (award financial) data found, skipping...")
        return

    # this matches the file b reverse directive, but am repeating it here to ensure that we don't overwrite it as we
    # change up the order of file loading
    reverse = re.compile(r"(_(cpe|fyb)$)|^transaction_obligated_amount$")
    skipped_tas = defaultdict(int)  # tracks count of rows skipped due to "missing" TAS
    total_rows = published_award_financial.count
    start_time = datetime.now()

    bulk_treasury_appropriation_account_tas_lookup(published_award_financial.account_nums, db_cursor)

    _save_file_c_rows(published_award_financial, total_rows, start_time, skipped_tas, submission_attributes, reverse)

    if skip_c_to_d_linkage:
        logger.info("Skipping c_to_d_linkage process as requested.")
    else:
        update_c_to_d_linkages("contract", False, submission_attributes.submission_id)
        update_c_to_d_linkages("assistance", False, submission_attributes.submission_id)

    for tas, count in skipped_tas.items():
        logger.info(f"Skipped {count:,} rows due to {tas}")

    total_tas_skipped = sum([count for count in skipped_tas.values()])

    if total_tas_skipped > 0:
        logger.info(f"SKIPPED {total_tas_skipped:,} ROWS of File C (missing TAS)")
    else:
        logger.info("All File C records in Broker loaded into USAspending")


def _save_file_c_rows(published_award_financial, total_rows, start_time, skipped_tas, submission_attributes, reverse):
    save_manager = BulkCreateManager(FinancialAccountsByAwards)
    for index, row in enumerate(published_award_financial, 1):
        if not (index % 1000):
            logger.info(f"C File Load: Loading row {index:,} of {total_rows:,} ({datetime.now() - start_time})")

        upper_case_dict_values(row)

        # Check and see if there is an entry for this TAS
        treasury_account, tas_rendering_label = get_treasury_appropriation_account_tas_lookup(row.get("account_num"))
        if treasury_account is None:
            skipped_tas[tas_rendering_label] += 1
            continue

        award_financial_data = FinancialAccountsByAwards()

        value_map_faba = {
            "submission": submission_attributes,
            "reporting_period_start": submission_attributes.reporting_period_start,
            "reporting_period_end": submission_attributes.reporting_period_end,
            "treasury_account": treasury_account,
            "object_class": row.get("object_class"),
            "program_activity": row.get("program_activity"),
            "disaster_emergency_fund": get_disaster_emergency_fund(row),
            "distinct_award_key": create_distinct_award_key(row),
        }

        save_manager.append(
            load_data_into_model(award_financial_data, row, value_map=value_map_faba, save=False, reverse=reverse)
        )

    save_manager.save_stragglers()


def create_distinct_award_key(row):
    parent_award_id = row.get("parent_award_id") or ""
    return f"{row.get('piid') or ''}|{parent_award_id}|{row.get('fain') or ''}|{row.get('uri') or ''}".upper()