usaspending_api/search/v2/views/spending_by_award.py
import copy
from ast import literal_eval
from sys import maxsize
from typing import List
from django.conf import settings
from django.db.models import F
from django.utils.text import slugify
from rest_framework.response import Response
from rest_framework.views import APIView
import logging
from usaspending_api.awards.models import Award
from usaspending_api.recipient.models import RecipientProfile
from usaspending_api.references.models import Agency, ToptierAgencyPublishedDABSView
from usaspending_api.awards.v2.filters.sub_award import subaward_filter
from usaspending_api.awards.v2.lookups.lookups import (
assistance_type_mapping,
contract_subaward_mapping,
contract_type_mapping,
grant_subaward_mapping,
idv_type_mapping,
loan_type_mapping,
non_loan_assistance_type_mapping,
procurement_type_mapping,
)
from usaspending_api.awards.v2.lookups.elasticsearch_lookups import (
contracts_mapping,
idv_mapping,
loan_mapping,
non_loan_assist_mapping,
CONTRACT_SOURCE_LOOKUP,
IDV_SOURCE_LOOKUP,
NON_LOAN_ASST_SOURCE_LOOKUP,
LOAN_SOURCE_LOOKUP,
)
from usaspending_api.common.elasticsearch.search_wrappers import AwardSearch
from usaspending_api.common.api_versioning import api_transformations, API_TRANSFORM_FUNCTIONS
from usaspending_api.common.cache_decorator import cache_response
from usaspending_api.common.helpers.api_helper import raise_if_award_types_not_valid_subset, raise_if_sort_key_not_valid
from usaspending_api.common.query_with_filters import QueryWithFilters
from usaspending_api.common.helpers.generic_helper import (
get_generic_filters_message,
)
from usaspending_api.common.validator.award_filter import AWARD_FILTER_NO_RECIPIENT_ID
from usaspending_api.common.validator.pagination import PAGINATION
from usaspending_api.common.validator.tinyshield import TinyShield
from usaspending_api.common.recipient_lookups import annotate_prime_award_recipient_id
from usaspending_api.common.exceptions import UnprocessableEntityException
from usaspending_api.search.filters.elasticsearch.filter import _QueryType
from usaspending_api.search.filters.time_period.decorators import NewAwardsOnlyTimePeriod
from usaspending_api.search.filters.time_period.query_types import AwardSearchTimePeriod
from usaspending_api.submissions.models import SubmissionAttributes
logger = logging.getLogger(__name__)

# Per-mode constants for the view below. Each request selects either the "award" (prime award) entry or the
# "subaward" entry and reads its mode-specific field names and mappings from it.
GLOBAL_MAP = {
    "award": {
        # Key of a search hit whose value selects the field lookup to use for that hit.
        "award_semaphore": "type",
        # Response key -> hit key, copied into every result row.
        "internal_id_fields": {"internal_id": "award_id"},
        # Award type code -> lookup that translates requested API field names into hit keys.
        "elasticsearch_type_code_to_field_map": {
            **dict.fromkeys(contract_type_mapping, CONTRACT_SOURCE_LOOKUP),
            **dict.fromkeys(idv_type_mapping, IDV_SOURCE_LOOKUP),
            **dict.fromkeys(loan_type_mapping, LOAN_SOURCE_LOOKUP),
            **dict.fromkeys(non_loan_assistance_type_mapping, NON_LOAN_ASST_SOURCE_LOOKUP),
        },
    },
    "subaward": {
        # Columns that are always selected, regardless of the requested fields.
        "minimum_db_fields": {"subaward_number", "piid", "fain", "prime_award_group", "award_id"},
        # Mappings searched when translating requested API field names into database columns.
        "api_to_db_mapping_list": [contract_subaward_mapping, grant_subaward_mapping],
        # Key of a database row whose value selects the field mapping to use for that row.
        "award_semaphore": "prime_award_group",
        # Columns tried, in order, when filling or sorting by "Award ID".
        "award_id_fields": ["award__piid", "award__fain"],
        # Response key -> row key, copied into every result row.
        "internal_id_fields": {"internal_id": "subaward_number", "prime_award_internal_id": "award_id"},
        # (destination key, source key) used when attaching the prime award's generated id.
        "generated_award_field": ("prime_award_generated_internal_id", "prime_award_internal_id"),
        "type_code_to_field_map": {"procurement": contract_subaward_mapping, "grant": grant_subaward_mapping},
        # Annotation name -> function called as function(name, queryset).
        "annotations": {"_prime_award_recipient_id": annotate_prime_award_recipient_id},
        "filter_queryset_func": subaward_filter,
    },
}
@api_transformations(api_version=settings.API_VERSION, function_list=API_TRANSFORM_FUNCTIONS)
class SpendingByAwardVisualizationViewSet(APIView):
    """
    This route takes award filters and fields, and returns the fields of the filtered awards.

    Requests are handled by ``post``. When the validated ``subawards`` flag is set the results come from a
    database queryset; otherwise they come from an Elasticsearch award search.
    """

    # Path of the API contract document describing this endpoint.
    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/search/spending_by_award.md"
