fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/etl/management/commands/load_query_to_delta.py

Summary

Maintainability
A
0 mins
Test Coverage
A
90%
from django.core.management.base import BaseCommand
from pyspark.sql import SparkSession

from usaspending_api.common.etl.spark import create_ref_temp_views
from usaspending_api.common.helpers.spark_helpers import (
    configure_spark_session,
    get_active_spark_session,
    get_broker_jdbc_url,
    get_jdbc_connection_properties,
    get_jvm_logger,
)
from usaspending_api.config import CONFIG
from usaspending_api.recipient.delta_models import (
    recipient_lookup_load_sql_string_list,
    RECIPIENT_LOOKUP_POSTGRES_COLUMNS,
    recipient_profile_create_sql_string,
    recipient_profile_load_sql_strings,
    RECIPIENT_PROFILE_POSTGRES_COLUMNS,
    rpt_recipient_lookup_create_sql_string,
    RPT_RECIPIENT_LOOKUP_DELTA_COLUMNS,
    SAM_RECIPIENT_COLUMNS,
    SAM_RECIPIENT_POSTGRES_COLUMNS,
    sam_recipient_create_sql_string,
    sam_recipient_load_sql_string,
    RPT_RECIPIENT_PROFILE_DELTA_COLUMNS,
)
from usaspending_api.recipient.models import RecipientLookup, RecipientProfile
from usaspending_api.search.delta_models.award_search import (
    AWARD_SEARCH_COLUMNS,
    award_search_create_sql_string,
    award_search_load_sql_string,
    AWARD_SEARCH_POSTGRES_COLUMNS,
    AWARD_SEARCH_POSTGRES_GOLD_COLUMNS,
)
from usaspending_api.search.delta_models.subaward_search import (
    SUBAWARD_SEARCH_COLUMNS,
    subaward_search_create_sql_string,
    subaward_search_load_sql_string,
    SUBAWARD_SEARCH_POSTGRES_COLUMNS,
    SUBAWARD_SEARCH_POSTGRES_VECTORS,
)
from usaspending_api.search.models import AwardSearch, SubawardSearch, SummaryStateView, TransactionSearch
from usaspending_api.transactions.delta_models import (
    transaction_search_create_sql_string,
    transaction_search_load_sql_string,
    transaction_current_cd_lookup_create_sql_string,
    transaction_current_cd_lookup_load_sql_string,
    TRANSACTION_SEARCH_POSTGRES_COLUMNS,
    TRANSACTION_SEARCH_POSTGRES_GOLD_COLUMNS,
    TRANSACTION_CURRENT_CD_LOOKUP_COLUMNS,
    SUMMARY_STATE_VIEW_COLUMNS,
    summary_state_view_create_sql_string,
    summary_state_view_load_sql_string,
    SUMMARY_STATE_VIEW_POSTGRES_COLUMNS,
)

