fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/database_scripts/job_archive/backfill_solicitation_date.py

Summary

Maintainability
F
4 days
Test Coverage
F
0%
"""
Jira Ticket Number(s): DEV-3653

    Backfill solicitation date values that are still missing because of a timing issue in the previous backfill

Expected CLI:

    $ python3 usaspending_api/database_scripts/job_archive/backfill_solicitation_date.py

Purpose:

    Fetch solicitation date from Broker table:
        - detached_award_procurement
    And populate FPDS transaction table columns:
        - solicitation_date
    And ensure transactions are included in monthly archive delta files
        - transaction_delta
"""
import logging
import math
import time
from contextlib import closing
from datetime import datetime, timezone
from os import environ

import psycopg2


# DEFINE THESE ENVIRONMENT VARIABLES BEFORE RUNNING!
# Both values are read with environ[...], so a missing variable raises
# KeyError as soon as this module is loaded.
SPENDING_CONNECTION_STRING = environ["DATABASE_URL"]
BROKER_CONNECTION_STRING = environ["DATA_BROKER_DATABASE_URL"]


# Broker-side read: returns (detached_award_procurement_id, solicitation_date
# cast to date) for the IDs bound to the single "IN %s" parameter.
BROKER_FPDS_SELECT_SQL = """
SELECT
    detached_award_procurement_id,
    solicitation_date::date
FROM
    detached_award_procurement
WHERE
    detached_award_procurement_id IN %s
"""

# USAspending-side write. The {} is filled in by build_spending_update_query
# with one parenthesised group of %s placeholders per broker row. Only rows
# whose stored solicitation_date differs from the broker value are updated,
# and the transaction_id of every updated row is returned.
SPENDING_FPDS_UPDATE_SQL = """
UPDATE
    transaction_fpds AS fpds SET
        solicitation_date = broker.solicitation_date
FROM
    (VALUES {}) AS broker(
        detached_award_procurement_id,
        solicitation_date
    )
WHERE
    fpds.detached_award_procurement_id = broker.detached_award_procurement_id
    AND (
        fpds.solicitation_date IS DISTINCT FROM broker.solicitation_date
    )
RETURNING fpds.transaction_id
"""

# Records updated transactions in transaction_delta. The {} receives the
# comma-separated row list built in run_spending_update_query; transaction IDs
# already present in the table are skipped (ON CONFLICT ... DO NOTHING).
ADD_TRANSACTIONS_TO_DELTA = """
INSERT INTO transaction_delta (transaction_id, created_at)
VALUES {} ON CONFLICT (transaction_id) DO NOTHING
"""


# Broker-side read: every detached_award_procurement_id that has a non-NULL
# solicitation_date, i.e. the full candidate set for the backfill.
GET_FPDS_IDS_SQL = """
SELECT detached_award_procurement_id
FROM detached_award_procurement
WHERE solicitation_date IS NOT NULL
"""

# Number of IDs handled per batch by the main loop (used as the range() step).
CHUNK_SIZE = 25000


class Timer:
    """Context manager that times a block using ``time.perf_counter``.

    Entering stores ``start``. Exiting stores ``end``, ``elapsed`` (seconds as
    a float) and ``elapsed_as_string`` (``elapsed`` rendered by
    ``pretty_print``). Exceptions raised inside the block are not suppressed.
    """

    def __enter__(self):
        """Record the start time and hand back the timer itself."""
        self.start = time.perf_counter()
        return self

    def __exit__(self, *exc_details, **unused):
        """Record the end time and derive the elapsed values."""
        self.end = time.perf_counter()
        self.elapsed = self.end - self.start
        self.elapsed_as_string = self.pretty_print(self.elapsed)

    def estimated_remaining_runtime(self, ratio):
        """Return a formatted estimate of the time still needed.

        ``ratio`` is the fraction of work already completed; the time spent so
        far is scaled up to a projected total and the time already spent is
        subtracted. The estimate is never reported below zero. A ``ratio`` of
        zero raises ``ZeroDivisionError``.
        """
        so_far = time.perf_counter() - self.start
        projected_total = so_far / ratio
        remaining = max(projected_total - so_far, 0.0)
        return self.pretty_print(remaining)

    @staticmethod
    def pretty_print(elapsed):
        """Format a number of seconds as ``H:MM:SS.ffff``.

        The fractional part is shown as four digits (ten-thousandths of a
        second), truncated rather than rounded.
        """
        fraction, whole_seconds = math.modf(elapsed)
        total_minutes, seconds = divmod(whole_seconds, 60)
        hours, minutes = divmod(total_minutes, 60)
        ten_thousandths = fraction * 10000
        return "%d:%02d:%02d.%04d" % (hours, minutes, seconds, ten_thousandths)


def build_spending_update_query(query_base, update_data):
    """Fill ``query_base`` with a VALUES list of ``%s`` placeholders.

    Args:
        query_base: SQL template containing one ``{}`` where the VALUES rows go.
        update_data: Iterable of rows; each row contributes one parenthesised
            group holding as many ``%s`` placeholders as the row has columns.

    Returns:
        ``query_base`` with ``{}`` replaced by the comma-separated groups,
        e.g. ``(%s,%s),(%s,%s)`` for two two-column rows. An empty
        ``update_data`` yields an empty replacement.
    """
    # join() places the separators itself, so the total row count is never
    # needed and any iterable of rows is accepted, not just sized sequences.
    row_groups = ("({})".format(",".join(["%s"] * len(row))) for row in update_data)
    return query_base.format(",".join(row_groups))