@cache_response()
def post(self, request):
    """Return all awards matching the provided filters and limits.

    Validates the payload, stores the per-request state on the view instance (filters, fields, pagination,
    mode constants) and then answers from the subaward queryset or from Elasticsearch depending on the
    ``subawards`` flag.
    """
    # The raw filters are kept because their keys feed the "messages" entry of the response.
    self.original_filters = request.data.get("filters")
    validated, models = self.validate_request_data(request.data)

    self.is_subaward = validated["subawards"]
    self.constants = GLOBAL_MAP["subaward" if self.is_subaward else "award"]
    self.filters = validated.get("filters", {})
    self.fields = validated["fields"]

    limit = validated["limit"]
    page = validated["page"]
    self.pagination = {
        "limit": limit,
        "lower_bound": (page - 1) * limit,
        "page": page,
        # Fall back to the first requested field when no sort key was supplied.
        "sort_key": validated.get("sort") or self.fields[0],
        "sort_order": validated["order"],
        # One row past the page, so the subaward path can tell whether another page exists.
        "upper_bound": page * limit + 1,
    }

    if self.if_no_intersection():  # Like an exception, but API response is a HTTP 200 with a JSON payload
        return Response(self.populate_response(results=[], has_next=False, models=models))

    award_type_codes = self.filters["award_type_codes"]
    raise_if_award_types_not_valid_subset(award_type_codes, self.is_subaward)
    raise_if_sort_key_not_valid(self.pagination["sort_key"], self.fields, award_type_codes, self.is_subaward)

    if not self.is_subaward:
        # The search_after cursor values are only read on the Elasticsearch path.
        self.last_record_unique_id = validated.get("last_record_unique_id")
        self.last_record_sort_value = validated.get("last_record_sort_value")
        return Response(self.construct_es_response_for_prime_awards(self.query_elasticsearch()))

    return Response(self.create_response_for_subawards(self.construct_queryset(), models))
