fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/references/management/commands/load_offices.py

Summary

Maintainability
A
0 mins
Test Coverage
A
93%
import logging
from datetime import datetime, timezone

from django.core.management.base import BaseCommand
from django.db import DEFAULT_DB_ALIAS, IntegrityError, connections, transaction

from usaspending_api.common.operations_reporter import OpsReporter
from usaspending_api.etl.broker_etl_helpers import dictfetchall
from usaspending_api.references.models.office import Office

# Shared "script" logger used for all progress messages emitted by this command.
logger = logging.getLogger("script")

# Operations reporter created at import time; its start timestamp is the
# timezone-aware UTC "now" rendered as an ISO-8601 string.
Reporter = OpsReporter(
    job_name="load_offices.py",
    iso_start_datetime=datetime.now(timezone.utc).isoformat(),
)


class Command(BaseCommand):
    """Management command that reloads the ``office`` reference table from Broker.

    All existing ``Office`` rows are deleted, the rows read from the Broker
    ``office`` table are bulk-inserted, and finally any office whose code does
    not appear in the source transaction tables is removed again.
    """

    help = "Drop and recreate all office reference data"

    def handle(self, *args, **options) -> None:
        """Run the office ETL.

        Raises:
            SystemExit: with exit code 3 when the load hits an ``IntegrityError``
                (unique constraint violation), so callers can tell this failure
                apart from other errors.
        """
        logger.info("Starting ETL script")
        try:
            self.process_data()
            logger.info("Office ETL finished successfully!")
        except IntegrityError:
            logger.warning("Unique constraint violated. Continuing with pipeline execution.")
            raise SystemExit(3)  # Raise exit code 3 for unique constraint violation

    @transaction.atomic()
    def process_data(self) -> None:
        """Replace the office table contents inside a single atomic transaction.

        The delete, the bulk insert and the unlinked-office cleanup all run in
        the same ``transaction.atomic`` block, so an exception in any step
        rolls back the whole reload.
        """
        logger.info("Extracting data from Broker")
        # Use the cursor as a context manager so it is always closed, matching
        # how the default-database cursor is handled further down.
        with connections["data_broker"].cursor() as broker_cursor:
            broker_cursor.execute(self.broker_fetch_sql)
            office_values = dictfetchall(broker_cursor)

        logger.info("Deleting all existing office reference data")
        deletes = Office.objects.all().delete()
        logger.info(f"Deleted {deletes[0]:,} records")

        logger.info("Transforming new office records")
        total_objs = [Office(**values) for values in office_values]

        logger.info("Loading new office records into database")
        new_rec_count = len(Office.objects.bulk_create(total_objs))
        logger.info(f"Loaded: {new_rec_count:,} records")
        # NOTE: the actual commit happens when the atomic block exits, i.e.
        # after the unlinked-office cleanup below has also succeeded.
        logger.info("Committing transaction to database")

        logger.info(f"Deleting all existing offices that are not linked to a transaction in {DEFAULT_DB_ALIAS}")
        # Identify offices that do not correspond to any transactions, using only USAS DB
        with connections[DEFAULT_DB_ALIAS].cursor() as cursor:
            cursor.execute(self.usas_unlinked_offices_sql)

    @property
    def broker_fetch_sql(self) -> str:
        """SQL that selects every office row (and its usage flags) from Broker."""
        return """
            SELECT
                office_code,
                office_name,
                sub_tier_code,
                agency_code,
                contract_awards_office,
                contract_funding_office,
                financial_assistance_awards_office,
                financial_assistance_funding_office
            FROM office
        """

    @property
    def usas_unlinked_offices_sql(self) -> str:
        """SQL script that deletes offices not referenced by any source transaction.

        The script builds a temporary table of the distinct
        (awarding_office_code, funding_office_code) pairs found in
        ``source_assistance_transaction`` and ``source_procurement_transaction``,
        indexes both columns, deletes every ``office`` row whose ``office_code``
        matches neither column of any pair, and then drops the temporary table.
        """
        return """
        DROP TABLE IF EXISTS temp_unique_office_codes_from_source;
        CREATE TEMPORARY TABLE temp_unique_office_codes_from_source (
            awarding_office_code TEXT,
            funding_office_code TEXT,
            UNIQUE (awarding_office_code, funding_office_code)
        );
        INSERT INTO temp_unique_office_codes_from_source
        SELECT * FROM (
                    SELECT DISTINCT awarding_office_code, funding_office_code FROM source_assistance_transaction
                    UNION
                    SELECT DISTINCT awarding_office_code, funding_office_code FROM source_procurement_transaction
                ) s;
        CREATE INDEX awarding_office_code_idx_temp ON temp_unique_office_codes_from_source (awarding_office_code);
        CREATE INDEX funding_office_code_idx_temp ON temp_unique_office_codes_from_source (funding_office_code);
        DELETE FROM office WHERE office_code IN (
            SELECT DISTINCT office_code
            FROM office AS o
            LEFT JOIN temp_unique_office_codes_from_source s
            ON s.awarding_office_code = o.office_code
                OR s.funding_office_code = o.office_code

            WHERE s.awarding_office_code IS NULL
                AND s.funding_office_code IS NULL
        );
        DROP TABLE IF EXISTS temp_unique_office_codes_from_source;
        """