fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/download/management/commands/populate_monthly_files.py

Summary

Maintainability
B
5 hrs
Test Coverage
C
76%
import logging
import datetime
import json
import boto3
import re

from django.conf import settings
from django.core.management.base import BaseCommand
from usaspending_api.awards.v2.lookups.lookups import procurement_type_mapping, assistance_type_mapping
from usaspending_api.common.helpers.dict_helpers import order_nested_object
from usaspending_api.common.helpers.fiscal_year_helpers import generate_fiscal_year
from usaspending_api.common.helpers.s3_helpers import multipart_upload
from usaspending_api.common.sqs.sqs_handler import get_sqs_queue
from usaspending_api.download.filestreaming import download_generation
from usaspending_api.download.helpers import pull_modified_agencies_cgacs
from usaspending_api.download.lookups import JOB_STATUS_DICT
from usaspending_api.download.models.download_job import DownloadJob
from usaspending_api.download.v2.request_validations import AwardDownloadValidator
from usaspending_api.references.models import ToptierAgency

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Maps each award-type label accepted by the command ("contracts" / "assistance")
# to the list of keys of the corresponding lookup mapping. Those keys are what
# gets sent as the ``prime_award_types`` download filter.
award_mappings = dict(
    contracts=list(procurement_type_mapping.keys()),
    assistance=list(assistance_type_mapping.keys()),
)