@staticmethod
def validate_request_data(request_data):
    """Validate the request payload with TinyShield.

    Returns:
        A ``(validated_data, models)`` tuple: the value returned by ``TinyShield.block`` and the list of
        rules that was used for the validation.
    """
    program_activities_rule = {
        "name": "program_activities",
        "type": "array",
        "key": "filters|program_activities",
        "array_type": "object",
        "object_keys_min": 1,
        "object_keys": {
            "name": {"type": "text", "text_type": "search"},
            "code": {"type": "integer"},
        },
    }

    # NOTE(review): the next two rules use the key prefix "filter|" while the other rule in this method uses
    # "filters|" - confirm that the singular spelling is intended.
    object_class_rule = {
        "name": "object_class",
        "key": "filter|object_class",
        "type": "array",
        "array_type": "text",
        "text_type": "search",
    }
    program_activity_rule = {
        "name": "program_activity",
        "key": "filter|program_activity",
        "type": "array",
        "array_type": "integer",
        "array_max": maxsize,
    }

    # Optional cursor values accepted alongside the regular pagination parameters.
    last_record_unique_id_rule = {
        "name": "last_record_unique_id",
        "key": "last_record_unique_id",
        "type": "integer",
        "required": False,
        "allow_nulls": True,
    }
    last_record_sort_value_rule = {
        "name": "last_record_sort_value",
        "key": "last_record_sort_value",
        "type": "text",
        "text_type": "search",
        "required": False,
        "allow_nulls": True,
    }

    models = [
        {"name": "fields", "key": "fields", "type": "array", "array_type": "text", "text_type": "search", "min": 1},
        {"name": "subawards", "key": "subawards", "type": "boolean", "default": False},
        object_class_rule,
        program_activity_rule,
        last_record_unique_id_rule,
        last_record_sort_value_rule,
        program_activities_rule,
        # Deep copies, because the rules are mutated below and the shared definitions must stay untouched.
        *copy.deepcopy(AWARD_FILTER_NO_RECIPIENT_ID),
        *copy.deepcopy(PAGINATION),
    ]

    for rule in models:
        if rule["name"] in ("award_type_codes", "fields"):
            rule["optional"] = False

    validator = TinyShield(models)
    has_program_activities = "filters" in request_data and "program_activities" in request_data["filters"]
    if has_program_activities:
        validator.enforce_object_keys_min(request_data, program_activities_rule)

    return validator.block(request_data), models
def if_no_intersection(self):
    """Tell whether the award type codes filter contains the "no intersection" marker."""
    # "Special case" behavior: there will never be results when the website provides this value
    award_type_codes = self.filters["award_type_codes"]
    return "no intersection" in award_type_codes
def construct_queryset(self):
    """Build the filtered, annotated, ordered and sliced queryset of value rows for the current page."""
    ordering_fields = self.get_sort_by_fields()
    selected_columns = self.get_database_fields()

    build_filtered_queryset = self.constants["filter_queryset_func"]
    queryset = self.annotate_queryset(build_filtered_queryset(self.filters))
    queryset = self.custom_queryset_order_by(queryset, ordering_fields, self.pagination["sort_order"])

    # "upper_bound" lies one row past the page, which lets the caller detect a following page.
    page_window = slice(self.pagination["lower_bound"], self.pagination["upper_bound"])
    return queryset.values(*list(selected_columns))[page_window]
def create_response_for_subawards(self, queryset, models):
    """Turn the fetched subaward rows into the response payload.

    At most ``limit`` rows are converted; a surplus row only signals that another page exists.
    """
    limit = self.pagination["limit"]
    constants = self.constants
    wants_award_id = "Award ID" in self.fields

    fetched = list(queryset)
    results = []
    for record in fetched[:limit]:
        row = {out_key: record[source_key] for out_key, source_key in constants["internal_id_fields"].items()}
        # The row's award group decides which mapping translates API field names into row keys.
        row.update(
            {
                field: record.get(
                    constants["type_code_to_field_map"][record[constants["award_semaphore"]]].get(field)
                )
                for field in self.fields
            }
        )

        if wants_award_id:
            # First non-empty id column wins; when none is set the value assigned above is kept.
            award_id = next(
                (record[column] for column in constants["award_id_fields"] if record[column]),
                None,
            )
            if award_id:
                row["Award ID"] = award_id

        results.append(row)

    results = self.add_award_generated_id_field(results)
    return self.populate_response(results=results, has_next=len(fetched) > limit, models=models)
