fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/search/v2/views/search_elasticsearch.py

Summary

Maintainability
A
3 hrs
Test Coverage
A
93%
import copy
import logging

from django.conf import settings
from rest_framework.response import Response
from rest_framework.views import APIView

from usaspending_api.common.api_versioning import api_transformations, API_TRANSFORM_FUNCTIONS
from usaspending_api.common.cache_decorator import cache_response
from usaspending_api.common.elasticsearch.search_wrappers import TransactionSearch
from usaspending_api.common.exceptions import (
    InvalidParameterException,
    UnprocessableEntityException,
)
from usaspending_api.common.helpers.generic_helper import get_simple_pagination_metadata, get_generic_filters_message
from usaspending_api.common.query_with_filters import QueryWithFilters
from usaspending_api.common.validator.award_filter import (
    AWARD_FILTER,
    AWARD_FILTER_NO_RECIPIENT_ID,
    AWARD_FILTER_W_FILTERS,
)
from usaspending_api.common.validator.pagination import PAGINATION
from usaspending_api.common.validator.tinyshield import TinyShield
from usaspending_api.search.v2.elasticsearch_helper import (
    get_number_of_unique_terms_for_transactions,
    spending_by_transaction_count,
)
from usaspending_api.search.v2.es_sanitization import es_minimal_sanitize
from usaspending_api.search.v2.elasticsearch_helper import spending_by_transaction_sum_and_count
from usaspending_api.awards.v2.lookups.elasticsearch_lookups import TRANSACTIONS_SOURCE_LOOKUP, TRANSACTIONS_LOOKUP
from elasticsearch_dsl import A

# API version read from Django settings; handed to each view's @api_transformations decorator.
API_VERSION = settings.API_VERSION
# Module logger, named after this module's import path.
logger = logging.getLogger(__name__)


