fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/awards/v2/filters/sub_award.py

Summary

Maintainability
D
2 days
Test Coverage
B
87%
import itertools
import logging

from django.db.models import Exists, OuterRef, Q

from usaspending_api.awards.models import TransactionNormalized
from usaspending_api.awards.v2.filters.filter_helpers import combine_date_range_queryset, total_obligation_queryset
from usaspending_api.awards.v2.filters.location_filter_geocode import ALL_FOREIGN_COUNTRIES, create_nested_object
from usaspending_api.common.exceptions import InvalidParameterException
from usaspending_api.references.models import PSC
from usaspending_api.search.filters.postgres.defc import DefCodes
from usaspending_api.search.filters.postgres.psc import PSCCodes
from usaspending_api.search.filters.postgres.tas import TasCodes, TreasuryAccounts
from usaspending_api.search.helpers.matview_filter_helpers import build_award_ids_filter
from usaspending_api.search.models import SubawardSearch
from usaspending_api.search.v2 import elasticsearch_helper
from usaspending_api.settings import API_MAX_DATE, API_MIN_DATE, API_SEARCH_MIN_DATE

logger = logging.getLogger(__name__)


def subaward_download(filters):
    """Used by the Custom download"""
    return subaward_filter(filters, for_downloads=True)


def geocode_filter_subaward_locations(scope: str, values: list) -> Q:
    """
    Function filter querysets for location data in subawards
    scope- place of performance or recipient location mappings
    values- array of location requests
    returns queryset
    """
    or_queryset = Q()

    # Yes, these are mostly the same, but congressional_code is different
    # and I'd rather have them all laid out here versus burying a extra couple lines for congressional_code
    location_mappings = {
        "country_code": {"sub_legal_entity": "country_code", "sub_place_of_perform": "country_co"},
        "zip5": {"sub_legal_entity": "zip5", "sub_place_of_perform": "zip5"},
        "city_name": {"sub_legal_entity": "city_name", "sub_place_of_perform": "city_name"},
        "state_code": {"sub_legal_entity": "state_code", "sub_place_of_perform": "state_code"},
        "county_code": {"sub_legal_entity": "county_code", "sub_place_of_perform": "county_code"},
        "congressional_code": {"sub_legal_entity": "congressional", "sub_place_of_perform": "congressio"},
        "current_congressional_code": {
            "sub_legal_entity": "sub_legal_entity_congressional_current",
            # Due to the rigidness of how we map values to columns
            # it's required that the column start with sub_place_of_perform
            # however, when current congressional codes were implemented
            # the column name chosen did not match this pattern.
            # That's why we have the full column name in the value
            "sub_place_of_perform": "sub_place_of_performance_congressional_current",
        },
    }
    location_mappings = {location_type: field_dict[scope] for location_type, field_dict in location_mappings.items()}

    # creates a dictionary with all of the locations organized by country
    # Counties and congressional districts are nested under state codes
    nested_values = create_nested_object(values)

    # In this for-loop a django Q filter object is created from the python dict
    for country, state_zip in nested_values.items():
        country_qs = None
        if country != ALL_FOREIGN_COUNTRIES:
            country_qs = Q(**{f"{scope}_{location_mappings['country_code']}__exact": country})
        state_qs = Q()

        for state_zip_key, location_values in state_zip.items():
            if state_zip_key == "city":
                state_inner_qs = Q(**{f"{scope}_{location_mappings['city_name']}__in": location_values})
            elif state_zip_key == "zip":
                state_inner_qs = Q(**{f"{scope}_{location_mappings['zip5']}__in": location_values})
            else:
                state_inner_qs = Q(**{f"{scope}_{location_mappings['state_code']}__exact": state_zip_key.upper()})
                county_qs = Q()
                district_qs = Q()
                city_qs = Q()

                if location_values["county"]:
                    county_qs = Q(**{f"{scope}_{location_mappings['county_code']}__in": location_values["county"]})
                if location_values["district_current"]:
                    district_qs = Q(
                        **{
                            f"{location_mappings['current_congressional_code']}__in": location_values[
                                "district_current"
                            ]
                        }
                    )
                if location_values["district_original"]:
                    district_qs = Q(
                        **{
                            f"{scope}_{location_mappings['congressional_code']}__in": location_values[
                                "district_original"
                            ]
                        }
                    )
                if location_values["city"]:
                    city_qs = Q(**{f"{scope}_{location_mappings['city_name']}__in": location_values["city"]})
                state_inner_qs &= county_qs | district_qs | city_qs

            state_qs |= state_inner_qs
        if country_qs:
            or_queryset |= country_qs & state_qs
        else:
            or_queryset |= state_qs
    return or_queryset


