fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/awards/management/commands/generate_unlinked_awards_download.py

Summary

Maintainability
A
0 mins
Test Coverage
F
0%
import logging

from datetime import datetime, timezone
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from pathlib import Path
from usaspending_api.common.etl.spark import create_ref_temp_views

from usaspending_api.common.helpers.s3_helpers import upload_download_file_to_s3
from usaspending_api.common.helpers.spark_helpers import configure_spark_session, get_active_spark_session
from usaspending_api.common.helpers.download_csv_strategies import (
    SparkToCSVStrategy,
)
from enum import Enum
from usaspending_api.awards.management.sql.spark.unlinked_contracts_file_d1 import file_d1_sql_string
from usaspending_api.awards.management.sql.spark.unlinked_awards_summary_file import summary_file
from usaspending_api.awards.management.sql.spark.unlinked_assistance_file_d2 import file_d2_sql_string
from usaspending_api.awards.management.sql.spark.unlinked_accounts_file_c import file_c_sql_string
from usaspending_api.download.filestreaming.file_description import build_file_description, save_file_description
from usaspending_api.download.filestreaming.zip_file import append_files_to_zip_file
from usaspending_api.references.models.toptier_agency import ToptierAgency


class ComputeTypeEnum(Enum):
    SPARK = "spark"


logger = logging.getLogger("script")