def add_award_generated_id_field(self, records):
    """Look up each record's generated_unique_award_id and store it on the record.

    The records are updated in place and the same list is returned.
    """
    target_key, id_key = self.constants["generated_award_field"]
    wanted_ids = [record[id_key] for record in records]

    id_pairs = Award.objects.filter(id__in=wanted_ids).values_list("id", "generated_unique_award_id")
    generated_id_by_award_id = dict(id_pairs)

    for record in records:
        # .get() is defensive, in case there is a discrepancy between the records and the Award table
        record[target_key] = generated_id_by_award_id.get(record[id_key])
    return records
def get_sort_by_fields(self):
    """Return the database columns used to order the subaward queryset.

    Sorting by "Award ID" uses the configured award id columns. Any other sort key is translated through
    the contract or grant subaward mapping, chosen by the group the award type codes belong to.

    Raises:
        KeyError: the sort key is missing from the selected mapping.
        UnprocessableEntityException: the award type codes fit neither the procurement nor the assistance
            group (this used to surface as an UnboundLocalError).
    """
    sort_key = self.pagination["sort_key"]
    if sort_key == "Award ID":
        # Return a copy so callers can never mutate the shared list held in self.constants.
        return list(self.constants["award_id_fields"])

    award_type_codes = set(self.filters["award_type_codes"])
    if award_type_codes <= set(procurement_type_mapping):
        return [contract_subaward_mapping[sort_key]]
    if award_type_codes <= set(assistance_type_mapping):
        return [grant_subaward_mapping[sort_key]]

    raise UnprocessableEntityException(
        "Unable to determine the sort field: 'award_type_codes' must all belong to a single award type group"
        " (procurement or assistance)."
    )
def get_elastic_sort_by_fields(self):
    """Return the Elasticsearch fields to sort by, always ending with the "award_id" tie-breaker.

    Sorting by "Award ID" uses "display_award_id". Any other sort key is translated through the mapping of
    the award type group (contract, loan, IDV, non-loan assistance) that contains all requested codes.

    Raises:
        KeyError: the sort key is missing from the selected mapping.
        UnprocessableEntityException: the award type codes fit none of the groups (this used to surface as
            an UnboundLocalError).
    """
    sort_key = self.pagination["sort_key"]
    if sort_key == "Award ID":
        sort_by_fields = ["display_award_id"]
    else:
        award_type_codes = set(self.filters["award_type_codes"])
        # Checked in this order; the first group containing every requested code wins.
        group_mappings = (
            (contract_type_mapping, contracts_mapping),
            (loan_type_mapping, loan_mapping),
            (idv_type_mapping, idv_mapping),
            (non_loan_assistance_type_mapping, non_loan_assist_mapping),
        )
        for type_group, field_mapping in group_mappings:
            if award_type_codes <= set(type_group):
                sort_by_fields = [field_mapping[sort_key]]
                break
        else:
            raise UnprocessableEntityException(
                "Unable to determine the sort field: 'award_type_codes' must all belong to a single award type"
                " group (contracts, loans, IDVs or non-loan assistance)."
            )

    sort_by_fields.append("award_id")
    return sort_by_fields
def get_database_fields(self):
    """Return the set of database columns to select.

    The set starts as a copy of the always-required columns and gains every non-empty column that any
    configured mapping provides for a requested field.
    """
    columns = copy.copy(self.constants["minimum_db_fields"])
    mappings = self.constants["api_to_db_mapping_list"]
    columns.update(
        column
        for api_field in self.fields
        for column in (mapping.get(api_field) for mapping in mappings)
        if column
    )
    return columns
def annotate_queryset(self, queryset):
    """Run the queryset through every configured annotation function.

    Each entry of ``self.constants["annotations"]`` is called as ``function(name, queryset)`` and its return
    value becomes the queryset passed to the next entry.
    """
    annotated = queryset
    for annotation_name, annotate in self.constants["annotations"].items():
        annotated = annotate(annotation_name, annotated)
    return annotated
