# Source: fedspendingtransparency/usaspending-api
# File: usaspending_api/search/v2/views/spending_over_time.py
# (Code Climate summary at time of capture — Maintainability: B (5 hrs); Test Coverage: A (97%))
import copy
import logging
from calendar import monthrange
from collections import OrderedDict
from datetime import datetime, timezone
from typing import Tuple

from django.conf import settings
from django.db.models import F, IntegerField, Sum, Value
from elasticsearch_dsl import A, Search
from elasticsearch_dsl.response import AggResponse
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

from usaspending_api.awards.v2.filters.sub_award import subaward_filter
from usaspending_api.common.api_versioning import API_TRANSFORM_FUNCTIONS, api_transformations
from usaspending_api.common.cache_decorator import cache_response
from usaspending_api.common.elasticsearch.search_wrappers import AwardSearch, TransactionSearch
from usaspending_api.common.exceptions import InvalidParameterException
from usaspending_api.common.helpers.fiscal_year_helpers import (
    bolster_missing_time_periods,
    clean_subaward_spending_over_time_results,
    generate_date_range,
    generate_fiscal_month,
    generate_fiscal_year,
)
from usaspending_api.common.helpers.generic_helper import (
    get_generic_filters_message,
    min_and_max_from_date_ranges,
)
from usaspending_api.common.helpers.orm_helpers import FiscalMonth, FiscalQuarter
from usaspending_api.common.query_with_filters import QueryWithFilters
from usaspending_api.common.validator.award_filter import AWARD_FILTER_W_FILTERS
from usaspending_api.common.validator.pagination import PAGINATION
from usaspending_api.common.validator.tinyshield import TinyShield
from usaspending_api.search.filters.elasticsearch.filter import _QueryType
from usaspending_api.search.filters.time_period.decorators import NewAwardsOnlyTimePeriod
from usaspending_api.search.filters.time_period.query_types import AwardSearchTimePeriod, TransactionSearchTimePeriod

logger = logging.getLogger(__name__)

API_VERSION = settings.API_VERSION
# Maps each accepted "group" request value (both the long form and its one-letter
# abbreviation) to the canonical grouping name used throughout this module.
GROUPING_LOOKUP = {
    "calendar_year": "calendar_year",
    "cy": "calendar_year",
    "quarter": "quarter",
    "q": "quarter",
    "fiscal_year": "fiscal_year",
    "fy": "fiscal_year",
    "month": "month",
    "m": "month",
}