class Command(BaseCommand):
    help = "Assemble unlinked award by agency data into CSVs and Zip"
    file_format = "csv"
    filepaths_to_delete = []
    total_download_count = 0
    total_download_columns = 0
    total_download_size = 0
    working_dir_path = Path(settings.CSV_LOCAL_PATH)
    full_timestamp = datetime.strftime(datetime.now(timezone.utc), "%Y-%m-%d_H%HM%MS%S%f")
    _agency_name = None
    _toptier_code = None

    # KEY is the type of compute supported by this command
    # key's VALUE are the strategies required by the compute type
    # These strategies are used to change the behavior of this command
    #   at runtime.
    compute_types = {
        ComputeTypeEnum.SPARK.value: {
            "source_sql_strategy": {
                "summary_file": summary_file,
                "unlinked_contracts_file_d1": file_d1_sql_string,
                "unlinked_assistance_file_d2": file_d2_sql_string,
                "unlinked_accounts_file_c": file_c_sql_string,
            },
            "download_to_csv_strategy": SparkToCSVStrategy(logger=logger),
            "readme_path": Path(settings.UNLINKED_AWARDS_DOWNLOAD_README_FILE_PATH),
        },
    }

    def add_arguments(self, parser):
        parser.add_argument(
            "--compute-type",
            choices=list(self.compute_types.keys()),
            default=ComputeTypeEnum.SPARK.value,
            help="Specify the type of compute to use when executing this command.",
        )
        parser.add_argument(
            "--skip-upload",
            action="store_true",
            help="Don't store the list of IDs for downline ETL. Automatically skipped if --dry-run is provided",
        )

    def handle(self, *args, **options):
        """
        Generates a download data package specific to unlinked awards by agency
        """
        self.upload = not options["skip_upload"]
        self.compute_type_arg = options.get("compute_type")
        self.readme_path = self.compute_types[self.compute_type_arg]["readme_path"]

        if self.compute_type_arg == ComputeTypeEnum.SPARK.value:
            extra_conf = {
                # Config for Delta Lake tables and SQL. Need these to keep Dela table metadata in the metastore
                "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
                "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
                # See comment below about old date and time values cannot be parsed without these
                "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY",  # for dates at/before 1900
                "spark.sql.legacy.parquet.int96RebaseModeInWrite": "LEGACY",  # for timestamps at/before 1900
                "spark.sql.jsonGenerator.ignoreNullFields": "false",  # keep nulls in our json
            }
            self.spark = get_active_spark_session()
            if not self.spark:
                self.spark_created_by_command = True
                self.spark = configure_spark_session(**extra_conf, spark_context=self.spark)
            create_ref_temp_views(self.spark)
        else:
            raise CommandError(
                f"Right now, only compute type {ComputeTypeEnum.SPARK.value} is supported by this command."
            )

        self.download_csv_strategy = self.compute_types[self.compute_type_arg]["download_to_csv_strategy"]
        self.download_source_sql = self.compute_types[self.compute_type_arg]["source_sql_strategy"]

        # Obtain the agencies to generate files for
        toptier_agencies = ToptierAgency.objects.values("name", "toptier_code").distinct()

        # load_delta_table
        # Save queries as delta tables for efficiency
        for delta_table_name, sql_file, final_name in self.download_file_list:
            df = self.spark.sql(sql_file)
            df.write.format(source="delta").mode(saveMode="overwrite").option("overwriteSchema", "True").saveAsTable(
                name=delta_table_name
            )

        for agency in toptier_agencies:
            agency_name = agency["name"]
            for char in settings.UNLINKED_AWARDS_AGENCY_NAME_CHARS_TO_REPLACE:
                agency_name = agency_name.replace(char, "_")
            self._agency_name = agency_name

            self._toptier_code = agency["toptier_code"]
            zip_file_path = self.working_dir_path / f"{self._agency_name}_UnlinkedAwards_{self.full_timestamp}.zip"

            try:
                self.prep_filesystem(zip_file_path)
                self.process_data_copy_jobs(zip_file_path)
                self.complete_zip_and_upload(zip_file_path)
            except Exception:
                logger.exception("Exception encountered. See logs")
                raise
            finally:
                # "best-effort" attempt to cleanup temp files after a failure. Isn't 100% effective
                self.cleanup()

    def _create_data_csv_dest_path(self, file_name) -> Path:
        return self.working_dir_path / file_name

    def process_data_copy_jobs(self, zip_file_path):
        logger.info(f"Creating new unlinked awards download zip file: {zip_file_path}")
        self.filepaths_to_delete.append(zip_file_path)

        for delta_table_name, sql_file, final_name in self.download_file_list:
            df = self.spark.sql(f"select * from {delta_table_name} where toptier_code = '{self._toptier_code}'")
            sql_file = None
            final_path = self._create_data_csv_dest_path(final_name)
            intermediate_data_file_path = final_path.parent / (final_path.name + "_temp")
            data_file_names, count = self.download_to_csv(
                sql_file, final_path, final_name, str(intermediate_data_file_path), zip_file_path, df
            )
            if count <= 0:
                logger.warning(f"Empty data file generated: {final_path}!")

            self.filepaths_to_delete.extend(self.working_dir_path.glob(f"{final_path.stem}*"))

    def complete_zip_and_upload(self, zip_file_path):
        self.finalize_zip_contents(zip_file_path)
        if self.upload:
            logger.info("Upload final zip file to S3")
            upload_download_file_to_s3(zip_file_path, settings.UNLINKED_AWARDS_DOWNLOAD_REDIRECT_DIR)
            logger.info("Marking zip file for deletion in cleanup")
        else:
            logger.warn("Not uploading zip file to S3. Leaving file locally")
            self.filepaths_to_delete.remove(zip_file_path)

    @property
    def download_file_list(self):
        short_timestamp = self.full_timestamp[:-6]
        return [
            (
                "summary_file",
                f"{self.download_source_sql['summary_file']}",
                f"{self._agency_name}_UnlinkedAwardsSummary_{short_timestamp}",
            ),
            (
                "unlinked_contracts_file_d1",
                f"{self.download_source_sql['unlinked_contracts_file_d1']}",
                f"{self._agency_name}_UnlinkedContracts_{short_timestamp}",
            ),
            (
                "unlinked_assistance_file_d2",
                f"{self.download_source_sql['unlinked_assistance_file_d2']}",
                f"{self._agency_name}_UnlinkedFinancialAssistance_{short_timestamp}",
            ),
            (
                "unlinked_accounts_file_c",
                f"{self.download_source_sql['unlinked_accounts_file_c']}",
                f"{self._agency_name}_UnlinkedAccountsByAward_{short_timestamp}",
            ),
        ]

    def cleanup(self):
        for path in self.filepaths_to_delete:
            self.filepaths_to_delete.remove(path)
            logger.info(f"Removing {path}")
            path.unlink()

    def finalize_zip_contents(self, zip_file_path):
        file_description = build_file_description(str(self.readme_path), dict())
        file_description_path = save_file_description(
            str(zip_file_path.parent), self.readme_path.name, file_description
        )
        append_files_to_zip_file([file_description_path], str(zip_file_path))
        self.total_download_size = zip_file_path.stat().st_size

    def prep_filesystem(self, zip_file_path):
        if zip_file_path.exists():
            # Clean up a zip file that might exist from a prior attempt at this download
            self.filepaths_to_delete.remove(zip_file_path)
            zip_file_path.unlink()

        if not zip_file_path.parent.exists():
            zip_file_path.parent.mkdir()

    def download_to_csv(
        self, source_sql, destination_path, destination_file_name, intermediate_data_filename, zip_file_path, source_df
    ):
        return self.download_csv_strategy.download_to_csv(
            source_sql, destination_path, destination_file_name, self.working_dir_path, zip_file_path, source_df
        )