def custom_queryset_order_by(self, queryset, sort_field_names, order):
    """Order the queryset by the given fields with NULLS LAST.

    NULLS LAST is set explicitly to encourage the usage of the indexes. An ``order`` of "desc" sorts
    descending; every other value sorts ascending.
    """
    descending = order == "desc"
    ordering = [
        F(name).desc(nulls_last=True) if descending else F(name).asc(nulls_last=True)
        for name in sort_field_names
    ]
    return queryset.order_by(*ordering)
def populate_response(self, results: list, has_next: bool, models: List[dict]) -> dict:
    """Assemble the response payload: limit, results, page metadata and filter messages."""
    limit = self.pagination["limit"]
    page_metadata = {"page": self.pagination["page"], "hasNext": has_next}
    # The messages are derived from the filter keys of the raw request and the names of the known rules.
    requested_filter_names = self.original_filters.keys()
    known_names = [model["name"] for model in models]
    return {
        "limit": limit,
        "results": results,
        "page_metadata": page_metadata,
        "messages": get_generic_filters_message(requested_filter_names, known_names),
    }
def query_elasticsearch(self) -> list:
    """Run the prime award search against Elasticsearch and return the executed response.

    The result is paginated either by offset (page/limit) or, when both ``last_record_sort_value`` and
    ``last_record_unique_id`` were supplied, by ``search_after``. In the latter case one extra hit is
    requested so the caller can tell whether another page exists.

    NOTE(review): the return annotation says ``list`` but the value is whatever ``handle_execute()`` returns -
    confirm before relying on list semantics.

    Raises:
        UnprocessableEntityException: only one of the two cursor values was supplied, or the requested
            offset is at or beyond ``settings.ES_AWARDS_MAX_RESULT_WINDOW`` without cursor values.
    """
    time_period_obj = AwardSearchTimePeriod(
        default_end_date=settings.API_MAX_DATE, default_start_date=settings.API_SEARCH_MIN_DATE
    )
    filter_options = {
        "time_period_obj": NewAwardsOnlyTimePeriod(time_period_obj=time_period_obj, query_type=_QueryType.AWARDS)
    }
    filter_query = QueryWithFilters.generate_awards_elasticsearch_query(self.filters, **filter_options)

    sort_key = self.pagination["sort_key"]
    sort_order = self.pagination["sort_order"]
    limit = self.pagination["limit"]
    sort_fields = self.get_elastic_sort_by_fields()

    # Nested documents that cannot be sorted on directly: nested path -> {API sort key: metric to sum}.
    # Checked in this order; at most one of them is applied.
    nested_sort_metrics = {
        "iija_spending_by_defc": {"Infrastructure Obligations": "obligation", "Infrastructure Outlays": "outlay"},
        "covid_spending_by_defc": {"COVID-19 Obligations": "obligation", "COVID-19 Outlays": "outlay"},
    }
    sorts = []
    for path, metric_by_sort_key in nested_sort_metrics.items():
        if path not in sort_fields:
            continue
        sort_fields.remove(path)
        nested = {"path": path}
        def_codes = self.filters.get("def_codes")
        if def_codes is not None:
            # Only sum the nested entries whose DEFC was requested in the filters.
            nested["filter"] = {"terms": {f"{path}.defc": def_codes}}
        sorts.append(
            {f"{path}.{metric_by_sort_key[sort_key]}": {"mode": "sum", "order": sort_order, "nested": nested}}
        )
        break
    sorts.extend({field: sort_order} for field in sort_fields)

    record_num = (self.pagination["page"] - 1) * limit

    # random page jumping was removed due to performance concerns
    has_sort_value = self.last_record_sort_value is not None
    has_unique_id = self.last_record_unique_id is not None
    if has_sort_value != has_unique_id:
        # Malformed request: a client error, so report it as such instead of raising a bare Exception.
        raise UnprocessableEntityException(
            "Using search_after functionality in Elasticsearch requires both"
            " last_record_sort_value and last_record_unique_id."
        )
    use_search_after = has_sort_value and has_unique_id

    # NOTE(review): only the offset is compared with the window; Elasticsearch's own limit applies to
    # offset + size - confirm ES_AWARDS_MAX_RESULT_WINDOW leaves enough headroom for a full page.
    if record_num >= settings.ES_AWARDS_MAX_RESULT_WINDOW and not use_search_after:
        raise UnprocessableEntityException(
            f"Page #{self.pagination['page']} with limit {self.pagination['limit']} is over the maximum result"
            f" limit {settings.ES_AWARDS_MAX_RESULT_WINDOW}. Please provide the 'last_record_sort_value' and"
            " 'last_record_unique_id' to paginate sequentially."
        )

    search = AwardSearch().filter(filter_query).sort(*sorts)
    if use_search_after:
        # Search_after values are provided in the API request - use search after
        cursor = [self.last_record_sort_value, self.last_record_unique_id]
        search = search.extra(search_after=cursor)[: limit + 1]  # add extra result to check for next page
    else:
        # no values, within result window, use regular elasticsearch
        search = search[record_num : record_num + limit]

    return search.handle_execute()
