fedspendingtransparency/usaspending-api
usaspending_api/database_scripts/job_archive/backfill_business_categories.py

"""
Jira Ticket Number(s): DEV-3753

    Move business_categories from legal_entity to transaction_normalized.

Expected CLI:

    $ python3 usaspending_api/database_scripts/job_archive/backfill_business_categories.py
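
    The database connection string is read from the DATABASE_URL environment variable, so it
    must be exported before running; for example (illustrative value):

    $ export DATABASE_URL=postgres://user:password@localhost:5432/dbname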

Purpose:

    Recalculate business categories for all transactions.  We COULD just copy them from legal_entity,
    but I'd sleep better at night knowing we were getting a fresh start even though it'll take
    longer to update them this way.

Life expectancy:

    Once Sprint 95 has been rolled out to production, this script is safe to delete... although I
    would recommend keeping it around for a few additional sprints for reference.

"""
import asyncio
import asyncpg
import logging

from os import environ
from pathlib import Path


# Import our USAspending Timer component.  This will not work if we ever add
# any Django-specific stuff to the timing_helpers.py file.
exec(Path("usaspending_api/common/helpers/timing_helpers.py").read_text())
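# exec() runs that file's source in this module's namespace, which is how the Timer class
# subclassed below becomes available without importing anything Django-dependent.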


CONNECTION_STRING = environ["DATABASE_URL"]
CHUNK_SIZE = 500000  # Approximate number of transaction ids handled per chunk.
PIPELINES = 5  # Number of chunks processed concurrently.
END_OF_QUEUE = (None, None)  # Sentinel marking the end of the chunk queue.

# Modified from usaspending_api/broker/management/sql/update_business_categories.sql
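# {min_id} and {max_id} are filled in via str.format for each chunk in process_queue below.
# The temporary table is session scoped and each chunk runs on its own pooled connection,
# so concurrent pipelines do not collide on temp_dev_3753_business_categories.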
SQL = """
    drop table if exists temp_dev_3753_business_categories;


    create temporary table
        temp_dev_3753_business_categories (
            transaction_id bigint not null,
            business_categories text[] not null
        );


    insert into
        temp_dev_3753_business_categories (
            transaction_id,
            business_categories
        )
    select
        tn.id,
        compile_fpds_business_categories(
            fpds.contracting_officers_deter,
            fpds.corporate_entity_tax_exemp,
            fpds.corporate_entity_not_tax_e,
            fpds.partnership_or_limited_lia,
            fpds.sole_proprietorship,
            fpds.manufacturer_of_goods,
            fpds.subchapter_s_corporation,
            fpds.limited_liability_corporat,
            fpds.for_profit_organization,
            fpds.alaskan_native_owned_corpo,
            fpds.american_indian_owned_busi,
            fpds.asian_pacific_american_own,
            fpds.black_american_owned_busin,
            fpds.hispanic_american_owned_bu,
            fpds.native_american_owned_busi,
            fpds.native_hawaiian_owned_busi,
            fpds.subcontinent_asian_asian_i,
            fpds.tribally_owned_business,
            fpds.other_minority_owned_busin,
            fpds.minority_owned_business,
            fpds.women_owned_small_business,
            fpds.economically_disadvantaged,
            fpds.joint_venture_women_owned,
            fpds.joint_venture_economically,
            fpds.woman_owned_business,
            fpds.service_disabled_veteran_o,
            fpds.veteran_owned_business,
            fpds.c8a_program_participant,
            fpds.the_ability_one_program,
            fpds.dot_certified_disadvantage,
            fpds.emerging_small_business,
            fpds.federally_funded_research,
            fpds.historically_underutilized,
            fpds.labor_surplus_area_firm,
            fpds.sba_certified_8_a_joint_ve,
            fpds.self_certified_small_disad,
            fpds.small_agricultural_coopera,
            fpds.small_disadvantaged_busine,
            fpds.community_developed_corpor,
            fpds.domestic_or_foreign_entity,
            fpds.foreign_owned_and_located,
            fpds.foreign_government,
            fpds.international_organization,
            fpds.domestic_shelter,
            fpds.hospital_flag,
            fpds.veterinary_hospital,
            fpds.foundation,
            fpds.community_development_corp,
            fpds.nonprofit_organization,
            fpds.educational_institution,
            fpds.other_not_for_profit_organ,
            fpds.state_controlled_instituti,
            fpds.c1862_land_grant_college,
            fpds.c1890_land_grant_college,
            fpds.c1994_land_grant_college,
            fpds.private_university_or_coll,
            fpds.minority_institution,
            fpds.historically_black_college,
            fpds.tribal_college,
            fpds.alaskan_native_servicing_i,
            fpds.native_hawaiian_servicing,
            fpds.hispanic_servicing_institu,
            fpds.school_of_forestry,
            fpds.veterinary_college,
            fpds.us_federal_government,
            fpds.federal_agency,
            fpds.us_government_entity,
            fpds.interstate_entity,
            fpds.us_state_government,
            fpds.council_of_governments,
            fpds.city_local_government,
            fpds.county_local_government,
            fpds.inter_municipal_local_gove,
            fpds.municipality_local_governm,
            fpds.township_local_government,
            fpds.us_local_government,
            fpds.local_government_owned,
            fpds.school_district_local_gove,
            fpds.us_tribal_government,
            fpds.indian_tribe_federally_rec,
            fpds.housing_authorities_public,
            fpds.airport_authority,
            fpds.port_authority,
            fpds.transit_authority,
            fpds.planning_commission
        )
    from
        transaction_normalized as tn
        inner join transaction_fpds as fpds on
            fpds.transaction_id = tn.id and
            tn.is_fpds is true
    where
        tn.id between {min_id} and {max_id};


    insert into
        temp_dev_3753_business_categories (
            transaction_id,
            business_categories
        )
    select
        tn.id,
        compile_fabs_business_categories(
            fabs.business_types
        )
    from
        transaction_normalized as tn
        inner join transaction_fabs as fabs on
            fabs.transaction_id = tn.id and
            tn.is_fpds is false
    where
        tn.id between {min_id} and {max_id};


    alter table temp_dev_3753_business_categories add primary key (transaction_id);


    update  subaward as s
    set     business_categories = t.business_categories
    from    vw_awards as a
            inner join temp_dev_3753_business_categories as t on t.transaction_id = a.latest_transaction_id
    where   s.award_id = a.id and
            s.business_categories is distinct from t.business_categories;


    update  transaction_normalized as tn
    set     business_categories = t.business_categories
    from    temp_dev_3753_business_categories as t
    where   tn.id = t.transaction_id and
            tn.business_categories is distinct from t.business_categories;
"""


logging.basicConfig(
    level=logging.INFO, format="[%(asctime)s] [%(levelname)s] - %(message)s", datefmt="%Y-%m-%d %H:%M:%S %Z"
)


# Simplify instantiations of Timer to automatically use the correct logger.
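# Typical usage pattern throughout this script:
#
#     with Timer("doing something") as t:
#         ...
#     logging.info("Did something in {}".format(t))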
class Timer(Timer):  # noqa - because we're using trickery to import this
    def __init__(self, message=None):
        super().__init__(message=message, success_logger=logging.info, failure_logger=logging.error)


async def update_sql_functions(pool):
    with Timer() as t:
        sql = Path("usaspending_api/broker/management/sql/create_business_categories_functions.sql").read_text()
        async with pool.acquire() as connection:
            await connection.execute(sql)
    logging.info("Updated SQL functions in {}".format(t))


def id_ranges(min_id, max_id):
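    # Yields inclusive, non-overlapping (low, high) ranges covering min_id through max_id.
    # For example, with CHUNK_SIZE = 500000, id_ranges(1, 1200000) yields
    # (1, 500001), (500002, 1000002), (1000003, 1200000).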
    for n in range(min_id, max_id + 1, CHUNK_SIZE + 1):
        yield n, min(n + CHUNK_SIZE, max_id)


async def get_min_max_ids(pool):
    with Timer() as t:
        sql = "select min(id), max(id) from transaction_normalized"
        async with pool.acquire() as connection:
            min_id, max_id = await connection.fetchrow(sql)
    logging.info("Found min transaction id = {} and max transaction id = {} in {}".format(min_id, max_id, t))
    return min_id, max_id


async def create_chunk_queue(min_id, max_id, loop):
    queue = asyncio.Queue(loop=loop)
    for min_id, max_id in id_ranges(min_id, max_id):
        await queue.put((min_id, max_id))

    # Effectively a sentinel value to let chunk processors know they've reached the end of the queue.
    await queue.put(END_OF_QUEUE)

    return queue


async def process_queue(queue, original_queue_size, processed_queue, pool):
    while True:

        min_id, max_id = await queue.get()
        if min_id is None:

            # Need to add the sentinel value back to the queue so other pipelines can pick it
            # up; otherwise they'll wait forever for nothing.
            await queue.put(END_OF_QUEUE)
            break

        with Timer() as t:
            sql = SQL.format(min_id=min_id, max_id=max_id)
            async with pool.acquire() as connection:
                result = await connection.execute(sql)

        await processed_queue.put((min_id, max_id))

        # asyncpg's execute() returns the status tag of the last statement in the script
        # (the final UPDATE), e.g. "UPDATE 12345", so the trailing token is the row count.
        affected = int(result.split()[-1])

        ratio = processed_queue.qsize() / original_queue_size
        estimated_remaining_runtime = overall.as_string(overall.estimated_remaining_runtime(ratio))
        message = "Updated {:,} transactions from {} through {} in {} - estimated remaining runtime is {}".format(
            affected, min_id, max_id, t, estimated_remaining_runtime
        )
        logging.info(message)


async def run_pipelines(queue, original_queue_size, processed_queue, loop, pool):
    tasks = [
        asyncio.ensure_future(process_queue(queue, original_queue_size, processed_queue, pool), loop=loop)
        for _ in range(PIPELINES)
    ]
    await asyncio.gather(*tasks)


async def main(loop):
    async with asyncpg.create_pool(dsn=CONNECTION_STRING, loop=loop) as pool:
        await update_sql_functions(pool)
        min_id, max_id = await get_min_max_ids(pool)
        to_process_queue = await create_chunk_queue(min_id, max_id, loop)
        processed_queue = asyncio.Queue(loop=loop)
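        # qsize() - 1 excludes the END_OF_QUEUE sentinel so progress estimates count only real chunks.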
        await run_pipelines(to_process_queue, to_process_queue.qsize() - 1, processed_queue, loop, pool)


if __name__ == "__main__":
    with Timer("Overall run") as overall:
        _loop = asyncio.get_event_loop()
        try:
            _loop.run_until_complete(main(_loop))
        finally:
            _loop.close()