fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/agency/v2/views/subcomponents.py

Summary

Maintainability
A
0 mins
Test Coverage
A
97%
from django.db.models import Case, F, Max, OuterRef, Q, Subquery, Sum, TextField, Value, When
from rest_framework.request import Request
from rest_framework.response import Response
from typing import Any
from usaspending_api.accounts.models.appropriation_account_balances import AppropriationAccountBalances
from usaspending_api.agency.v2.views.agency_base import AgencyBase, PaginationMixin
from usaspending_api.common.cache_decorator import cache_response
from usaspending_api.common.helpers.date_helper import now
from usaspending_api.common.helpers.generic_helper import get_pagination_metadata, sort_with_null_last
from usaspending_api.common.helpers.orm_helpers import ConcatAll
from usaspending_api.financial_activities.models import FinancialAccountsByProgramActivityObjectClass
from usaspending_api.references.models import BureauTitleLookup
from usaspending_api.submissions.models import SubmissionAttributes


class SubcomponentList(PaginationMixin, AgencyBase):
    """
    Obtain the count of subcomponents (bureaus) for a specific agency in a single
    fiscal year based on GTAS
    """

    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/agency/toptier_code/sub_components.md"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.params_to_validate = ["toptier_code", "fiscal_year"]

    @cache_response()
    def get(self, request: Request, *args: Any, **kwargs: Any) -> Response:
        self.sortable_columns = ["name", "total_obligations", "total_outlays", "total_budgetary_resources"]
        self.default_sort_column = "total_obligations"
        results = self.format_results(self.get_file_a_queryset(), self.get_file_b_queryset())
        page_metadata = get_pagination_metadata(len(results), self.pagination.limit, self.pagination.page)
        return Response(
            {
                "toptier_code": self.toptier_code,
                "fiscal_year": self.fiscal_year,
                "results": results[self.pagination.lower_limit : self.pagination.upper_limit],
                "messages": self.standard_response_messages,
                "page_metadata": page_metadata,
            }
        )

    def format_results(self, file_a_response, file_b_response):

        # Combine File A and B Responses
        combined_list_dict = {}

        for row in file_a_response:
            combined_list_dict[row["bureau_info"]] = row

        for row in file_b_response:
            if row["bureau_info"] not in combined_list_dict:
                combined_list_dict[row["bureau_info"]] = row
            else:
                combined_list_dict[row["bureau_info"]].update(row)

        combined_response = [value for key, value in combined_list_dict.items()]

        # Format Combined Response
        results = sort_with_null_last(
            to_sort=[
                {
                    "name": x["bureau_info"].split(";")[0] if x.get("bureau_info") is not None else None,
                    "id": x["bureau_info"].split(";")[1] if x.get("bureau_info") is not None else None,
                    "total_obligations": x["total_obligations"] if x["total_obligations"] else None,
                    "total_outlays": x["total_outlays"] if x["total_outlays"] else None,
                    "total_budgetary_resources": x["total_budgetary_resources"]
                    if x["total_budgetary_resources"]
                    else None,
                }
                for x in combined_response
            ],
            sort_key=self.pagination.sort_key,
            sort_order=self.pagination.sort_order,
        )
        return results

    def get_file_a_queryset(self):
        """
        Query Total Budgetary Resources per Bureau from File A for a single Period
        """
        filters, bureau_info_subquery = self.get_common_query_objects("treasury_account_identifier")

        results = (
            (AppropriationAccountBalances.objects.filter(*filters))
            .annotate(bureau_info=bureau_info_subquery)
            .values("bureau_info")
            .annotate(
                total_budgetary_resources=Sum("total_budgetary_resources_amount_cpe"),
            )
            .exclude(bureau_info__isnull=True)
            .values("bureau_info", "total_budgetary_resources")
        )
        return results

    def get_file_b_queryset(self):
        """
        Query Obligations and Outlays per Bureau from File B for a single Period
        """
        filters, bureau_info_subquery = self.get_common_query_objects("treasury_account")

        results = (
            (FinancialAccountsByProgramActivityObjectClass.objects.filter(*filters))
            .annotate(bureau_info=bureau_info_subquery)
            .values("bureau_info")
            .annotate(
                total_obligations=Sum("obligations_incurred_by_program_object_class_cpe"),
                total_outlays=Sum("gross_outlay_amount_by_program_object_class_cpe"),
            )
            .exclude(bureau_info__isnull=True)
            .values("bureau_info", "total_obligations", "total_outlays")
        )
        return results

    def get_common_query_objects(self, treasury_account_keyword):
        latest = (
            SubmissionAttributes.objects.filter(
                submission_window__submission_reveal_date__lte=now(), reporting_fiscal_year=self.fiscal_year
            )
            .values("reporting_fiscal_year")
            .annotate(max_fiscal_period=Max(F("reporting_fiscal_period")))
            .values("max_fiscal_period")
        )
        filters = [
            Q(**{f"{treasury_account_keyword}__federal_account__parent_toptier_agency": self.toptier_agency}),
            Q(submission__reporting_fiscal_year=self.fiscal_year),
            Q(submission__reporting_fiscal_period=latest[0]["max_fiscal_period"]),
        ]
        bureau_info_subquery = Subquery(
            BureauTitleLookup.objects.filter(
                federal_account_code=OuterRef(f"{treasury_account_keyword}__federal_account__federal_account_code")
            )
            .exclude(federal_account_code__isnull=True)
            .annotate(
                bureau_info=Case(
                    When(
                        federal_account_code__startswith="057",
                        then=ConcatAll(Value("Air Force"), Value(";"), Value("air-force")),
                    ),
                    When(
                        federal_account_code__startswith="021",
                        then=ConcatAll(Value("Army"), Value(";"), Value("army")),
                    ),
                    When(
                        federal_account_code__startswith="017",
                        then=ConcatAll(Value("Navy, Marine Corps"), Value(";"), Value("navy-marine-corps")),
                    ),
                    When(
                        federal_account_code__startswith="097",
                        then=ConcatAll(Value("Defense-wide"), Value(";"), Value("defense-wide")),
                    ),
                    default=ConcatAll(F("bureau_title"), Value(";"), F("bureau_slug")),
                    output_field=TextField(),
                )
            )
            .values("bureau_info")
        )

        return filters, bureau_info_subquery