fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/awards/delta_models/financial_accounts_by_awards.py

Summary

Maintainability
A
3 hrs
Test Coverage
A
100%
FINANCIAL_ACCOUNTS_BY_AWARDS_COLUMNS = {
    "data_source": "STRING",
    "financial_accounts_by_awards_id": "INTEGER NOT NULL",
    "piid": "STRING",
    "parent_award_id": "STRING",
    "fain": "STRING",
    "uri": "STRING",
    "ussgl480100_undelivered_orders_obligations_unpaid_fyb": "NUMERIC(23,2)",
    "ussgl480100_undelivered_orders_obligations_unpaid_cpe": "NUMERIC(23,2)",
    "ussgl483100_undelivered_orders_oblig_transferred_unpaid_cpe": "NUMERIC(23,2)",
    "ussgl488100_upward_adjust_pri_undeliv_order_oblig_unpaid_cpe": "NUMERIC(23,2)",
    "ussgl490100_delivered_orders_obligations_unpaid_fyb": "NUMERIC(23,2)",
    "ussgl490100_delivered_orders_obligations_unpaid_cpe": "NUMERIC(23,2)",
    "ussgl493100_delivered_orders_oblig_transferred_unpaid_cpe": "NUMERIC(23,2)",
    "ussgl498100_upward_adjust_pri_deliv_orders_oblig_unpaid_cpe": "NUMERIC(23,2)",
    "ussgl480200_undelivered_orders_oblig_prepaid_advanced_fyb": "NUMERIC(23,2)",
    "ussgl480200_undelivered_orders_oblig_prepaid_advanced_cpe": "NUMERIC(23,2)",
    "ussgl483200_undeliv_orders_oblig_transferred_prepaid_adv_cpe": "NUMERIC(23,2)",
    "ussgl488200_up_adjust_pri_undeliv_order_oblig_ppaid_adv_cpe": "NUMERIC(23,2)",
    "ussgl490200_delivered_orders_obligations_paid_cpe": "NUMERIC(23,2)",
    "ussgl490800_authority_outlayed_not_yet_disbursed_fyb": "NUMERIC(23,2)",
    "ussgl490800_authority_outlayed_not_yet_disbursed_cpe": "NUMERIC(23,2)",
    "ussgl498200_upward_adjust_pri_deliv_orders_oblig_paid_cpe": "NUMERIC(23,2)",
    "obligations_undelivered_orders_unpaid_total_cpe": "NUMERIC(23,2)",
    "obligations_delivered_orders_unpaid_total_fyb": "NUMERIC(23,2)",
    "obligations_delivered_orders_unpaid_total_cpe": "NUMERIC(23,2)",
    "gross_outlays_undelivered_orders_prepaid_total_fyb": "NUMERIC(23,2)",
    "gross_outlays_undelivered_orders_prepaid_total_cpe": "NUMERIC(23,2)",
    "gross_outlays_delivered_orders_paid_total_fyb": "NUMERIC(23,2)",
    "gross_outlay_amount_by_award_fyb": "NUMERIC(23,2)",
    "gross_outlay_amount_by_award_cpe": "NUMERIC(23,2)",
    "obligations_incurred_total_by_award_cpe": "NUMERIC(23,2)",
    "ussgl487100_down_adj_pri_unpaid_undel_orders_oblig_recov_cpe": "NUMERIC(23,2)",
    "ussgl497100_down_adj_pri_unpaid_deliv_orders_oblig_recov_cpe": "NUMERIC(23,2)",
    "ussgl487200_down_adj_pri_ppaid_undel_orders_oblig_refund_cpe": "NUMERIC(23,2)",
    "ussgl497200_down_adj_pri_paid_deliv_orders_oblig_refund_cpe": "NUMERIC(23,2)",
    "deobligations_recoveries_refunds_of_prior_year_by_award_cpe": "NUMERIC(23,2)",
    "obligations_undelivered_orders_unpaid_total_fyb": "NUMERIC(23,2)",
    "gross_outlays_delivered_orders_paid_total_cpe": "NUMERIC(23,2)",
    "drv_award_id_field_type": "STRING",
    "drv_obligations_incurred_total_by_award": "NUMERIC(23,2)",
    "transaction_obligated_amount": "NUMERIC(23,2)",
    "reporting_period_start": "DATE",
    "reporting_period_end": "DATE",
    "last_modified_date": "DATE",
    "certified_date": "DATE",
    "create_date": "TIMESTAMP",
    "update_date": "TIMESTAMP",
    "award_id": "LONG",
    "object_class_id": "INTEGER",
    "program_activity_id": "INTEGER",
    "submission_id": "INTEGER NOT NULL",
    "treasury_account_id": "INTEGER",
    "distinct_award_key": "STRING NOT NULL",
    "disaster_emergency_fund_code": "STRING",
}

