fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/broker/management/commands/update_awarding_agencies.py

Summary

Maintainability
A
1 hr
Test Coverage
F
0%
import logging

from django.core.management.base import BaseCommand
from datetime import datetime
from usaspending_api.awards.models import TransactionNormalized, TransactionFABS, TransactionFPDS
from usaspending_api.common.helpers.timing_helpers import timer
from usaspending_api.references.models import Agency
from usaspending_api.search.models import AwardSearch

logger = logging.getLogger("script")


# Lookup tables used to resolve agency codes to Agency rows without a query per transaction.
# NOTE: both comprehensions are evaluated when this module is imported, so importing it hits the database.
# select_related() pulls the related toptier/subtier rows in the same query; without it, every
# `agency.toptier_agency` / `agency.subtier_agency` access below would issue its own query.

# (toptier_code, subtier_code) -> Agency, for agencies that have a subtier agency.
# If several agencies share a key, the last one returned by the queryset wins.
agency_no_sub_map = {
    (agency.toptier_agency.toptier_code, agency.subtier_agency.subtier_code): agency
    for agency in Agency.objects.filter(subtier_agency__isnull=False).select_related(
        "toptier_agency", "subtier_agency"
    )
}

# toptier_code -> Agency, for agencies that have no subtier agency (used as a fallback).
agency_cgac_only_map = {
    agency.toptier_agency.toptier_code: agency
    for agency in Agency.objects.filter(subtier_agency__isnull=True).select_related("toptier_agency")
}