@api_transformations(api_version=API_VERSION, function_list=API_TRANSFORM_FUNCTIONS)
class SpendingByTransactionVisualizationViewSet(APIView):
    """
    This route takes keyword search fields, and returns the fields of the searched term.

    Each result row holds the requested ``fields`` (resolved through ``TRANSACTIONS_SOURCE_LOOKUP``)
    plus ``generated_internal_id`` and ``internal_id`` taken from the matching Elasticsearch document.
    """

    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/search/spending_by_transaction.md"

    @cache_response()
    def post(self, request):
        """
        Validate the search request and return one page of matching transactions.

        Raises:
            UnprocessableEntityException: the requested page starts at or beyond
                ``settings.ES_TRANSACTIONS_MAX_RESULT_WINDOW``.
            InvalidParameterException: the sort value is not among ``fields`` or is not sortable,
                or a requested field is not supported.
        """
        program_activities_rule = [
            {
                "name": "program_activities",
                "type": "array",
                "key": "filters|program_activities",
                "array_type": "object",
                "object_keys_min": 1,
                "object_keys": {
                    "name": {"type": "text", "text_type": "search"},
                    "code": {
                        "type": "integer",
                    },
                },
            }
        ]
        models = [
            {
                "name": "fields",
                "key": "fields",
                "type": "array",
                "array_type": "text",
                "text_type": "search",
                "optional": False,
            }
        ]
        models.extend(copy.deepcopy(AWARD_FILTER_W_FILTERS))
        models.extend(copy.deepcopy(PAGINATION))
        models.extend(copy.deepcopy(program_activities_rule))
        # Kept on the instance because build_elasticsearch_result() reads the rule names for its messages.
        self.models = models
        for m in models:
            if m["name"] in ("award_type_codes", "sort"):
                m["optional"] = False
        tiny_shield = TinyShield(models)
        validated_payload = tiny_shield.block(request.data)
        if "filters" in validated_payload and "program_activities" in validated_payload["filters"]:
            tiny_shield.enforce_object_keys_min(validated_payload, program_activities_rule[0])

        # Index of the first record on the requested page (also the Elasticsearch "from" offset).
        lower_limit = (validated_payload["page"] - 1) * validated_payload["limit"]
        if lower_limit >= settings.ES_TRANSACTIONS_MAX_RESULT_WINDOW:
            raise UnprocessableEntityException(
                "Page #{page} of size {limit} is over the maximum result limit ({es_limit}). Consider using custom data downloads to obtain large data sets.".format(
                    page=validated_payload["page"],
                    limit=validated_payload["limit"],
                    es_limit=settings.ES_TRANSACTIONS_MAX_RESULT_WINDOW,
                )
            )

        payload_sort_key = validated_payload["sort"]
        if payload_sort_key not in validated_payload["fields"]:
            raise InvalidParameterException("Sort value not found in fields: {}".format(payload_sort_key))

        if payload_sort_key not in TRANSACTIONS_LOOKUP:
            raise InvalidParameterException(
                f"Sort value is not currently supported: {payload_sort_key}. Allowed values are: [{', '.join(TRANSACTIONS_LOOKUP.keys())}]"
            )

        if "filters" in validated_payload and "no intersection" in validated_payload["filters"]["award_type_codes"]:
            # "Special case": there will never be results when the website provides this value
            return Response(
                {
                    "limit": validated_payload["limit"],
                    "results": [],
                    "page_metadata": {
                        "page": validated_payload["page"],
                        "next": None,
                        "previous": None,
                        "hasNext": False,
                        "hasPrevious": False,
                    },
                }
            )

        # Every requested field is resolved through TRANSACTIONS_SOURCE_LOOKUP when the response is built;
        # reject unknown names here as a client error instead of failing later with a KeyError (HTTP 500).
        unsupported_fields = [field for field in validated_payload["fields"] if field not in TRANSACTIONS_SOURCE_LOOKUP]
        if unsupported_fields:
            raise InvalidParameterException(
                f"Field value(s) not currently supported: {', '.join(unsupported_fields)}. Allowed values are: [{', '.join(TRANSACTIONS_SOURCE_LOOKUP.keys())}]"
            )

        sorts = {TRANSACTIONS_LOOKUP[payload_sort_key]: validated_payload["order"]}
        # One record past the page is fetched so the pagination metadata can tell whether a next page exists.
        upper_limit = validated_payload["page"] * validated_payload["limit"] + 1

        filters = validated_payload["filters"]
        # Snapshot the caller's filter names before "keywords" is rewritten into the internal
        # "keyword_search" filter, so the response messages describe what the caller actually sent.
        original_filter_keys = list(filters.keys())
        if "keywords" in filters:
            filters["keyword_search"] = [es_minimal_sanitize(x) for x in filters["keywords"]]
            filters.pop("keywords")

        filter_query = QueryWithFilters.generate_transactions_elasticsearch_query(filters)
        search = TransactionSearch().filter(filter_query).sort(sorts)[lower_limit:upper_limit]
        response = search.handle_execute()
        return Response(self.build_elasticsearch_result(validated_payload, response, original_filter_keys))

    def build_elasticsearch_result(self, request, response, filter_keys=None) -> dict:
        """
        Convert an Elasticsearch transaction response into the API response body.

        Args:
            request: the validated payload (not the HTTP request); reads "fields", "limit", "page"
                and, when ``filter_keys`` is omitted, "filters".
            response: the executed search; each hit is converted with ``to_dict()``.
            filter_keys: names of the filters the caller sent, used for the response messages.
                Defaults to the current keys of ``request["filters"]``.

        Returns:
            dict with "limit", "results" (at most ``limit`` rows), "page_metadata" and "messages".
        """
        results = []
        for res in response:
            hit = res.to_dict()
            # Map each requested API field to its Elasticsearch document key.
            row = {field: hit.get(TRANSACTIONS_SOURCE_LOOKUP[field]) for field in request["fields"]}
            row["generated_internal_id"] = hit["generated_unique_award_id"]
            row["internal_id"] = hit["award_id"]
            results.append(row)

        if filter_keys is None:
            filter_keys = request["filters"].keys()

        # len(response) may be limit + 1 (the look-ahead record), which drives hasNext.
        metadata = get_simple_pagination_metadata(len(response), request["limit"], request["page"])

        return {
            "limit": request["limit"],
            "results": results[: request["limit"]],
            "page_metadata": metadata,
            "messages": get_generic_filters_message(filter_keys, [elem["name"] for elem in self.models]),
        }