# TODO: Performance when multiple false values are initially provided
def subaward_filter(filters, for_downloads=False):
    queryset = SubawardSearch.objects.all()

    recipient_scope_q = Q(sub_legal_entity_country_code="USA") | Q(sub_legal_entity_country_name="UNITED STATES")
    pop_scope_q = Q(sub_place_of_perform_country_co="USA") | Q(sub_place_of_perform_country_name="UNITED STATES")

    for key, value in filters.items():
        if value is None:
            raise InvalidParameterException("Invalid filter: " + key + " has null as its value.")

        key_list = [
            "keywords",
            "elasticsearch_keyword",
            "time_period",
            "award_type_codes",
            "prime_and_sub_award_types",
            "agencies",
            "legal_entities",
            "recipient_search_text",
            "recipient_scope",
            "recipient_locations",
            "recipient_type_names",
            "place_of_performance_scope",
            "place_of_performance_locations",
            "award_amounts",
            "award_ids",
            "program_numbers",
            "naics_codes",
            PSCCodes.underscore_name,
            "contract_pricing_type_codes",
            "set_aside_type_codes",
            "extent_competed_type_codes",
            TasCodes.underscore_name,
            TreasuryAccounts.underscore_name,
            "def_codes",
        ]

        if key not in key_list:
            raise InvalidParameterException("Invalid filter: " + key + " does not exist.")

        if key == "keywords":

            def keyword_parse(keyword):
                # keyword_ts_vector & award_ts_vector are Postgres TS_vectors.
                # keyword_ts_vector = recipient_name + psc_description + subaward_description
                # award_ts_vector = piid + fain + uri + subaward_number
                filter_obj = Q(keyword_ts_vector=keyword) | Q(award_ts_vector=keyword)
                # Commenting out until NAICS is associated with subawards in DAIMS 1.3.1
                # if keyword.isnumeric():
                #     filter_obj |= Q(naics_code__contains=keyword)
                if len(keyword) == 4 and PSC.objects.filter(code__iexact=keyword).exists():
                    filter_obj |= Q(product_or_service_code__iexact=keyword)

                return filter_obj

            filter_obj = Q()
            for keyword in value:
                filter_obj |= keyword_parse(keyword)

            # Search for DUNS
            potential_duns = list(filter((lambda x: len(x) == 9), value))
            if len(potential_duns) > 0:
                filter_obj |= Q(sub_awardee_or_recipient_uniqu__in=potential_duns) | Q(
                    sub_ultimate_parent_unique_ide__in=potential_duns
                )

            # Search for UEI
            potential_ueis = list(filter((lambda x: len(x) == 12), value))
            potential_ueis = [uei.upper() for uei in potential_ueis]
            if len(potential_ueis) > 0:
                filter_obj |= Q(sub_awardee_or_recipient_uei__in=potential_ueis) | Q(
                    sub_ultimate_parent_uei__in=potential_ueis
                )

            queryset = queryset.filter(filter_obj)

        elif key == "elasticsearch_keyword":
            keyword = value
            transaction_ids = elasticsearch_helper.get_download_ids(keyword=keyword, field="transaction_id")
            # flatten IDs
            transaction_ids = list(itertools.chain.from_iterable(transaction_ids))
            logger.info("Found {} transactions based on keyword: {}".format(len(transaction_ids), keyword))
            transaction_ids = [str(transaction_id) for transaction_id in transaction_ids]
            queryset = queryset.filter(latest_transaction__isnull=False)

            # Prepare a SQL snippet to include in the predicate for searching an array of transaction IDs
            # TODO: Now that SubawardSearch has an FK field to TransactionSearch, we don't need the extra (raw SQL)
            #  Look to add a Django filter that does the same as below
            sql_fragment = (
                '"subaward_search"."latest_transaction_id" = ANY(\'{{{}}}\'::int[])'  # int[] -> int array type
            )
            queryset = queryset.extra(where=[sql_fragment.format(",".join(transaction_ids))])

        elif key == "time_period":
            min_date = API_SEARCH_MIN_DATE
            if for_downloads:
                min_date = API_MIN_DATE
            queryset &= combine_date_range_queryset(value, SubawardSearch, min_date, API_MAX_DATE, is_subaward=True)

        elif key == "award_type_codes":
            queryset = queryset.filter(prime_award_type__in=value)

        elif key == "prime_and_sub_award_types":
            award_types = value.get("sub_awards")
            if award_types:
                queryset = queryset.filter(prime_award_group__in=award_types)

        elif key == "agencies":
            # TODO: Make function to match agencies in award filter throwing dupe error
            funding_toptier = Q()
            funding_subtier = Q()
            awarding_toptier = Q()
            awarding_subtier = Q()
            for v in value:
                type = v["type"]
                tier = v["tier"]
                name = v["name"]
                if type == "funding":
                    if tier == "toptier":
                        funding_toptier |= Q(funding_toptier_agency_name=name)
                    elif tier == "subtier":
                        if "toptier_name" in v:
                            funding_subtier |= Q(funding_subtier_agency_name=name) & Q(
                                funding_toptier_agency_name=v["toptier_name"]
                            )
                        else:
                            funding_subtier |= Q(funding_subtier_agency_name=name)

                elif type == "awarding":
                    if tier == "toptier":
                        awarding_toptier |= Q(awarding_toptier_agency_name=name)
                    elif tier == "subtier":
                        if "toptier_name" in v:
                            awarding_subtier |= Q(awarding_subtier_agency_name=name) & Q(
                                awarding_toptier_agency_name=v["toptier_name"]
                            )
                        else:
                            awarding_subtier |= Q(awarding_subtier_agency_name=name)

            awarding_queryfilter = Q()
            funding_queryfilter = Q()

            # Since these are Q filters, no DB hits for boolean checks
            if funding_toptier:
                funding_queryfilter |= funding_toptier
            if funding_subtier:
                funding_queryfilter |= funding_subtier
            if awarding_toptier:
                awarding_queryfilter |= awarding_toptier
            if awarding_subtier:
                awarding_queryfilter |= awarding_subtier

            queryset = queryset.filter(funding_queryfilter & awarding_queryfilter)

        elif key == "legal_entities":
            # This filter key has effectively become obsolete by recipient_search_text
            msg = 'API request included "{}" key. No filtering will occur with provided value "{}"'
            logger.info(msg.format(key, value))

        elif key == "recipient_search_text":

            def recip_string_parse(recipient_string):
                upper_recipient_string = recipient_string.upper()

                # recipient_name_ts_vector is a postgres TS_Vector
                filter_obj = Q(recipient_name_ts_vector=upper_recipient_string)
                if len(upper_recipient_string) == 9 and upper_recipient_string[:5].isnumeric():
                    filter_obj |= Q(sub_awardee_or_recipient_uniqu=upper_recipient_string)
                elif len(upper_recipient_string) == 12:
                    filter_obj |= Q(sub_awardee_or_recipient_uei=upper_recipient_string)
                return filter_obj

            filter_obj = Q()
            for recipient in value:
                filter_obj |= recip_string_parse(recipient)
            queryset = queryset.filter(filter_obj)

        elif key == "recipient_scope":
            if value == "domestic":
                queryset = queryset.filter(recipient_scope_q)
            elif value == "foreign":
                queryset = queryset.exclude(recipient_scope_q)
            else:
                raise InvalidParameterException("Invalid filter: recipient_scope type is invalid.")

        elif key == "recipient_locations":
            queryset = queryset.filter(geocode_filter_subaward_locations("sub_legal_entity", value))

        elif key == "recipient_type_names":
            if len(value) != 0:
                queryset = queryset.filter(business_categories__overlap=value)

        elif key == "place_of_performance_scope":
            if value == "domestic":
                queryset = queryset.filter(pop_scope_q)
            elif value == "foreign":
                queryset = queryset.exclude(pop_scope_q)
            else:
                raise InvalidParameterException("Invalid filter: place_of_performance_scope is invalid.")

        elif key == "place_of_performance_locations":
            queryset = queryset.filter(geocode_filter_subaward_locations("sub_place_of_perform", value))

        elif key == "award_amounts":
            queryset &= total_obligation_queryset(value, SubawardSearch, filters, is_subaward=True)

        elif key == "award_ids":
            queryset = build_award_ids_filter(queryset, value, ("piid", "fain"))

        elif key == PSCCodes.underscore_name:
            q = PSCCodes.build_tas_codes_filter(value)
            queryset = queryset.filter(q) if q else queryset

        # add "naics_codes" (column naics) after NAICS are mapped to subawards
        elif key in ("contract_pricing_type_codes"):
            if len(value) != 0:
                queryset &= SubawardSearch.objects.filter(type_of_contract_pricing__in=value)

        elif key == "program_numbers":
            if len(value) != 0:
                queryset = queryset.filter(
                    Exists(
                        TransactionNormalized.objects.filter(
                            award_id=OuterRef("award_id"),
                            assistance_data__cfda_number__in=value,
                        )
                    )
                )

        elif key in ("set_aside_type_codes", "extent_competed_type_codes"):
            or_queryset = Q()
            filter_to_col = {"set_aside_type_codes": "type_set_aside", "extent_competed_type_codes": "extent_competed"}
            in_query = [v for v in value]
            for v in value:
                or_queryset |= Q(**{"{}__exact".format(filter_to_col[key]): in_query})
            queryset = queryset.filter(or_queryset)

        # Because these two filters OR with each other, we need to know about the presence of both filters to know what to do
        # This filter was picked arbitrarily to be the one that checks for the other
        elif key == TasCodes.underscore_name:
            q = TasCodes.build_tas_codes_filter(queryset, value)
            if TreasuryAccounts.underscore_name in filters.keys():
                q |= TreasuryAccounts.build_tas_codes_filter(queryset, filters[TreasuryAccounts.underscore_name])
            queryset = queryset.filter(q)

        elif key == TreasuryAccounts.underscore_name and TasCodes.underscore_name not in filters.keys():
            queryset = queryset.filter(TreasuryAccounts.build_tas_codes_filter(queryset, value))

        elif key == "def_codes":
            queryset = queryset.filter(DefCodes.build_def_codes_filter(value))

    return queryset