class Command(BaseCommand):
    """
    Management command that re-derives the awarding and funding agency of transactions (and of the
    award each transaction belongs to) from the agency codes stored on the FPDS / FABS records.
    """

    help = "Updates empty awarding and funding agency fields on transactions and awards due to subtier/toptier mapping"

    @staticmethod
    def _load_transaction_agency_codes(fiscal_year, file_type, range_low, range_high):
        """
        Return a list with one dict of agency codes per FPDS ("D1") or FABS ("D2") transaction.

        Only transactions whose normalized transaction has the given fiscal year are included, and only
        the rows in the half-open slice [range_low, range_high) of that result are returned.

        Raises ValueError when file_type is neither "D1" nor "D2".
        """
        if file_type == "D1":
            source_model = TransactionFPDS
        elif file_type == "D2":
            source_model = TransactionFABS
        else:
            raise ValueError("Not a valid file type: {} (expected 'D1' or 'D2')".format(file_type))

        # The explicit ordering makes the slice deterministic: without it the database may return rows
        # in any order, so two pages could overlap or skip rows between runs / parallel workers.
        source_rows = (
            source_model.objects.filter(transaction__fiscal_year=fiscal_year)
            .order_by("transaction_id")
            .values(
                "transaction_id",
                "awarding_agency_code",
                "funding_agency_code",
                "awarding_sub_tier_agency_c",
                "funding_sub_tier_agency_co",
            )[range_low:range_high]
        )

        return [
            {
                "transaction_id": source_row["transaction_id"],
                "awarding_toptier_code": source_row["awarding_agency_code"],
                "funding_toptier_code": source_row["funding_agency_code"],
                "awarding_subtier_code": source_row["awarding_sub_tier_agency_c"],
                "funding_subtier_code": source_row["funding_sub_tier_agency_co"],
            }
            for source_row in source_rows
        ]

    @staticmethod
    def _resolve_agency(toptier_code, subtier_code):
        """
        Return the Agency for a (toptier, subtier) code pair, falling back to the toptier-only agency.

        Returns None when neither lookup table has a match.
        """
        agency = agency_no_sub_map.get((toptier_code, subtier_code))
        if agency is None:
            agency = agency_cgac_only_map.get(toptier_code)
        return agency

    @staticmethod
    def update_awarding_funding_agency(fiscal_year=None, file_type=None, page=1, limit=500000):
        """
        Use the TransactionFPDS ("D1") or TransactionFABS ("D2") records of a fiscal year to set the
        awarding and funding agency on the matching TransactionNormalized rows and on their awards.

        fiscal_year: fiscal year the normalized transactions are filtered on
        file_type: "D1" for FPDS (contracts) or "D2" for FABS (assistance); anything else raises ValueError
        page: 1-based page number used for batching
        limit: number of source rows per page

        Rows that cannot be processed (transaction or award not found, neither agency resolvable,
        save failure) are logged and skipped; the remaining rows are still processed.
        """

        offset = (page - 1) * limit

        range_low = offset
        range_high = offset + limit

        transaction_cgac_subtier_map = Command._load_transaction_agency_codes(
            fiscal_year, file_type, range_low, range_high
        )

        total_rows = len(transaction_cgac_subtier_map)

        logger.info("Processing {} rows of transaction data".format(total_rows))
        logger.info("Rows range from {} to {}".format(range_low, range_high))

        # Go through each D1 or D2 transaction to update awarding/funding agency
        start_time = datetime.now()
        for index, row in enumerate(transaction_cgac_subtier_map, start=1):

            # Progress message every 100 rows
            if not (index % 100):
                logger.info(
                    "Updating agencies: Loading row {} of {} ({})".format(
                        str(index), str(total_rows), datetime.now() - start_time
                    )
                )

            # Find corresponding transaction
            transaction = TransactionNormalized.objects.filter(id=row["transaction_id"]).first()

            # Skips transaction if unable to find it in Transaction Normalized
            if transaction is None:
                logger.error("Unable to find Transaction {}".format(str(row["transaction_id"])))
                continue

            # Resolve both agencies from the preloaded lookup tables (nothing is created here)
            awarding_agency = Command._resolve_agency(row["awarding_toptier_code"], row["awarding_subtier_code"])
            funding_agency = Command._resolve_agency(row["funding_toptier_code"], row["funding_subtier_code"])

            # If neither agency can be resolved there is nothing to update: move on to the next transaction
            if awarding_agency is None and funding_agency is None:
                logger.error(
                    "Unable to find awarding agency CGAC {} Subtier {} and funding agency CGAC {} Subtier {}".format(
                        row["awarding_toptier_code"],
                        row["awarding_subtier_code"],
                        row["funding_toptier_code"],
                        row["funding_subtier_code"],
                    )
                )
                continue

            # A missing awarding agency is reported but the row is still updated with the funding agency.
            # A missing funding agency on its own is intentionally not reported.
            if awarding_agency is None:
                logger.error(
                    "Unable to find awarding agency for CGAC {} Subtier {}".format(
                        row["awarding_toptier_code"], row["awarding_subtier_code"]
                    )
                )

            transaction.awarding_agency = awarding_agency
            transaction.funding_agency = funding_agency

            award = AwardSearch.objects.filter(award_id=transaction.award.id).first()

            if award is None:
                logger.error("Unable to find Award {}".format(str(transaction.award.id)))
                continue

            # Exactly one of the agencies may be None at this point, so guard the `.id` access;
            # the award then gets the same value (None) that was assigned to the transaction above.
            award.awarding_agency_id = awarding_agency.id if awarding_agency is not None else None
            award.funding_agency_id = funding_agency.id if funding_agency is not None else None

            # Best effort: a row that fails to save is logged and the loop carries on
            try:
                transaction.save()
                award.save()

            except Exception as e:
                logger.error(
                    "Unable to save Transaction {} and Award {}:{}".format(str(transaction.id), str(award.id), str(e))
                )

    def add_arguments(self, parser):
        """Register the command line options (fiscal year, data type switches, paging)."""

        parser.add_argument(
            "--fiscal_year",
            dest="fiscal_year",
            nargs="+",
            type=int,
            help="Year for which to run awarding agency clean up on",
        )

        parser.add_argument(
            "--assistance",
            action="store_true",
            dest="assistance",
            default=False,
            help="Runs the award only for Award Financial Assistance (Assistance) data",
        )

        parser.add_argument(
            "--contracts",
            action="store_true",
            dest="contracts",
            default=False,
            help="Runs the historical loader only for Award Procurement (Contract) data",
        )

        parser.add_argument("--page", dest="page", nargs="+", type=int, help="Page for batching and parallelization")

        parser.add_argument("--limit", dest="limit", nargs="+", type=int, help="Limit for batching and parallelization")

    def handle(self, *args, **options):
        """
        Entry point: run the agency update for contracts (--contracts) or assistance (--assistance).

        Only the first value given for --fiscal_year, --page and --limit is used. --contracts takes
        precedence when both data type switches are given. Raises CommandError when --fiscal_year
        is missing.
        """
        # Imported here so the fix does not depend on the module-level import block
        from django.core.management.base import CommandError

        logger.info("Starting updating awarding agencies...")

        fiscal_years = options.get("fiscal_year")
        if not fiscal_years:
            # Previously this surfaced as an opaque "'NoneType' object is not subscriptable"
            raise CommandError("Missing required argument: --fiscal_year")
        fiscal_year = fiscal_years[0]

        page = options.get("page")
        limit = options.get("limit")

        page = page[0] if page else 1
        limit = limit[0] if limit else 500000

        if options.get("contracts", None):
            with timer("D1 (contracts/FPDS) awarding/funding agencies updates", logger.info):
                self.update_awarding_funding_agency(fiscal_year, "D1", page=page, limit=limit)

        elif options.get("assistance", None):
            with timer("D2 (assistance/FABS) awarding/funding agencies updates", logger.info):
                self.update_awarding_funding_agency(fiscal_year, "D2", page=page, limit=limit)

        else:
            logger.error("Not a valid data type: --assistance,--contracts")

        logger.info("Finished")