@api_transformations(api_version=API_VERSION, function_list=API_TRANSFORM_FUNCTIONS)
class SpendingByTransactionGroupedVisualizationViewSet(APIView):
    """
    This route provides transactions grouped by their prime awards. Additionally, allows
    the transactions to be filtered.

    Each result is one prime award (``display_award_id``) with its matching transaction count, the sum of
    ``federal_action_obligation`` over those transactions, and up to 10 of the transactions as ``children``.
    """

    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/search/spending_by_transaction_grouped.md"

    @cache_response()
    def post(self, request):
        """
        Validate the request and return one page of prime awards grouping the matching transactions.

        Raises:
            UnprocessableEntityException: the requested page starts at or beyond
                ``settings.ES_TRANSACTIONS_MAX_RESULT_WINDOW``.
            InvalidParameterException: the sort value or a requested field is not supported.
        """
        models = [
            {
                "name": "fields",
                "key": "fields",
                "type": "array",
                "array_type": "text",
                "text_type": "search",
                "optional": False,
            }
        ]
        models.extend(copy.deepcopy(AWARD_FILTER))
        models.extend(copy.deepcopy(PAGINATION))
        for m in models:
            if m["name"] in ("keywords", "award_type_codes", "sort"):
                m["optional"] = False
        validated_payload = TinyShield(models).block(request.data)

        page = validated_payload["page"]
        limit = validated_payload["limit"]
        # Bucket index range [lower_limit, upper_limit) covered by the requested page.
        lower_limit = (page - 1) * limit
        upper_limit = page * limit
        if lower_limit >= settings.ES_TRANSACTIONS_MAX_RESULT_WINDOW:
            raise UnprocessableEntityException(
                "Page #{page} of size {limit} is over the maximum result limit ({es_limit}). Consider using custom data downloads to obtain large data sets.".format(
                    page=page,
                    limit=limit,
                    es_limit=settings.ES_TRANSACTIONS_MAX_RESULT_WINDOW,
                )
            )

        # Supported sort values mapped to the "order" key of the terms aggregation below: either the summed
        # obligation sub-aggregation, or "_key" (the bucket's display_award_id).
        valid_sort_keys = {
            "Matching Transaction Obligation": "Matching Transaction Obligation",
            "Prime Award ID": "_key",
        }
        payload_sort_key = validated_payload["sort"]
        if payload_sort_key not in valid_sort_keys:
            raise InvalidParameterException(
                f"Sort value is not currently supported: {payload_sort_key}. Allowed values are: [{', '.join(valid_sort_keys.keys())}]"
            )

        if "filters" in validated_payload and "no intersection" in validated_payload["filters"]["award_type_codes"]:
            # "Special case": there will never be results when the website provides this value
            return Response(
                {
                    "limit": limit,
                    "results": [],
                    "page_metadata": {
                        "page": page,
                        "next": None,
                        "previous": None,
                        "hasNext": False,
                        "hasPrevious": False,
                    },
                }
            )

        # Every requested field is resolved through TRANSACTIONS_SOURCE_LOOKUP for the child rows;
        # reject unknown names here as a client error instead of failing later with a KeyError (HTTP 500).
        unsupported_fields = [field for field in validated_payload["fields"] if field not in TRANSACTIONS_SOURCE_LOOKUP]
        if unsupported_fields:
            raise InvalidParameterException(
                f"Field value(s) not currently supported: {', '.join(unsupported_fields)}. Allowed values are: [{', '.join(TRANSACTIONS_SOURCE_LOOKUP.keys())}]"
            )

        filters = validated_payload["filters"]
        # Snapshot the caller's filter names before "keywords" is rewritten into the internal
        # "keyword_search" filter, so the response messages describe what the caller actually sent.
        original_filter_keys = list(filters.keys())
        filters["keyword_search"] = [es_minimal_sanitize(x) for x in filters["keywords"]]
        filters.pop("keywords")
        filter_query = QueryWithFilters.generate_transactions_elasticsearch_query(filters)
        search = TransactionSearch().filter(filter_query)

        bucket_count = get_number_of_unique_terms_for_transactions(filter_query, "display_award_id")
        group_by_prime_award = A(
            "terms",
            field="display_award_id",
            # Without an explicit size the terms aggregation returns only 10 buckets, which left every page
            # after the first (and any limit above 10) short or empty.
            size=upper_limit,
            # Order inside Elasticsearch so the top `upper_limit` buckets are the right ones for the requested
            # sort; re-sorting only the returned buckets in Python cannot repair a cut made by doc_count.
            order={valid_sort_keys[payload_sort_key]: validated_payload["order"]},
        )
        search.aggs.bucket("group_by_prime_award", group_by_prime_award).metric(
            "Matching Transaction Obligation", A("sum", field="federal_action_obligation")
        )

        agg_response = search.handle_execute()
        page_buckets = list(agg_response.aggregations.group_by_prime_award.buckets)[lower_limit:upper_limit]

        results = []
        for prime_award in page_buckets:
            display_award_id = prime_award["key"]
            # Build a fresh filter dict per award so restricting to this award does not mutate the
            # shared request filters (and thereby the next award's query or the response messages).
            award_filters = {**filters, "award_ids": [f"{display_award_id}"]}
            hit_filter_query = QueryWithFilters.generate_transactions_elasticsearch_query(award_filters)
            hit_search = TransactionSearch().filter(hit_filter_query).sort({"federal_action_obligation": "asc"})[0:10]
            hit_response = hit_search.handle_execute()
            results.append(
                {
                    "children": [
                        self._build_transaction_row(hit.to_dict(), validated_payload["fields"])
                        for hit in hit_response
                    ],
                    "Prime Award ID": display_award_id,
                    "Matching Transaction Count": prime_award["doc_count"],
                    "Matching Transaction Obligation": float(prime_award["Matching Transaction Obligation"]["value"]),
                }
            )

        # A next page exists only if there are more awards than the pages up to and including this one hold.
        has_next = bucket_count > upper_limit
        has_previous = page > 1

        metadata = {
            "page": page,
            "next": page + 1 if has_next else None,
            "previous": page - 1 if has_previous else None,
            "hasNext": has_next,
            "hasPrevious": has_previous,
        }

        return Response(
            {
                "limit": limit,
                "results": results[:limit],
                "page_metadata": metadata,
                "messages": get_generic_filters_message(
                    original_filter_keys, [elem["name"] for elem in AWARD_FILTER_NO_RECIPIENT_ID]
                ),
            }
        )

    @staticmethod
    def _build_transaction_row(hit, fields):
        """Project one Elasticsearch transaction document (as a dict) onto the requested API fields."""
        row = {field: hit.get(TRANSACTIONS_SOURCE_LOOKUP[field]) for field in fields}
        row["generated_internal_id"] = hit["generated_unique_award_id"]
        row["internal_id"] = hit["award_id"]
        return row