@api_transformations(api_version=API_VERSION, function_list=API_TRANSFORM_FUNCTIONS)
class SpendingOverTimeVisualizationViewSet(APIView):
    """
    This route takes award filters, and returns spending by time. The amount of time is denoted by the "group" value.
    """

    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/search/spending_over_time.md"

    @staticmethod
    def validate_request_data(json_data: dict) -> Tuple[dict, list]:
        """
        Validate the raw request body with TinyShield.

        Args:
            json_data: The raw request body from the client.

        Returns:
            A tuple of (validated request data, the list of TinyShield models used). The model
            list is returned so the caller can build the "unrecognized filter" messages.

        Raises:
            InvalidParameterException: If the required "filters" object is missing.
        """
        program_activities_rule = [
            {
                "name": "program_activities",
                "type": "array",
                "key": "filters|program_activities",
                "object_keys_min": 1,
                "array_type": "object",
                "object_keys": {
                    "name": {"type": "text", "text_type": "search"},
                    "code": {
                        "type": "integer",
                    },
                },
            }
        ]
        models = [
            {"name": "subawards", "key": "subawards", "type": "boolean", "default": False},
            {
                "name": "group",
                "key": "group",
                "type": "enum",
                "enum_values": list(GROUPING_LOOKUP.keys()),
                "default": "fy",
                "optional": False,  # allow to be optional in the future
            },
            {
                "name": "spending_level",
                "key": "spending_level",
                "type": "enum",
                "enum_values": ["awards", "transactions", "subawards"],
                "optional": True,
                "default": "transactions",
            },
        ]
        models.extend(copy.deepcopy(AWARD_FILTER_W_FILTERS))
        models.extend(copy.deepcopy(program_activities_rule))
        models.extend(copy.deepcopy(PAGINATION))
        tiny_shield = TinyShield(models)
        validated_data = tiny_shield.block(json_data)
        # Explicitly enforce the minimum-key rule on each program_activities object.
        if "filters" in validated_data and "program_activities" in validated_data["filters"]:
            tiny_shield.enforce_object_keys_min(validated_data, program_activities_rule[0])

        if validated_data.get("filters", None) is None:
            raise InvalidParameterException("Missing request parameters: filters")

        return validated_data, models

    def database_data_layer_for_subawards(self) -> tuple:
        """
        Build the Django queryset that aggregates subaward obligations per time period.

        Returns:
            A tuple of (queryset, order_by_cols) where order_by_cols ends with the
            finest-grained time column for the requested group ("fy", "quarter", or "month").
        """
        queryset = subaward_filter(self.filters)
        obligation_column = "subaward_amount"

        # Note: SubawardSearch already has an "fy" field that corresponds to the prime award's fiscal year.
        #       This, however, needs "fy" to be the fiscal year of the sub_action_date (i.e. "sub_fiscal_year").
        #       And so, Django gets confused simply aliasing it to "fy" as "fy" is already a field in the model.
        #       To get around that, we're doing this little dance of annotate() and values().
        queryset = queryset.annotate(prime_fy=F("fy"))

        month_quarter_cols = []
        if self.group == "month":
            queryset = queryset.annotate(month=FiscalMonth("sub_action_date"))
            month_quarter_cols.append("month")
        elif self.group == "quarter":
            queryset = queryset.annotate(quarter=FiscalQuarter("sub_action_date"))
            month_quarter_cols.append("quarter")

        first_values = ["sub_fiscal_year"] + month_quarter_cols
        second_values = ["obligation_amount", "total_outlays", "subaward_type"] + month_quarter_cols
        second_values_dict = {"fy": F("sub_fiscal_year")}
        order_by_cols = ["fy"] + month_quarter_cols
        # total_outlays is a NULL placeholder column here; outlay amounts are not summed for subawards.
        queryset = (
            queryset.values(*first_values)
            .annotate(obligation_amount=Sum(obligation_column))
            .annotate(total_outlays=Value(None, output_field=IntegerField()))
            .values(*second_values, **second_values_dict)
            .order_by(*order_by_cols)
        )

        return queryset, order_by_cols

    def apply_elasticsearch_aggregations(self, search: Search) -> None:
        """
        Takes in an instance of the elasticsearch-dsl.Search object and applies the necessary
        aggregations in a specific order to get expected results.

        Args:
            search: An instance of the Elasticsearch Search object (AwardSearch or TransactionSearch).

        Raises:
            TypeError: If `search` is neither an AwardSearch nor a TransactionSearch.
        """

        if isinstance(search, AwardSearch):
            category_field = "category.keyword"

            if self.group == "fiscal_year":
                # Awards carry a precomputed fiscal_year field, so a simple terms agg suffices.
                group_by_time_period_agg = A("terms", field="fiscal_year")
            else:
                group_by_time_period_agg = A(
                    "date_histogram",
                    field="action_date",
                    interval="year" if (self.group == "calendar_year") else self.group,
                    format="yyyy-MM-dd",
                )
        elif isinstance(search, TransactionSearch):
            category_field = "award_category"
            # fiscal_action_date is action_date shifted into the fiscal calendar, so a plain
            # "year"/"quarter"/"month" histogram on it yields fiscal buckets.
            group_by_time_period_agg = A(
                "date_histogram",
                field="action_date" if self.group == "calendar_year" else "fiscal_action_date",
                interval="year" if (self.group == "fiscal_year" or self.group == "calendar_year") else self.group,
                format="yyyy-MM-dd",
            )
        else:
            # Guard against silent NameError below if a new Search subclass is passed in.
            raise TypeError(f"Unsupported search type: {type(search).__name__}")

        """
        The individual aggregations that are needed; with two different sum aggregations to handle issues with
        summing together floats (sums are computed in integer cents, then scaled back to dollars).
        """
        sum_as_cents_agg_outlay = A("sum", field="total_outlays", script={"source": "_value * 100"})
        sum_as_dollars_agg_outlay = A(
            "bucket_script",
            buckets_path={"sum_as_cents_outlay": "sum_as_cents_outlay"},
            script="params.sum_as_cents_outlay / 100",
        )

        sum_as_cents_agg_obligation = A(
            "sum", field="generated_pragmatic_obligation", script={"source": "_value * 100"}
        )
        sum_as_dollars_agg_obligation = A(
            "bucket_script",
            buckets_path={"sum_as_cents_obligation": "sum_as_cents_obligation"},
            script="params.sum_as_cents_obligation / 100",
        )

        """
        Putting the aggregations together; in order for the aggregations to have the correct structure they
        unfortunately need to be one after the other. This allows for nested aggregations as opposed to sibling.
        We also add another aggregation breakdown where inside the "group_by_time_period" bucket, we further group
        by Category, which returns us buckets tallying up the award data for a given award type
        """
        search.aggs.bucket("group_by_time_period", group_by_time_period_agg)
        search.aggs["group_by_time_period"].bucket("group_by_category", "terms", field=category_field).metric(
            "sum_as_cents_outlay", sum_as_cents_agg_outlay
        ).pipeline("sum_as_dollars_outlay", sum_as_dollars_agg_outlay).metric(
            "sum_as_cents_obligation", sum_as_cents_agg_obligation
        ).pipeline(
            "sum_as_dollars_obligation", sum_as_dollars_agg_obligation
        )

    def parse_elasticsearch_bucket(self, bucket: dict) -> dict:
        """
        Takes a dictionary representing one of the Elasticsearch buckets returned from the aggregation
        and returns a dictionary representation used in the API response.

        It should be noted that `key_as_string` is the name given by `date_histogram` to represent the key
        for each bucket which is a date as a string.

        Default time_period is set to "fiscal_year", however "quarter" and "month" also includes
        "fiscal_year" in the response object. When "calendar_year" is passed in as a group filter
        for the API, do not have to worry about any other time period.
        """
        time_period = {}

        # For awards grouped by fiscal year, the bucket comes from a "terms" agg whose key is the
        # bare year; all other cases come from a date_histogram with a yyyy-MM-dd key_as_string.
        if self.group == "fiscal_year" and self.spending_level == "awards":
            key_as_date = datetime.strptime(str(bucket["key"]), "%Y")
        else:
            key_as_date = datetime.strptime(bucket["key_as_string"], "%Y-%m-%d")

        time_period["calendar_year" if self.group == "calendar_year" else "fiscal_year"] = str(key_as_date.year)
        if self.group == "quarter":
            quarter = (key_as_date.month - 1) // 3 + 1
            time_period["quarter"] = str(quarter)
        elif self.group == "month":
            time_period["month"] = str(key_as_date.month)

        # The given time_period bucket contains buckets for the different categories, so extracting those.
        categories_breakdown = bucket["group_by_category"]["buckets"]

        # Initialize a dictionary to hold the query results for each obligation type.
        obligation_dictionary = {
            "Contract_Obligations": 0,
            "Direct_Obligations": 0,
            "Grant_Obligations": 0,
            "Idv_Obligations": 0,
            "Loan_Obligations": 0,
            "Other_Obligations": 0,
        }

        # Mapping of category keys to their corresponding obligation types.
        obligation_map = {
            "contract": "Contract_Obligations",
            "direct payment": "Direct_Obligations",
            "grant": "Grant_Obligations",
            "idv": "Idv_Obligations",
            "loans": "Loan_Obligations",
            "other": "Other_Obligations",
            "insurance": "Other_Obligations",
        }

        outlay_map = {
            "contract": "Contract_Outlays",
            "direct payment": "Direct_Outlays",
            "grant": "Grant_Outlays",
            "idv": "Idv_Outlays",
            "loans": "Loan_Outlays",
            "other": "Other_Outlays",
            "insurance": "Other_Outlays",
        }

        # Outlays are only supported on Awards
        outlay_dictionary = {v: 0 if self.spending_level == "awards" else None for v in outlay_map.values()}

        # Populate the category dictionary based on the award breakdown for a given bucket.
        # "other" and "insurance" both map to the same key, hence += rather than assignment.
        for category in categories_breakdown:
            key = category["key"]
            if key in obligation_map:
                obligation_dictionary[obligation_map[key]] += category.get("sum_as_dollars_obligation", {"value": 0})[
                    "value"
                ]

            if self.spending_level == "awards" and key in outlay_map:
                outlay_dictionary[outlay_map[key]] += category.get("sum_as_dollars_outlay", {"value": 0})["value"]

        response_object = {
            "aggregated_amount": sum(obligation_dictionary.values()),
            "time_period": time_period,
            **obligation_dictionary,
            "total_outlays": sum(outlay_dictionary.values()) if self.spending_level == "awards" else None,
            **outlay_dictionary,
        }

        return response_object

    def build_elasticsearch_result_transactions(self, agg_response: AggResponse, time_periods: list) -> list:
        """
        In this function we are just taking the elasticsearch aggregate response and looping through the
        buckets to create a results object for each time interval, filling in zero-amount entries for any
        interval that has no bucket.

        Using a min_date, max_date, and a frequency indicator generates either a list of dictionaries
        containing fiscal year information (fiscal year, fiscal quarter, and fiscal month) or a list
        of dictionaries containing calendar year information (calendar year). The following are the format
        of date_range based on the frequency:
            * "calendar_year" returns a list of dictionaries containing {calendar year}
            * "fiscal_year" returns list of dictionaries containing {fiscal year}
            * "quarter" returns a list of dictionaries containing {fiscal year and quarter}
            * "month" returns a list of dictionaries containing {fiscal year and month}
        NOTE the generate_date_range() can also generate non fiscal date range (calendar ranges) as well.
        """

        results = []
        min_date, max_date = min_and_max_from_date_ranges(time_periods)

        date_range = generate_date_range(min_date, max_date, self.group)
        date_buckets = agg_response.group_by_time_period.buckets
        parsed_bucket = None

        # Both date_range and date_buckets are in chronological order, so walk them in lockstep:
        # hold the next unmatched bucket until the generated range catches up to its time period.
        for fiscal_date in date_range:
            if date_buckets and parsed_bucket is None:
                parsed_bucket = self.parse_elasticsearch_bucket(date_buckets.pop(0))

            if self.group == "calendar_year":
                time_period = {"calendar_year": str(fiscal_date["calendar_year"])}
            else:
                time_period = {"fiscal_year": str(fiscal_date["fiscal_year"])}

            if self.group == "quarter":
                time_period["quarter"] = str(fiscal_date["fiscal_quarter"])
            elif self.group == "month":
                time_period["month"] = str(fiscal_date["fiscal_month"])

            if parsed_bucket is not None and time_period == parsed_bucket["time_period"]:
                results.append(parsed_bucket)
                parsed_bucket = None
            else:
                # No bucket for this interval; emit an all-zero placeholder entry.
                results.append(
                    {
                        "aggregated_amount": 0,
                        "time_period": time_period,
                        "Contract_Obligations": 0,
                        "Direct_Obligations": 0,
                        "Grant_Obligations": 0,
                        "Idv_Obligations": 0,
                        "Loan_Obligations": 0,
                        "Other_Obligations": 0,
                        "total_outlays": None,
                        "Contract_Outlays": None,
                        "Direct_Outlays": None,
                        "Grant_Outlays": None,
                        "Idv_Outlays": None,
                        "Loan_Outlays": None,
                        "Other_Outlays": None,
                    }
                )

        return results

    def build_elasticsearch_result_awards(self, agg_response: AggResponse, time_periods: list) -> list:
        """
        Convert the Elasticsearch aggregate response for Award-level spending into the list of
        result objects used in the API response.

        Unlike build_elasticsearch_result_transactions(), this does NOT bolster the results with
        zero-amount entries for empty time intervals; only the buckets Elasticsearch returned
        are emitted.

        Args:
            agg_response: The aggregations portion of the Elasticsearch response.
            time_periods: Unused; accepted to keep the signature parallel with
                build_elasticsearch_result_transactions().

        Returns:
            A list of response dictionaries, one per time-period bucket.
        """

        date_buckets = agg_response.group_by_time_period.buckets
        if date_buckets is None:
            return []

        return [self.parse_elasticsearch_bucket(bucket.to_dict()) for bucket in date_buckets]

    def query_elasticsearch_for_transactions(self, time_periods: list) -> list:
        """Get spending over time amounts based on Transactions"""

        filter_options = {}
        time_period_obj = TransactionSearchTimePeriod(
            default_end_date=settings.API_MAX_DATE, default_start_date=settings.API_SEARCH_MIN_DATE
        )
        # Decorate the time-period filter so "new awards only" date semantics are applied.
        new_awards_only_decorator = NewAwardsOnlyTimePeriod(
            time_period_obj=time_period_obj, query_type=_QueryType.TRANSACTIONS
        )
        filter_options["time_period_obj"] = new_awards_only_decorator

        filter_query = QueryWithFilters.generate_transactions_elasticsearch_query(self.filters, **filter_options)
        search = TransactionSearch().filter(filter_query)
        self.apply_elasticsearch_aggregations(search)
        response = search.handle_execute()
        overall_results = self.build_elasticsearch_result_transactions(response.aggs, time_periods)

        return overall_results

    def query_elasticsearch_for_awards(self, time_periods: list) -> list:
        """Get spending over time amounts based on Awards"""

        filter_options = {}
        time_period_obj = AwardSearchTimePeriod(
            default_end_date=settings.API_MAX_DATE, default_start_date=settings.API_SEARCH_MIN_DATE
        )
        # Decorate the time-period filter so "new awards only" date semantics are applied.
        new_awards_only_decorator = NewAwardsOnlyTimePeriod(
            time_period_obj=time_period_obj, query_type=_QueryType.AWARDS
        )
        filter_options["time_period_obj"] = new_awards_only_decorator

        filter_query = QueryWithFilters.generate_awards_elasticsearch_query(self.filters, **filter_options)
        search = AwardSearch().filter(filter_query)
        self.apply_elasticsearch_aggregations(search)
        response = search.handle_execute()
        overall_results = self.build_elasticsearch_result_awards(response.aggs, time_periods)

        return overall_results

    @cache_response()
    def post(self, request: Request) -> Response:
        """
        Entry point for the Spending Over Time endpoint.

        Validates the request, dispatches to the subaward (database) or transaction/award
        (Elasticsearch) data layer based on the requested spending_level, and assembles the
        response payload including deprecation/informational messages.
        """
        self.original_filters = request.data.get("filters")
        json_request, models = self.validate_request_data(request.data)
        self.group = GROUPING_LOOKUP[json_request["group"]]
        # "subawards": true (deprecated) and spending_level == "subawards" are equivalent.
        self.subawards = json_request["subawards"] or json_request.get("spending_level") == "subawards"
        self.spending_level = json_request.get("spending_level").lower()
        self.filters = json_request["filters"]

        # time_period is optional so we're setting a default window from API_SEARCH_MIN_DATE to end of the current FY.
        # Otherwise, users will see blank results for years
        current_fy = generate_fiscal_year(datetime.now(timezone.utc))
        if self.group == "fiscal_year":
            end_date = "{}-09-30".format(current_fy)
        elif self.group == "calendar_year":
            date = datetime.now(timezone.utc)  # current date
            end_date = f"{date.year}-12-31"  # last day of current year
        else:
            current_fiscal_month = generate_fiscal_month(datetime.now(timezone.utc))
            # NOTE(review): monthrange() treats the fiscal month number as a calendar month here;
            # the pairing always yields a valid date string, but confirm the intent downstream.
            days_in_month = monthrange(current_fy, current_fiscal_month)[1]
            # Zero-pad so the default end_date is always a canonical YYYY-MM-DD string.
            end_date = f"{current_fy}-{current_fiscal_month:02}-{days_in_month:02}"

        default_time_period = {"start_date": settings.API_SEARCH_MIN_DATE, "end_date": end_date}

        # if time periods have been passed in use those, otherwise use the one calculated above
        time_periods = self.filters.get("time_period", [default_time_period])

        if self.subawards:
            db_results, order_by_cols = self.database_data_layer_for_subawards()
            results = bolster_missing_time_periods(
                filter_time_periods=time_periods,
                queryset=db_results,
                date_range_type=order_by_cols[-1],
                columns={
                    "obligation_amount": "obligation_amount",
                    "total_outlays": "total_outlays",
                    "subaward_type": "subaward_type",
                },
            )
            results = clean_subaward_spending_over_time_results(results, order_by_cols[-1])

        elif self.spending_level == "transactions":
            results = self.query_elasticsearch_for_transactions(time_periods)
        elif self.spending_level == "awards":
            results = self.query_elasticsearch_for_awards(time_periods)

        raw_response = OrderedDict(
            [
                ("group", self.group),
                ("results", results),
                ("spending_level", self.spending_level),
                (
                    "messages",
                    [
                        *get_generic_filters_message(self.original_filters.keys(), [elem["name"] for elem in models]),
                        (
                            "The 'subawards' field will be deprecated in the future. "
                            "Set 'spending_level' to 'subawards' instead. See documentation for more information."
                        ),
                        (
                            "You may see additional month, quarter and year results when searching for "
                            "Awards or Subawards. This is due to Awards or Subawards overlapping with the "
                            "time period specified but having an 'action date' outside of that time period."
                        ),
                    ],
                ),
            ]
        )

        return Response(raw_response)