fedspendingtransparency/usaspending-api

View on GitHub
usaspending_api/idvs/v2/views/count/federal_account.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
from psycopg2.sql import Identifier, Literal, SQL
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

from usaspending_api.common.cache_decorator import cache_response
from usaspending_api.common.helpers.sql_helpers import execute_sql_to_ordered_dictionary
from usaspending_api.common.validator.award import get_internal_or_generated_award_id_model
from usaspending_api.common.validator.tinyshield import TinyShield


GET_COUNT_SQL = SQL(
    """
    with gather_award_ids as (
        select  award_id
        from    parent_award
        where   {award_id_column} = {award_id}
        union all
        select  cpa.award_id
        from    parent_award ppa
                inner join parent_award cpa on cpa.parent_award_id = ppa.award_id
        where   ppa.{award_id_column} = {award_id}
    ), gather_awards as (
        select  id award_id
        from    vw_awards
        where   {awards_table_id_column} = {award_id} and
                (piid = {piid} or {piid} is null)
        union   all
        select  ca.id award_id
        from    gather_award_ids gaids
                inner join vw_awards pa on pa.id = gaids.award_id
                inner join vw_awards ca on
                    ca.parent_award_piid = pa.piid and
                    ca.fpds_parent_agency_id = pa.fpds_agency_id and
                    (ca.piid = {piid} or {piid} is null)
    ), gather_financial_accounts_by_awards as (
        select  ga.award_id,
                faba.financial_accounts_by_awards_id
        from    gather_awards ga
                inner join financial_accounts_by_awards faba on faba.award_id = ga.award_id
    )
    select
        count(distinct gfaba.financial_accounts_by_awards_id)
    from
        gather_financial_accounts_by_awards gfaba
"""
)


class IDVFederalAccountCountViewSet(APIView):
    """Returns the total number of funding transactions for an IDV's child and grandchild awards, but not the IDV itself."""

    endpoint_doc = "usaspending_api/api_contracts/contracts/v2/idvs/count/federal_account/award_id.md"

    @staticmethod
    def _parse_and_validate_request(requested_award: str, request_data: dict) -> dict:
        piid = request_data.get("piid")
        data = {"award_id": requested_award, "piid": piid}
        models = [
            get_internal_or_generated_award_id_model(),
            {
                "key": "piid",
                "name": "piid",
                "allow_nulls": True,
                "optional": True,
                "type": "text",
                "text_type": "search",
            },
        ]
        return TinyShield(models).block(data)

    @staticmethod
    def _business_logic(request_data: dict) -> list:
        # By this point, our award_id has been validated and cleaned up by
        # TinyShield.  We will either have an internal award id that is an
        # integer or a generated award id that is a string.
        award_id = request_data["award_id"]
        award_id_column = "award_id" if type(award_id) is int else "generated_unique_award_id"
        awards_table_id_column = "id" if type(award_id) is int else "generated_unique_award_id"

        sql = GET_COUNT_SQL.format(
            award_id_column=Identifier(award_id_column),
            awards_table_id_column=Identifier(awards_table_id_column),
            award_id=Literal(award_id),
            piid=Literal(request_data.get("piid")),
        )

        return execute_sql_to_ordered_dictionary(sql)

    @cache_response()
    def get(self, request: Request, requested_award: str) -> Response:
        request_data = self._parse_and_validate_request(requested_award, request.GET)
        results = self._business_logic(request_data)
        return Response(results[0])