def print_no_rows_to_update(transaction_type):
    """Log, at INFO level, that nothing needed updating for ``transaction_type``."""
    message = "[{}] No rows to update".format(transaction_type)
    logging.info(message)


def run_broker_select_query(transaction_sql, id_tuple):
    """Run a Broker SELECT for one batch of IDs and return all of its rows.

    Uses the module-level ``broker_connection``.

    Args:
        transaction_sql: SQL text with a single ``%s`` placeholder that
            receives ``id_tuple`` (this file uses it as ``IN %s``).
        id_tuple: Tuple of IDs bound as the query's only parameter.

    Returns:
        The list returned by ``cursor.fetchall()``, or an empty list when
        ``id_tuple`` is empty.
    """
    if not id_tuple:
        # psycopg2 renders an empty tuple as "()", and "IN ()" is a SQL syntax
        # error, so there is nothing valid to send to the database.
        return []

    with broker_connection.cursor() as select_cursor:
        select_cursor.execute(transaction_sql, [id_tuple])
        return select_cursor.fetchall()


def run_spending_update_query(transaction_sql, transaction_type, broker_data):
    """Apply one batch of broker rows to USAspending and record the changes.

    Uses the module-level ``spending_connection``. The UPDATE is executed with
    every column of ``broker_data`` bound as a parameter; the transaction IDs it
    returns are then inserted into ``transaction_delta`` with the current UTC
    time as ``created_at``.

    Args:
        transaction_sql: UPDATE template with a ``{}`` for the VALUES list; it
            must return the updated transaction IDs (``RETURNING``).
        transaction_type: Label used as the prefix of the log messages.
        broker_data: Sequence of rows fetched from Broker.

    Returns:
        Number of rows changed by the UPDATE (0 when ``broker_data`` is empty).
    """
    if not broker_data:
        # An empty VALUES list is not valid SQL, so skip the round trip.
        logging.info("[{}] No rows to update".format(transaction_type))
        return 0

    with spending_connection.cursor() as update_cursor:
        update_query = build_spending_update_query(transaction_sql, broker_data)
        with Timer() as t:
            update_cursor.execute(update_query, [col for row in broker_data for col in row])
        row_count = update_cursor.rowcount
        logging.info("[{}] {:,} rows updated in {}".format(transaction_type, row_count, t.elapsed_as_string))
        if row_count > 0:
            now = datetime.now(timezone.utc)
            # The RETURNING rows must be read before the cursor is reused below.
            updated_ids = [returned_row[0] for returned_row in update_cursor.fetchall()]
            # Bind the IDs and timestamp as parameters instead of formatting
            # them into the SQL text.
            placeholders = ",".join(["(%s,%s)"] * len(updated_ids))
            delta_params = [value for transaction_id in updated_ids for value in (transaction_id, now)]
            update_cursor.execute(ADD_TRANSACTIONS_TO_DELTA.format(placeholders), delta_params)
            logging.info("[{}] {:,} rows added to transaction_delta".format(transaction_type, update_cursor.rowcount))

        return row_count


if __name__ == "__main__":
    # Script entry point: collect every Broker FPDS ID that has a solicitation
    # date, then push the dates to USAspending in batches of CHUNK_SIZE IDs.
    log_format = "[%(asctime)s] [%(levelname)s] - %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format, datefmt="%Y/%m/%d %H:%M:%S (%Z)")

    fpds_row_count = 0

    with Timer() as overall_timer:
        # closing() makes sure both connections are really closed on the way
        # out; psycopg2's own connection context manager only ends the current
        # transaction and leaves the connection open. The names bound here are
        # module globals read by the run_*_query helpers.
        with closing(psycopg2.connect(dsn=SPENDING_CONNECTION_STRING)) as spending_connection, closing(
            psycopg2.connect(dsn=BROKER_CONNECTION_STRING)
        ) as broker_connection:
            # Each statement on the USAspending side is committed as soon as it
            # runs, so completed batches persist even if a later batch fails.
            spending_connection.autocommit = True

            logging.info("Running FPDS backfill from Broker to USAspending")
            logging.info("Finding Detached_Award_Procurement_IDs for FPDS...")
            with broker_connection.cursor() as cursor:
                cursor.execute(GET_FPDS_IDS_SQL)
                fpds_ids = tuple(id_row[0] for id_row in cursor.fetchall())
                fpds_total = len(fpds_ids)

            logging.info("Total Detached_Award_Procurement_IDs: {:,}".format(fpds_total))

            with Timer() as chunk_timer:
                for batch_start in range(0, fpds_total, CHUNK_SIZE):
                    batch_end = min(batch_start + CHUNK_SIZE, fpds_total)
                    fpds_ids_batch = fpds_ids[batch_start:batch_end]

                    logging.info(
                        "Fetching {}-{} out of {} records from broker".format(batch_start, batch_end, fpds_total)
                    )
                    broker_fpds_data = run_broker_select_query(BROKER_FPDS_SELECT_SQL, fpds_ids_batch)
                    if broker_fpds_data:
                        updated_row_count = run_spending_update_query(
                            SPENDING_FPDS_UPDATE_SQL, "FPDS", broker_fpds_data
                        )
                        fpds_row_count += updated_row_count
                    else:
                        print_no_rows_to_update("FPDS")

            logging.info(
                "Finished running FPDS backfill. Took {} to update {:,} rows".format(
                    chunk_timer.elapsed_as_string, fpds_row_count
                )
            )

    logging.info(
        "Finished. Overall run time to update {:,} rows: {}".format(fpds_row_count, overall_timer.elapsed_as_string)
    )