@api_transformations(api_version=API_VERSION, function_list=API_TRANSFORM_FUNCTIONS)
class TransactionSummaryVisualizationViewSet(APIView):
    """
    Summarize the transactions matched by a keyword search: how many there are, and the total of their
    federal action obligations.
    """

    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/search/transaction_spending_summary.md"

    @cache_response()
    def post(self, request):
        """
        Validate ``filters.keywords`` and return the transaction summary computed for it.

        The only validated input is the required ``filters.keywords`` array of search text, with TinyShield's
        ``text_min`` of 3. The summary itself comes from ``spending_by_transaction_sum_and_count``; per the
        endpoint contract it carries the transaction count (``award_count``) and the federal_action_obligation
        sum (``award_spending``).

        *Note* Only deals with prime awards, future plans to include sub-awards.
        """
        keywords_rule = {
            "name": "keywords",
            "key": "filters|keywords",
            "type": "array",
            "array_type": "text",
            "text_type": "search",
            "optional": False,
            "text_min": 3,
        }
        payload = TinyShield([keywords_rule]).block(request.data)
        return Response({"results": spending_by_transaction_sum_and_count(payload)})


@api_transformations(api_version=API_VERSION, function_list=API_TRANSFORM_FUNCTIONS)
class SpendingByTransactionCountVisualizationViewSet(APIView):
    """
    Count the transactions that match a set of award-search filters.
    """

    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/search/spending_by_transaction_count.md"

    @cache_response()
    def post(self, request):
        """
        Validate the award filters and return the transaction counts for them.

        Both keyword rules ("keywords" and "keyword") are made optional. Any supplied keywords are
        sanitized with ``es_minimal_sanitize`` and moved to the internal "keyword_search" filter before
        the Elasticsearch query is generated. The counts come from ``spending_by_transaction_count``.
        """
        rules = list(copy.deepcopy(AWARD_FILTER))
        for rule in rules:
            if rule["name"] in ("keywords", "keyword"):
                rule["optional"] = True

        filters = TinyShield(rules).block(request.data)["filters"]
        if "keywords" in filters:
            filters["keyword_search"] = [es_minimal_sanitize(keyword) for keyword in filters["keywords"]]
            del filters["keywords"]

        query = QueryWithFilters.generate_transactions_elasticsearch_query(filters)
        return Response({"results": spending_by_transaction_count(TransactionSearch().filter(query))})