# Source: fedspendingtransparency/usaspending-api (view on GitHub)
# File: usaspending_api/awards/v2/filters/location_filter_geocode.py
# Summary: Maintainability A (0 mins); Test Coverage A (93%)
from django.db.models import Q

from usaspending_api.common.exceptions import InvalidParameterException
from usaspending_api.common.helpers.api_helper import (
    DUPLICATE_DISTRICT_LOCATION_PARAMETERS,
    INCOMPATIBLE_DISTRICT_LOCATION_PARAMETERS,
)
from usaspending_api.common.helpers.dict_helpers import upper_case_dict_values

# Pseudo country key: when a nested-location country key equals this value,
# geocode_filter_locations adds no country-code restriction for that group.
ALL_FOREIGN_COUNTRIES = "FOREIGN"


def geocode_filter_locations(scope: str, values: list) -> Q:
    """
    Build a Django Q filter that matches any of the requested locations.

    scope- field-name prefix used to build lookups such as `{scope}_country_code`
           (place of performance or recipient location mappings)
    values- array of location request dicts; validated and grouped by
            create_nested_object
    returns a Q object OR-ing together one clause per country (an empty Q when
            no locations were requested)

    Raises InvalidParameterException (through create_nested_object) when a
    location request has an invalid combination of keys.
    """
    or_queryset = Q()

    # Locations grouped by country; counties, congressional districts and
    # state-scoped cities are nested under their state code
    nested_values = create_nested_object(values)

    # Translate the nested dict into Q objects, one country at a time
    for country, state_zip in nested_values.items():
        # The ALL_FOREIGN_COUNTRIES pseudo-country adds no country-code restriction
        country_qs = None
        if country != ALL_FOREIGN_COUNTRIES:
            country_qs = Q(**{f"{scope}_country_code__exact": country})
        state_qs = Q()

        for state_zip_key, location_values in state_zip.items():

            if state_zip_key == "city":
                # Cities requested without a state: list of city names
                state_inner_qs = Q(**{f"{scope}_city_name__in": location_values})
            elif state_zip_key == "zip":
                state_inner_qs = Q(**{f"{scope}_zip5__in": location_values})
            else:
                # Any other key is a state code whose value holds the
                # county / district / city lists nested under that state
                state_inner_qs = Q(**{f"{scope}_state_code__exact": state_zip_key.upper()})
                county_qs = Q()
                district_qs = Q()
                city_qs = Q()

                if location_values["county"]:
                    county_qs = Q(**{f"{scope}_county_code__in": location_values["county"]})
                # Separate requests for the same state may supply current and
                # original districts; OR them so neither filter replaces the other
                if location_values["district_current"]:
                    district_qs |= Q(
                        **{f"{scope}_congressional_code_current__in": location_values["district_current"]}
                    )
                if location_values["district_original"]:
                    district_qs |= Q(**{f"{scope}_congressional_code__in": location_values["district_original"]})
                if location_values["city"]:
                    city_qs = Q(**{f"{scope}_city_name__in": location_values["city"]})
                # Empty Q objects drop out of the OR, so a bare state request
                # stays a plain state-code match
                state_inner_qs &= county_qs | district_qs | city_qs

            state_qs |= state_inner_qs
        if country_qs is not None:
            or_queryset |= country_qs & state_qs
        else:
            or_queryset |= state_qs
    return or_queryset


def validate_location_keys(values):
    """
    Validate that each location request carries a sufficient, compatible set of keys.

    values- iterable of location request dicts

    Raises InvalidParameterException when:
      * a district is given without a state, with a country other than "USA",
        or together with a county
      * both district_current and district_original are given
      * country is missing, or county is given without state
        (raised by location_error_handling)
    """
    for location in values:
        has_current = location.get("district_current") is not None
        has_original = location.get("district_original") is not None

        if has_current or has_original:
            # Districts are only accepted for a USA state-level request with no county
            district_allowed = (
                location.get("state") is not None
                and location.get("country") == "USA"
                and location.get("county") is None
            )
            if not district_allowed:
                raise InvalidParameterException(INCOMPATIBLE_DISTRICT_LOCATION_PARAMETERS)
            if has_current and has_original:
                raise InvalidParameterException(DUPLICATE_DISTRICT_LOCATION_PARAMETERS)

        missing_country = "country" not in location
        county_without_state = "county" in location and "state" not in location
        if missing_country or county_without_state:
            location_error_handling(location.keys())


def create_nested_object(values):
    """
    Validate the location requests and group them into a nested dict.

    values- list of location request dicts; each is passed to
            upper_case_dict_values before being read
            (presumably upper-cases the values in place; confirm against the helper)

    Returns a dict keyed by country. Each country maps to:
      * "zip":  list of zips requested for that country
      * "city": list of cities requested without a state
      * <state>: {"county": [...], "district_original": [...],
                  "district_current": [...], "city": [...]}

    Raises InvalidParameterException (from validate_location_keys) on invalid keys.
    """
    validate_location_keys(values)

    by_country = {}
    for location in values:
        upper_case_dict_values(location)
        city_name = location.get("city")
        country_code = location.get("country")
        state_code = location.get("state")
        zip_code = location.get("zip")

        # First level is the country; validation guarantees the key is present
        country_entry = by_country.setdefault(country_code, {})

        # Second level: zips and state-less cities sit directly under the country
        if zip_code:
            country_entry.setdefault("zip", []).append(zip_code)
        if city_name:
            # The country-level list is created even when the city ends up under a state
            country_entry.setdefault("city", [])

        if state_code:
            country_entry.setdefault(
                state_code,
                {
                    "county": [],
                    "district_original": [],
                    "district_current": [],
                    "city": [],
                },
            )

        # Validation ensures a state accompanies any county or district
        for field_name in ("county", "district_current", "district_original"):
            requested = location.get(field_name)
            if requested:
                country_entry[state_code][field_name].extend(get_fields_list(field_name, requested))

        if city_name:
            city_bucket = country_entry[state_code] if state_code else country_entry
            city_bucket["city"].append(city_name)

    return by_country


def location_error_handling(fields):
    """
    Raise the relevant error for a location request's keys.

    fields- collection of key names supplied in one location request

    Raises InvalidParameterException when "country" is absent, or when
    "county" is present without "state". Returns None otherwise.
    """
    # Country is mandatory for every location request
    if "country" not in fields:
        raise InvalidParameterException("Invalid filter:  Missing necessary location field: country.")

    # A county only makes sense within a state
    county_without_state = "state" not in fields and "county" in fields
    if county_without_state:
        raise InvalidParameterException("Invalid filter:  Missing necessary location field: state.")


def get_fields_list(scope, field_value):
    """
    Return the list of values to search for: `field_value`, plus numeric
    variants of it when `scope` is "congressional_code" or "county_code".

    Example: ("county_code", "01") -> ["1", "01", "1.0"]

    For any other scope, or when the value cannot be cast to int/float
    (e.g. "ZZ" for an area without a congressional code), only
    `[field_value]` is returned.

    NOTE(review): the callers visible in this module pass "county",
    "district_current" and "district_original" as `scope`, none of which
    trigger the variant expansion; confirm whether that is intended.
    """
    if scope not in ("congressional_code", "county_code"):
        return [field_value]

    try:
        # Stored codes are not uniform, e.g. congressional code 01 may appear as '01', '1.0' or '1'
        int_form = str(int(field_value))
        float_form = str(float(field_value))
    except ValueError:
        # Non-numeric code: search for the value exactly as given
        return [field_value]
    return [int_form, field_value, float_form]