financial_accounts_by_awards_sql_string = rf"""
    CREATE OR REPLACE TABLE {{DESTINATION_TABLE}} (
        {", ".join([f'{key} {val}' for key, val in FINANCIAL_ACCOUNTS_BY_AWARDS_COLUMNS.items()])}
    )
    USING DELTA
    LOCATION 's3a://{{SPARK_S3_BUCKET}}/{{DELTA_LAKE_S3_PATH}}/{{DESTINATION_DATABASE}}/{{DESTINATION_TABLE}}'
"""

c_to_d_linkage_view_sql_strings = [
    # -----
    # Before starting to link records, we should first unlink FABA records from Awards that no longer
    # exist. This creates a view on FABA with a subset of fields needed for the linkage process;
    # Also upper cases some fields because the UPPER() operation can't be used in some predicates
    # in Spark SQL
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW faba_upper AS (
        SELECT
            faba.financial_accounts_by_awards_id,
            aw.id AS award_id,
            UPPER(faba.piid) AS piid,
            UPPER(faba.parent_award_id) AS parent_award_id,
            UPPER(faba.fain) AS fain,
            UPPER(faba.uri) AS uri
        FROM
            raw.financial_accounts_by_awards AS faba
        LEFT JOIN
            int.awards AS aw
        ON
            faba.award_id = aw.id
    );
    """,
    # -----
    # Selects a subset of fields from the Awards table and uppercases them because the UPPER() operation
    # can't be used in some predicates in Spark SQL
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW awards_upper AS (
        SELECT
            id,
            UPPER(piid) AS piid,
            UPPER(parent_award_piid) AS parent_award_piid,
            UPPER(fain) AS fain,
            UPPER(uri) AS uri
        FROM
            int.awards
    );
    """,
    # -----
    # The first of several views that link FABA records to Award records;
    # When PIID and Parent PIID are populated, update File C contract records to link to a corresponding
    # Award if there is a single exact match based on PIID and Parent PIID
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW c_to_d_piid_and_parent_piid AS (
        SELECT
            financial_accounts_by_awards_id,
            aw.id AS award_id,
            1 AS priority
        FROM
            faba_upper AS faba
        INNER JOIN
            awards_upper AS aw
        ON
            aw.piid = faba.piid
            AND aw.parent_award_piid = faba.parent_award_id
        WHERE
            faba.piid IS NOT NULL
            AND faba.award_id IS NULL
            AND faba.parent_award_id IS NOT NULL
            AND (
                SELECT COUNT(*)
                FROM awards_upper AS aw_sub
                WHERE
                    aw_sub.piid = faba.piid
                    AND aw_sub.parent_award_piid = faba.parent_award_id
            ) = 1
    );
    """,
    # -----
    # When PIID is populated and Parent PIID is NULL, update File C contract records to link to a
    # corresponding award if there is a single exact match based on PIID
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW c_to_d_piid AS (
        SELECT
            financial_accounts_by_awards_id,
            aw.id AS award_id,
            2 AS priority
        FROM
            faba_upper AS faba
        INNER JOIN
            awards_upper AS aw
        ON
            aw.piid = faba.piid
        WHERE
            faba.piid IS NOT NULL
            AND faba.award_id IS NULL
            AND faba.parent_award_id IS NULL
            AND (
                SELECT COUNT(*)
                FROM awards_upper AS aw_sub
                WHERE
                    aw_sub.piid = faba.piid
            ) = 1
    );
    """,
    # -----
    # When only FAIN is populated in File C, update File C assistance records to link to a corresponding
    # award if there is a single exact match based on FAIN.
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW c_to_d_fain AS (
        SELECT
            financial_accounts_by_awards_id,
            aw.id AS award_id,
            3 AS priority
        FROM
            faba_upper AS faba
        INNER JOIN
            awards_upper AS aw
        ON
            aw.fain = faba.fain
        WHERE
            faba.fain IS NOT NULL
            AND faba.uri IS NULL
            AND faba.award_id IS NULL
            AND (
                SELECT COUNT(*)
                FROM awards_upper AS aw_sub
                WHERE aw_sub.fain = faba.fain
            ) = 1
    );
    """,
    # -----
    # When only URI is populated in File C, update File C assistance records to link to a corresponding
    # award if there is a single exact match based on URI.
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW c_to_d_uri AS (
        SELECT
            financial_accounts_by_awards_id,
            aw.id AS award_id,
            4 AS priority
        FROM
            faba_upper AS faba
        INNER JOIN
            awards_upper AS aw
        ON
            aw.uri = faba.uri
        WHERE
            faba.uri IS NOT NULL
            AND faba.fain IS NULL
            AND faba.award_id IS NULL
            AND (
                SELECT COUNT(*)
                FROM awards_upper AS aw_sub
                WHERE aw_sub.uri = faba.uri
            ) = 1
    );
    """,
    # -----
    # When both FAIN and URI are populated in File C, update File C assistance records to link to a
    # corresponding award if there is an single exact match based on FAIN
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW c_to_d_fain_and_uri_fain AS (
        SELECT
            financial_accounts_by_awards_id,
            aw.id AS award_id,
            5 AS priority
        FROM
            faba_upper AS faba
        INNER JOIN
            awards_upper AS aw
        ON
            aw.fain = faba.fain
        WHERE
            faba.fain IS NOT NULL
            AND faba.uri IS NOT NULL
            AND faba.award_id IS NULL
            AND (
                SELECT COUNT(*)
                FROM awards_upper AS aw_sub
                WHERE
                    aw_sub.fain = faba.fain
            ) = 1
    );
    """,
    # -----
    # When both FAIN and URI are populated in File C, update File C assistance records to link to a
    # corresponding award if there is an single exact match based on URI
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW c_to_d_fain_and_uri_uri AS (
        SELECT
            financial_accounts_by_awards_id,
            aw.id AS award_id,
            6 AS priority
        FROM
            faba_upper AS faba
        INNER JOIN
            awards_upper AS aw
        ON
            aw.uri = faba.uri
        WHERE
            faba.fain IS NOT NULL
            AND faba.uri IS NOT NULL
            AND faba.award_id IS NULL
            AND (
                SELECT COUNT(*)
                FROM awards_upper AS aw_sub
                WHERE
                    aw_sub.uri = faba.uri
            ) = 1
    );
    """,
    # -----
    # Determine which FABA records need to be unlinked to an Award because its corresponding Award
    # was deleted;
    # This is necessary in addition to the first SQL statement checking for deletes. The first view
    # was of ALL FABA records and included NULLs on deleted awards, so those FABA records would
    # be included in the previous six `c_to_d_*` linkage views. This second view checking for deletes
    # is adding a new `c_to_d_*` linkage view to be included in the final UNION and MERGE into the
    # `int` version of `financial_accounts_by_awards`
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW c_to_d_deletes AS (
        SELECT
            faba.financial_accounts_by_awards_id,
            aw.id,
            7 AS priority
        FROM
            raw.financial_accounts_by_awards AS faba
        LEFT JOIN
            awards_upper AS aw
        ON
            faba.award_id = aw.id
        WHERE
            aw.id IS NULL
            AND faba.award_id IS NOT NULL
    );
    """,
    # -----
    # Union all the `c_to_d_*` linkage views into a single view
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW union_all AS (
        SELECT * FROM c_to_d_piid_and_parent_piid
        UNION ALL
        SELECT * FROM c_to_d_piid
        UNION ALL
        SELECT * FROM c_to_d_fain
        UNION ALL
        SELECT * FROM c_to_d_uri
        UNION ALL
        SELECT * FROM c_to_d_fain_and_uri_fain
        UNION ALL
        SELECT * FROM c_to_d_fain_and_uri_uri
        UNION ALL
        SELECT * FROM c_to_d_deletes
    );
    """,
    # -----
    # Filter the results of the above view down to a single set of distinct FABA IDs based on view
    # priority. In theory, no filtering should happen here because each view should include a distinct
    # set of FABA IDs. This is a safeguard
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW union_all_priority AS (
        SELECT
        financial_accounts_by_awards_id,
        award_id
        FROM (
        SELECT
            financial_accounts_by_awards_id,
            award_id,
            ROW_NUMBER() OVER (
            PARTITION BY financial_accounts_by_awards_id
            ORDER BY
                priority ASC
            ) as row_num
        FROM union_all
        )
        WHERE row_num = 1
    );
    """,
    # -----
    # Create view of distinct awards that are being linked to
    # -----
    """
    CREATE OR REPLACE TEMPORARY VIEW updated_awards AS (
        SELECT DISTINCT award_id FROM union_all_priority
    );
    """,
]

# List of SQL strings used to drop the views created for File C to D Linkage
c_to_d_linkage_drop_view_sql_strings = [
    "DROP VIEW faba_upper;",
    "DROP VIEW awards_upper;",
    "DROP VIEW c_to_d_piid_and_parent_piid;",
    "DROP VIEW c_to_d_piid;",
    "DROP VIEW c_to_d_fain;",
    "DROP VIEW c_to_d_uri;",
    "DROP VIEW c_to_d_fain_and_uri_fain;",
    "DROP VIEW c_to_d_fain_and_uri_uri;",
    "DROP VIEW c_to_d_deletes;",
    "DROP VIEW union_all;",
    "DROP VIEW union_all_priority;",
    "DROP VIEW updated_awards;",
]