# Per-table configuration for the load command defined in this module.
#
# Each top-level key is a value accepted by the command's ``--destination-table`` argument.
# Of the per-table settings, this module itself only reads:
#   * "destination_database": the Spark database to use unless ``--alt-db`` overrides it.
#   * "source_query": a Spark SQL template string, or a list of template strings that are run in order.
#       Templates are filled in with ``str.format`` (DESTINATION_DATABASE, DESTINATION_TABLE,
#       DELTA_LAKE_S3_PATH, JDBC_DRIVER, JDBC_FETCHSIZE, JDBC_URL).
#   * "user_defined_functions" (optional, not set by any entry below): an iterable of keyword-argument
#       dicts, each passed to ``spark.udf.register``.
# NOTE(review): the remaining keys ("model", "swap_table", "swap_schema", "partition_column",
# "source_schema", "custom_schema", "column_names", "postgres_seq_name", "tsvectors",
# "postgres_partition_spec", ...) are not read anywhere in this file; presumably they are consumed by
# other commands that import TABLE_SPEC -- confirm against those callers before changing them.
TABLE_SPEC = {
    "award_search": {
        "model": AwardSearch,
        "is_from_broker": False,
        "source_query": award_search_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "award_search",
        "swap_schema": "rpt",
        "partition_column": "award_id",
        "partition_column_type": "numeric",
        "is_partition_column_unique": True,
        "delta_table_create_sql": award_search_create_sql_string,
        "source_schema": AWARD_SEARCH_POSTGRES_COLUMNS,
        "custom_schema": "recipient_hash STRING, federal_accounts STRING, cfdas ARRAY<STRING>,"
        " tas_components ARRAY<STRING>",
        "column_names": list(AWARD_SEARCH_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    # Same query, create SQL and swap target as "award_search"; only "source_schema" and
    # "column_names" differ (both use the *_GOLD_COLUMNS definition).
    "award_search_gold": {
        "model": AwardSearch,
        "is_from_broker": False,
        "source_query": award_search_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "award_search",
        "swap_schema": "rpt",
        "partition_column": "award_id",
        "partition_column_type": "numeric",
        "is_partition_column_unique": True,
        "delta_table_create_sql": award_search_create_sql_string,
        "source_schema": AWARD_SEARCH_POSTGRES_GOLD_COLUMNS,
        "custom_schema": "recipient_hash STRING, federal_accounts STRING, cfdas ARRAY<STRING>,"
        " tas_components ARRAY<STRING>",
        "column_names": list(AWARD_SEARCH_POSTGRES_GOLD_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    "recipient_lookup": {
        "model": RecipientLookup,
        "is_from_broker": False,
        "source_query": recipient_lookup_load_sql_string_list,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "recipient_lookup",
        "swap_schema": "rpt",
        "partition_column": "recipient_hash",
        "partition_column_type": "string",
        "is_partition_column_unique": True,
        "delta_table_create_sql": rpt_recipient_lookup_create_sql_string,
        "source_schema": RECIPIENT_LOOKUP_POSTGRES_COLUMNS,
        "custom_schema": "recipient_hash STRING",
        "column_names": list(RPT_RECIPIENT_LOOKUP_DELTA_COLUMNS),
        "postgres_seq_name": "recipient_lookup_id_seq",
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    "recipient_profile": {
        "model": RecipientProfile,
        "is_from_broker": False,
        "source_query": recipient_profile_load_sql_strings,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "recipient_profile",
        "swap_schema": "rpt",
        "partition_column": "recipient_hash",  # This isn't used for anything
        "partition_column_type": "string",
        "is_partition_column_unique": False,
        "delta_table_create_sql": recipient_profile_create_sql_string,
        "source_schema": RECIPIENT_PROFILE_POSTGRES_COLUMNS,
        "custom_schema": "recipient_hash STRING",
        "column_names": list(RPT_RECIPIENT_PROFILE_DELTA_COLUMNS),
        "postgres_seq_name": "recipient_profile_id_seq",
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    "summary_state_view": {
        "model": SummaryStateView,
        "is_from_broker": False,
        "source_query": summary_state_view_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "summary_state_view",
        "swap_schema": "rpt",
        "partition_column": "duh",
        "partition_column_type": "string",
        "is_partition_column_unique": True,
        "delta_table_create_sql": summary_state_view_create_sql_string,
        "source_schema": SUMMARY_STATE_VIEW_POSTGRES_COLUMNS,
        "custom_schema": "duh STRING",
        "column_names": list(SUMMARY_STATE_VIEW_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    "sam_recipient": {
        "model": None,
        "is_from_broker": True,
        "source_query": sam_recipient_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "int",
        "swap_table": "duns",
        "swap_schema": "int",
        "partition_column": "broker_duns_id",
        "partition_column_type": "string",
        "is_partition_column_unique": True,
        "delta_table_create_sql": sam_recipient_create_sql_string,
        "source_schema": SAM_RECIPIENT_POSTGRES_COLUMNS,
        "custom_schema": None,
        "column_names": list(SAM_RECIPIENT_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    "transaction_search": {
        "model": TransactionSearch,
        "is_from_broker": False,
        "source_query": transaction_search_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "transaction_search",
        "swap_schema": "rpt",
        "partition_column": "transaction_id",
        "partition_column_type": "numeric",
        "is_partition_column_unique": True,
        "delta_table_create_sql": transaction_search_create_sql_string,
        "source_schema": TRANSACTION_SEARCH_POSTGRES_COLUMNS,
        "custom_schema": "recipient_hash STRING, federal_accounts STRING, parent_recipient_hash STRING",
        "column_names": list(TRANSACTION_SEARCH_POSTGRES_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    # Same query, create SQL and swap target as "transaction_search"; differs in "source_schema",
    # "column_names" (both *_GOLD_COLUMNS) and in declaring a LIST partition spec keyed on "is_fpds".
    "transaction_search_gold": {
        "model": TransactionSearch,
        "is_from_broker": False,
        "source_query": transaction_search_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "transaction_search",
        "swap_schema": "rpt",
        "partition_column": "transaction_id",
        "partition_column_type": "numeric",
        "is_partition_column_unique": True,
        "delta_table_create_sql": transaction_search_create_sql_string,
        "source_schema": TRANSACTION_SEARCH_POSTGRES_GOLD_COLUMNS,
        "custom_schema": "recipient_hash STRING, federal_accounts STRING, parent_recipient_hash STRING",
        "column_names": list(TRANSACTION_SEARCH_POSTGRES_GOLD_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": None,
        "postgres_partition_spec": {
            "partition_keys": ["is_fpds"],
            "partitioning_form": "LIST",
            "partitions": [
                {"table_suffix": "_fpds", "partitioning_clause": "FOR VALUES IN (TRUE)"},
                {"table_suffix": "_fabs", "partitioning_clause": "FOR VALUES IN (FALSE)"},
            ],
        },
    },
    "transaction_current_cd_lookup": {
        "model": None,
        "is_from_broker": False,
        "source_query": transaction_current_cd_lookup_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "int",
        "swap_table": "transaction_current_cd_lookup",
        "swap_schema": "int",
        "partition_column": "transaction_id",
        "partition_column_type": "numeric",
        "is_partition_column_unique": True,
        "delta_table_create_sql": transaction_current_cd_lookup_create_sql_string,
        "source_schema": TRANSACTION_CURRENT_CD_LOOKUP_COLUMNS,
        "custom_schema": "",
        "column_names": list(TRANSACTION_CURRENT_CD_LOOKUP_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": None,
        "postgres_partition_spec": None,
    },
    "subaward_search": {
        "model": SubawardSearch,
        "is_from_broker": False,
        "source_query": subaward_search_load_sql_string,
        "source_database": None,
        "source_table": None,
        "destination_database": "rpt",
        "swap_table": "subaward_search",
        "swap_schema": "rpt",
        "partition_column": "broker_subaward_id",
        "partition_column_type": "numeric",
        "is_partition_column_unique": True,
        "delta_table_create_sql": subaward_search_create_sql_string,
        "source_schema": SUBAWARD_SEARCH_POSTGRES_COLUMNS,
        "custom_schema": "treasury_account_identifiers ARRAY<INTEGER>",
        "column_names": list(SUBAWARD_SEARCH_COLUMNS),
        "postgres_seq_name": None,
        "tsvectors": SUBAWARD_SEARCH_POSTGRES_VECTORS,
        "postgres_partition_spec": None,
    },
}


class Command(BaseCommand):
    """Run the Spark SQL load query (or queries) configured in ``TABLE_SPEC`` for one destination table."""

    help = """
    This command reads data via a Spark SQL query that relies on delta tables that have already been loaded paired
    with temporary views of tables in a Postgres database. As of now, it only supports a full reload of a table.
    All existing data will be deleted before new data is written.
    """

    # Values defined in the handler
    destination_database: str
    destination_table_name: str
    spark: SparkSession

    def add_arguments(self, parser):
        """Register the command-line options: the required target table plus optional db/name overrides."""
        parser.add_argument(
            "--destination-table",
            type=str,
            required=True,
            help="The destination Delta Table to write the data",
            choices=list(TABLE_SPEC),
        )
        parser.add_argument(
            "--alt-db",
            type=str,
            required=False,
            help="An alternate database (aka schema) in which to create this table, overriding the TABLE_SPEC db",
        )
        parser.add_argument(
            "--alt-name",
            type=str,
            required=False,
            help="An alternate delta table name for the created table, overriding the TABLE_SPEC destination_table "
            "name",
        )

    def handle(self, *args, **options):
        """Obtain a Spark session and run the load for the table named in ``options["destination_table"]``.

        An already-active Spark session is reused and left running. When no session is active, one is
        created here and is always stopped before returning, including when the load raises, so a failed
        run does not leave a session behind.
        """
        extra_conf = {
            # Config for Delta Lake tables and SQL. Need these to keep Delta table metadata in the metastore
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            # Old date and time values cannot be parsed without these
            "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY",  # for dates at/before 1900
            "spark.sql.legacy.parquet.int96RebaseModeInWrite": "LEGACY",  # for timestamps at/before 1900
            "spark.sql.jsonGenerator.ignoreNullFields": "false",  # keep nulls in our json
        }

        self.spark = get_active_spark_session()
        spark_created_by_command = False
        if not self.spark:
            spark_created_by_command = True
            self.spark = configure_spark_session(**extra_conf, spark_context=self.spark)

        try:
            self._load_table(options)
        finally:
            # Only stop a session this command started; a pre-existing session belongs to the caller.
            if spark_created_by_command:
                self.spark.stop()

    def _load_table(self, options):
        """Resolve the target database/table from ``options`` and execute the table's load query or queries."""
        # Setup Logger
        logger = get_jvm_logger(self.spark, __name__)

        # Resolve Parameters
        destination_table = options["destination_table"]
        table_spec = TABLE_SPEC[destination_table]
        self.destination_database = options["alt_db"] or table_spec["destination_database"]
        self.destination_table_name = options["alt_name"] or destination_table.split(".")[-1]

        # Set the database that will be interacted with for all Delta Lake table Spark-based activity
        logger.info(f"Using Spark Database: {self.destination_database}")
        self.spark.sql(f"use {self.destination_database};")

        # Create User Defined Functions if needed
        for udf_args in table_spec.get("user_defined_functions") or ():
            self.spark.udf.register(**udf_args)

        create_ref_temp_views(self.spark, create_broker_views=True)

        # "source_query" is either a single SQL template or a list of templates to run in order
        load_query = table_spec["source_query"]
        if isinstance(load_query, list):
            for query_number, query in enumerate(load_query, start=1):
                logger.info(f"Running query number: {query_number}\nPreview of query: {query[:100]}")
                self.run_spark_sql(query)
        else:
            self.run_spark_sql(load_query)

    def run_spark_sql(self, query):
        """Fill in the placeholders of the SQL template ``query`` and execute it on ``self.spark``.

        ``query`` is formatted with ``str.format``; placeholders it does not reference are simply ignored.
        Relies on ``self.destination_database`` and ``self.destination_table_name`` having been set.
        """
        jdbc_conn_props = get_jdbc_connection_properties()
        self.spark.sql(
            query.format(
                DESTINATION_DATABASE=self.destination_database,
                DESTINATION_TABLE=self.destination_table_name,
                DELTA_LAKE_S3_PATH=CONFIG.DELTA_LAKE_S3_PATH,
                JDBC_DRIVER=jdbc_conn_props["driver"],
                JDBC_FETCHSIZE=jdbc_conn_props["fetchsize"],
                JDBC_URL=get_broker_jdbc_url(),
            )
        )