class Command(BaseCommand):
    """Generate, queue, or upload placeholders for the monthly award download files.

    One file is handled for every combination of toptier agency (plus an
    optional "All" pseudo-agency), fiscal year and award type. Each file is named
    ``FY<year>_<toptier_code>_<AwardType>_Full_<YYYYMMDD>.zip``.
    """

    def download(
        self,
        file_name,
        prime_award_types=None,
        agency=None,
        sub_agency=None,
        date_type=None,
        start_date=None,
        end_date=None,
        columns=None,
        file_format="csv",
        monthly_download=False,
        cleanup=False,
        use_sqs=False,
    ):
        """Create a DownloadJob for ``file_name`` and either generate it now or queue it.

        Args:
            file_name: Name stored on the DownloadJob; also used to find older
                versions of the same file when ``cleanup`` is set.
            prime_award_types: Value for the ``prime_award_types`` filter.
            agency: Value for the ``agency`` filter; it is passed through ``str()``.
            sub_agency: Accepted for interface compatibility; not used by this method.
            date_type: Value for the ``date_type`` filter.
            start_date: Optional start of the date range; omitted from the request when falsy.
            end_date: Optional end of the date range; omitted from the request when falsy.
            columns: Columns to request. ``None`` (the default) means an empty list.
            file_format: Value for the request's ``file_format``.
            monthly_download: Accepted for interface compatibility; the job is
                always created with ``monthly_download=True`` regardless of this value.
            cleanup: When generating locally, delete every other object in
                ``self.bucket`` whose key shares this file's prefix (the file name
                without its trailing ``YYYYMMDD.zip``).
            use_sqs: When true, only send the job id to the bulk download SQS queue
                instead of generating the file in this process.

        Note:
            ``cleanup`` relies on ``self.bucket``, which is assigned in ``handle``.
        """
        # A fresh list per call: a shared mutable default could leak state between
        # calls if anything downstream mutates the request.
        columns = [] if columns is None else columns

        date_range = {}
        if start_date:
            date_range["start_date"] = start_date
        if end_date:
            date_range["end_date"] = end_date

        json_request = {
            "constraint_type": "year",
            "filters": {
                "prime_award_types": prime_award_types,
                "agency": str(agency),
                "date_type": date_type,
                "date_range": date_range,
            },
            "columns": columns,
            "file_format": file_format,
        }
        validated_request = AwardDownloadValidator(json_request).json_request
        download_job = DownloadJob.objects.create(
            job_status_id=JOB_STATUS_DICT["ready"],
            file_name=file_name,
            json_request=json.dumps(order_nested_object(validated_request)),
            monthly_download=True,
        )

        if use_sqs:
            # Hand the job off: only its id is sent as the message body.
            queue = get_sqs_queue(queue_name=settings.BULK_DOWNLOAD_SQS_QUEUE_NAME)
            queue.send_message(MessageBody=str(download_job.download_job_id))
            return

        # Note: Because of the line below, it's advised to only run this script on a separate instance as this will
        #       modify your bulk download settings.
        settings.BULK_DOWNLOAD_S3_BUCKET_NAME = settings.MONTHLY_DOWNLOAD_S3_BUCKET_NAME
        download_generation.generate_download(download_job=download_job)

        if not cleanup:
            return

        # Get all the files that have the same prefix except for the update date
        file_name_prefix = file_name[:-12]  # subtracting the 'YYYYMMDD.zip'
        for key in self.bucket.objects.filter(Prefix=file_name_prefix):
            if key.key == file_name:
                # ignore the one we just uploaded
                continue
            key.delete()
            logger.info("Deleting {} from bucket".format(key.key))

    def upload_placeholder(self, file_name, empty_file):
        """Upload ``empty_file`` as a placeholder named ``file_name``.

        The upload goes to ``settings.BULK_DOWNLOAD_S3_BUCKET_NAME`` in
        ``settings.USASPENDING_AWS_REGION`` via ``multipart_upload``.

        Args:
            file_name: Name to upload under (presumably the destination key; verify
                against ``multipart_upload``).
            empty_file: The empty file to upload (presumably a local path; verify
                against ``multipart_upload``).
        """
        # NOTE(review): the rest of this command reads and cleans up
        # MONTHLY_DOWNLOAD_S3_BUCKET_NAME, while placeholders go to the bulk download
        # bucket - confirm this is intended.
        bucket = settings.BULK_DOWNLOAD_S3_BUCKET_NAME
        region = settings.USASPENDING_AWS_REGION

        logger.info("Uploading {}".format(file_name))
        multipart_upload(bucket, region, empty_file, file_name)

    def add_arguments(self, parser):
        """Register this command's command-line options on ``parser``."""
        parser.add_argument(
            "--local",
            action="store_true",
            dest="local",
            default=False,
            help="Generate all the files locally. Note they will still be uploaded to the S3.",
        )
        parser.add_argument(
            "--clobber",
            action="store_true",
            dest="clobber",
            default=False,
            help="Uploads files regardless if they have already been uploaded that day.",
        )
        parser.add_argument(
            "--use_modified_list",
            action="store_true",
            dest="use_modified_list",
            default=False,
            help="Uses the modified agency list instead of the standard agency list",
        )
        parser.add_argument(
            "--agencies",
            dest="agencies",
            nargs="+",
            default=None,
            type=str,
            help="Specific toptier agency database ids (overrides use_modified_list)."
            " Note 'all' may be provided to account for the downloads that comprise "
            " all agencies for a fiscal_year.",
        )
        parser.add_argument(
            "--award_types",
            dest="award_types",
            nargs="+",
            default=["assistance", "contracts"],
            type=str,
            help="Specific award types, must be 'contracts' and/or 'assistance'",
        )
        parser.add_argument(
            "--fiscal_years", dest="fiscal_years", nargs="+", default=None, type=int, help="Specific Fiscal Years"
        )
        parser.add_argument(
            "--placeholders",
            action="store_true",
            dest="placeholders",
            default=False,
            help="Upload empty files as placeholders.",
        )
        parser.add_argument(
            "--cleanup",
            action="store_true",
            dest="cleanup",
            default=False,
            help="Deletes the previous version of the newly generated file after uploading"
            " (only applies if --local is also provided).",
        )
        parser.add_argument(
            "--empty-assistance-file",
            dest="empty_assistance_file",
            default="",
            help="Empty assistance file for uploading",
        )
        parser.add_argument(
            "--empty-contracts-file", dest="empty_contracts_file", default="", help="Empty contracts file for uploading"
        )

    def handle(self, *args, **options):
        """Run the application.

        Resolves the agencies, fiscal years and award types to process, then for
        every combination either uploads a placeholder (``--placeholders``) or
        calls ``download``. Unless ``--clobber`` is given, files already present in
        the monthly bucket with today's date stamp are skipped.

        Raises:
            Exception: If an award type other than 'contracts'/'assistance' is
                given, or if ``--placeholders`` is set without both empty files.
        """

        # Make sure
        #   settings.BULK_DOWNLOAD_S3_BUCKET_NAME
        #   settings.MONTHLY_DOWNLOAD_S3_BUCKET_NAME
        #   settings.BULK_DOWNLOAD_SQS_QUEUE_NAME
        #   settings.USASPENDING_AWS_REGION
        # are properly configured!

        local = options["local"]
        clobber = options["clobber"]
        use_modified_list = options["use_modified_list"]
        agencies = options["agencies"]
        award_types = options["award_types"]
        for award_type in award_types:
            if award_type not in ("contracts", "assistance"):
                raise Exception("Unacceptable award type: {}".format(award_type))
        fiscal_years = options["fiscal_years"]
        placeholders = options["placeholders"]
        cleanup = options["cleanup"]
        empty_assistance_file = options["empty_assistance_file"]
        empty_contracts_file = options["empty_contracts_file"]
        if placeholders and not (empty_assistance_file and empty_contracts_file):
            raise Exception("Placeholder arg provided but empty files not provided")

        current_date = datetime.date.today()
        updated_date_timestamp = current_date.strftime("%Y%m%d")

        toptier_agencies = ToptierAgency.objects.all()
        include_all = True
        if use_modified_list:
            used_cgacs = set(pull_modified_agencies_cgacs())
            toptier_agencies = ToptierAgency.objects.filter(toptier_code__in=used_cgacs)
        if agencies:
            # An explicit agency list overrides the modified list. 'all' is not a
            # database id, so it is stripped (every occurrence, without mutating the
            # caller's options) before filtering; its presence decides whether the
            # "All" pseudo-agency is appended below.
            include_all = "all" in agencies
            agency_ids = [agency_id for agency_id in agencies if agency_id != "all"]
            toptier_agencies = ToptierAgency.objects.filter(toptier_agency_id__in=agency_ids)
        toptier_agencies = list(toptier_agencies.values("name", "toptier_agency_id", "toptier_code"))
        # Adding 'all' to prevent duplication of code
        if include_all:
            toptier_agencies.append({"name": "All", "toptier_agency_id": "all", "toptier_code": "All"})
        if not fiscal_years:
            fiscal_years = range(2001, generate_fiscal_year(current_date) + 1)

        # moving it to self.bucket as it may be used in different cases
        bucket_name = settings.MONTHLY_DOWNLOAD_S3_BUCKET_NAME
        region_name = settings.USASPENDING_AWS_REGION
        self.bucket = boto3.resource("s3", region_name=region_name).Bucket(bucket_name)

        # File name stems (the part before "_Full_<today>.zip") already in the
        # bucket today. Always defined, and left empty when clobbering, so the
        # skip check below needs no special case.
        reuploads = set()
        if not clobber:
            todays_file = re.compile(r"(.*)_Full_{}\.zip".format(updated_date_timestamp))
            for key in self.bucket.objects.all():
                found = todays_file.search(key.key)
                if found:
                    reuploads.add(found.group(1))

        logger.info(
            "Generating {} files...".format(len(toptier_agencies) * len(fiscal_years) * len(award_types))
        )
        for agency in toptier_agencies:
            for fiscal_year in fiscal_years:
                # The requested range runs from Oct 1 of the prior calendar year to Sep 30.
                start_date = "{}-10-01".format(fiscal_year - 1)
                end_date = "{}-09-30".format(fiscal_year)
                for award_type in award_types:
                    file_name = f"FY{fiscal_year}_{agency['toptier_code']}_{award_type.capitalize()}"
                    full_file_name = f"{file_name}_Full_{updated_date_timestamp}.zip"
                    if file_name in reuploads:
                        logger.info(f"Skipping already uploaded: {full_file_name}")
                        continue
                    if placeholders:
                        empty_file = empty_contracts_file if award_type == "contracts" else empty_assistance_file
                        self.upload_placeholder(file_name=full_file_name, empty_file=empty_file)
                    else:
                        self.download(
                            file_name=full_file_name,
                            prime_award_types=award_mappings[award_type],
                            agency=agency["toptier_agency_id"],
                            date_type="action_date",
                            start_date=start_date,
                            end_date=end_date,
                            monthly_download=True,
                            cleanup=cleanup,
                            use_sqs=(not local),
                        )
        logger.info("Populate Monthly Files complete")