# For an unknown reason, ES tends to return the awarding agency toptier codes as integers or floats, instead of as
# text. This function casts the code back to a string and left-pads it with zeroes to three characters.
def get_agency_database_id(self, code):
    """Return the id of the toptier Agency for ``code``, or None.

    None is returned when no matching toptier Agency exists or when no SubmissionAttributes row exists for
    the code.
    """
    toptier_code = str(code).zfill(3)
    agency = Agency.objects.filter(toptier_agency__toptier_code=toptier_code, toptier_flag=True).first()
    submission = SubmissionAttributes.objects.filter(toptier_code=toptier_code).first()
    if agency is not None and submission is not None:
        return agency.id
    return None
def get_agency_slug(self, code):
    """Return the slugified name of the published toptier agency for ``code``, or None when there is none."""
    # Same normalization as for the agency id lookup: text, zero-padded on the left to three characters.
    toptier_code = str(code).zfill(3)
    published_agency = ToptierAgencyPublishedDABSView.objects.filter(toptier_code=toptier_code).first()
    return None if published_agency is None else slugify(published_agency.name)
def construct_es_response_for_prime_awards(self, response) -> dict:
    """Convert the Elasticsearch response into the API payload for prime awards.

    Every hit becomes a row holding the internal id, the requested fields and a few derived values (agency
    id and slug, summed DEFC amounts, generated award id, recipient id). The page metadata carries the
    ``search_after`` cursor of the last row on the page whenever another page exists.
    """
    limit = self.pagination["limit"]
    page = self.pagination["page"]
    def_codes = self.filters.get("def_codes")
    should_return_display_award_id = "Award ID" in self.fields
    should_return_recipient_id = "recipient_id" in self.fields
    should_attach_agency_code = "Awarding Agency" in self.fields

    # Fields converted to float when they hold a truthy value.
    float_fields = ("Loan Value", "Subsidy Cost", "Award Amount", "Total Outlays")
    # Fields holding a list of per-DEFC entries -> key of the amount summed per row.
    defc_metric_by_field = {
        "COVID-19 Obligations": "obligation",
        "COVID-19 Outlays": "outlay",
        "Infrastructure Obligations": "obligation",
        "Infrastructure Outlays": "outlay",
    }
    # The agency lookups hit the database, so each distinct code is resolved once per response. The key is
    # the textual form because 1 and 1.0 would collide as dict keys although they normalize differently.
    agency_lookup_cache = {}

    results = []
    for res in response:
        hit = res.to_dict()
        row = {k: hit[v] for k, v in self.constants["internal_id_fields"].items()}

        # Parsing API response values from ES query result JSON
        # The hit's award type selects the lookup that maps each requested field to a key of the hit.
        field_lookup = self.constants["elasticsearch_type_code_to_field_map"][hit[self.constants["award_semaphore"]]]
        for field in self.fields:
            row[field] = hit.get(field_lookup.get(field))

        if should_attach_agency_code:
            row["agency_code"] = hit["awarding_toptier_agency_code"]

        row["internal_id"] = int(row["internal_id"])
        for name in float_fields:
            if row.get(name):
                row[name] = float(row[name])

        # NOTE(review): when "Awarding Agency" is requested but empty for a hit, "agency_code" stays in the
        # row (unchanged behavior) - confirm whether that is intended.
        if row.get("Awarding Agency"):
            code = row.pop("agency_code")
            cache_key = str(code)
            if cache_key not in agency_lookup_cache:
                agency_lookup_cache[cache_key] = (self.get_agency_database_id(code), self.get_agency_slug(code))
            row["awarding_agency_id"], row["agency_slug"] = agency_lookup_cache[cache_key]

        # Sum the per-DEFC amounts; with a def_codes filter only the requested codes contribute.
        for name, metric in defc_metric_by_field.items():
            entries = row.get(name)
            if entries:
                row[name] = sum(
                    entry[metric] for entry in entries if def_codes is None or entry["defc"] in def_codes
                )

        if row.get("def_codes") and def_codes:
            row["def_codes"] = [defc for defc in row["def_codes"] if defc in def_codes]

        row["generated_internal_id"] = hit["generated_unique_award_id"]
        if should_return_display_award_id:
            row["Award ID"] = hit["display_award_id"]
        if should_return_recipient_id:
            row["recipient_id"] = self.get_recipient_hash_with_level(hit)
        results.append(row)

    if self.last_record_unique_id is not None:
        # search_after requests fetch one extra hit, so the last row of the page is the second to last hit.
        has_next = len(results) > limit
        offset = 2
    else:
        has_next = response.hits.total.value - (page - 1) * limit > limit
        offset = 1

    last_record_unique_id = None
    last_record_sort_value = None
    if len(response) > 0 and has_next:
        last_hit_sort = response[len(response) - offset].meta.sort
        last_record_sort_value = last_hit_sort[0]
        last_record_unique_id = last_hit_sort[1]

    return {
        "limit": limit,
        "results": results[:limit],
        "page_metadata": {
            "page": page,
            "hasNext": has_next,
            "last_record_unique_id": last_record_unique_id,
            # NOTE(review): str() turns a missing cursor into the text "None" rather than null; kept as is
            # because clients may rely on it - confirm.
            "last_record_sort_value": str(last_record_sort_value),
        },
        "messages": get_generic_filters_message(
            self.original_filters.keys(), [elem["name"] for elem in AWARD_FILTER_NO_RECIPIENT_ID]
        ),
    }
def get_recipient_hash_with_level(self, award_doc):
    """Build the "<recipient hash>-<level>" id from the award document's ``recipient_agg_key``.

    The key is read as "<hash>/<levels>", where ``<levels>`` is a Python literal (or the text "None").

    Returns:
        The combined id, or None when the key is missing or empty, has no hash, or carries no levels.

    Raises:
        ValueError, SyntaxError: the levels part is not a valid Python literal (raised by ``literal_eval``).
    """
    agg_key = award_doc.get("recipient_agg_key")
    if not agg_key:
        # A missing or null key used to crash on .split(); there is simply no recipient id to report.
        return None

    recipient_info = agg_key.split("/")
    recipient_hash = recipient_info[0]
    if not recipient_hash or len(recipient_info) < 2 or recipient_info[1].upper() == "NONE":
        return None

    # literal_eval only accepts literals, so it is safe to use on the stored text.
    recipient_levels = literal_eval(recipient_info[1])
    if not recipient_levels:
        return None

    recipient_level = RecipientProfile.return_one_level(recipient_levels)
    return f"{recipient_hash}-{recipient_level}"