webservices/resources/elections.py

Summary

Maintainability
A
4 hrs
Test Coverage
import sqlalchemy as sa
from sqlalchemy import cast, Integer
from flask_apispec import doc, marshal_with

from webservices import args
from webservices import docs
from webservices import utils
from webservices import filters
from webservices import schemas
from webservices.utils import use_kwargs
from webservices.common.models import (
    db, CandidateHistory, CandidateCommitteeLink,
    CommitteeTotalsPresidential, CommitteeTotalsHouseSenate,
    ElectionResult, ElectionsList, ZipsDistricts, ScheduleEByCandidate,
)


office_totals_map = {
    'president': CommitteeTotalsPresidential,
    'senate': CommitteeTotalsHouseSenate,
    'house': CommitteeTotalsHouseSenate,
}
office_args_map = {
    'house': ['state', 'district'],
    'senate': ['state'],
}


def cycle_length(elections):
    return sa.case(
        [
            (elections.c.office == 'P', 4),
            (elections.c.office == 'S', 6),
            (elections.c.office == 'H', 2),
        ]
    )


@doc(
    description=docs.ELECTION_SEARCH,
    tags=['financial']
)
class ElectionsListView(utils.Resource):

    model = ElectionsList
    schema = schemas.ElectionsListSchema
    page_schema = schemas.ElectionsListPageSchema

    filter_multi_fields = [
        ('cycle', ElectionsList.cycle),
    ]

    @use_kwargs(args.paging)
    @use_kwargs(args.elections_list)
    @use_kwargs(args.make_multi_sort_args(default=['sort_order', 'district',]))
    @marshal_with(schemas.ElectionsListPageSchema())
    def get(self, **kwargs):
        query = self._get_elections(kwargs)
        return utils.fetch_page(query, kwargs, model=ElectionsList, multi=True)

    def _get_elections(self, kwargs):
        """Get elections from ElectionsList model."""
        query = db.session.query(ElectionsList)
        if kwargs.get('office'):
            values = [each[0].upper() for each in kwargs['office']]
            query = query.filter(ElectionsList.office.in_(values))
        if kwargs.get('state'):
            query = query.filter(
                sa.or_(
                    ElectionsList.state.in_(kwargs['state']),
                    ElectionsList.office == 'P',
                )
            )
        if kwargs.get('district'):
            query = query.filter(
                sa.or_(
                    ElectionsList.district.in_(kwargs['district']),
                    ElectionsList.office.in_(['P', 'S']),
                ),
            )
        if kwargs.get('zip'):
            query = self._filter_zip(query, kwargs)

        return filters.filter_multi(query, kwargs, self.filter_multi_fields)

    def _filter_zip(self, query, kwargs):
        """Filter query by zip codes."""
        districts = db.session.query(ZipsDistricts).filter(
            cast(ZipsDistricts.zip_code, Integer).in_(kwargs['zip']),
            ZipsDistricts.active == 'Y'
        ).subquery()
        return query.join(
            districts,
            sa.or_(
                # House races from matching states and districts
                sa.and_(
                    ElectionsList.district == districts.c['district'],
                    ElectionsList.state == districts.c['state_abbrevation'],
                ),
                # Senate and presidential races from matching states
                sa.and_(
                    sa.or_(
                        ElectionsList.district == '00'
                    ),
                    ElectionsList.state.in_([districts.c['state_abbrevation'], 'US'])
                ),
            )
        )


@doc(
    description=docs.ELECTIONS,
    tags=['financial']
)
class ElectionView(utils.Resource):

    @use_kwargs(args.paging)
    @use_kwargs(args.elections)
    @use_kwargs(args.make_sort_args(default='-total_receipts'))
    @marshal_with(schemas.ElectionPageSchema())
    def get(self, **kwargs):
        query = self._get_records(kwargs)
        return utils.fetch_page(query, kwargs, cap=0)

    def _get_records(self, kwargs):
        utils.check_election_arguments(kwargs)
        totals_model = office_totals_map[kwargs['office']]
        pairs = self._get_pairs(totals_model, kwargs).subquery()
        aggregates = self._get_aggregates(pairs).subquery()
        outcomes = self._get_outcomes(kwargs).subquery()
        latest = self._get_latest(pairs).subquery()
        return db.session.query(
            aggregates,
            latest,
            sa.case(
                [(outcomes.c.cand_id != None, True)],  # noqa
                else_=False,
            ).label('won'),
        ).outerjoin(
            latest,
            aggregates.c.candidate_id == latest.c.candidate_id,
        ).outerjoin(
            outcomes,
            aggregates.c.candidate_id == outcomes.c.cand_id,
        ).distinct()

    def _get_pairs(self, totals_model, kwargs):
        pairs = CandidateHistory.query.with_entities(
            CandidateHistory.candidate_id,
            CandidateHistory.name,
            CandidateHistory.party_full,
            CandidateHistory.incumbent_challenge_full,
            CandidateHistory.office,
            CandidateHistory.two_year_period,
            CandidateHistory.candidate_election_year,
            CandidateCommitteeLink.committee_id,
            totals_model.receipts,
            totals_model.disbursements,
            totals_model.last_cash_on_hand_end_period.label('cash_on_hand_end_period'),
            totals_model.coverage_end_date,
        )
        pairs = join_candidate_totals(pairs, kwargs, totals_model)
        pairs = filter_candidate_totals(pairs, kwargs, totals_model)
        return pairs

    def _get_latest(self, pairs):
        latest = db.session.query(
            pairs.c.cash_on_hand_end_period,
        ).distinct(
            pairs.c.candidate_id,
            pairs.c.cmte_id,
        ).order_by(
            pairs.c.candidate_id,
            pairs.c.cmte_id,
            sa.desc(pairs.c.two_year_period),
        ).subquery()
        return db.session.query(
            latest.c.candidate_id,
            sa.func.sum(sa.func.coalesce(latest.c.cash_on_hand_end_period,0.0)).label('cash_on_hand_end_period'),
        ).group_by(
            latest.c.candidate_id,
        )

    def _get_aggregates(self, pairs):
        return db.session.query(
            pairs.c.candidate_id,
            pairs.c.candidate_election_year,
            sa.func.max(pairs.c.name).label('candidate_name'),
            sa.func.max(pairs.c.party_full).label('party_full'),
            sa.func.max(pairs.c.incumbent_challenge_full).label('incumbent_challenge_full'),
            sa.func.max(pairs.c.office).label('office'),
            sa.func.sum(sa.func.coalesce(pairs.c.receipts, 0.0)).label('total_receipts'),
            sa.func.sum(sa.func.coalesce(pairs.c.disbursements, 0.0)).label('total_disbursements'),
            sa.func.sum(sa.func.coalesce(pairs.c.cash_on_hand_end_period, 0.0)).label('cash_on_hand_end_period'),
            sa.func.array_agg(sa.distinct(pairs.c.cmte_id)).label('committee_ids'),
            sa.func.max(pairs.c.coverage_end_date).label('coverage_end_date'),
        ).group_by(
            pairs.c.candidate_id,
            pairs.c.candidate_election_year
        )

    def _get_outcomes(self, kwargs):
        return db.session.query(
            ElectionResult.cand_id
        ).filter(
            ElectionResult.election_yr == kwargs['cycle'],
            ElectionResult.cand_office == kwargs['office'][0].upper(),
            ElectionResult.cand_office_st == (kwargs.get('state', 'US')),
            ElectionResult.cand_office_district == (kwargs.get('district', '00')),
        )


@doc(
    description=docs.ELECTION_SEARCH,
    tags=['financial']
)
class ElectionSummary(utils.Resource):

    @use_kwargs(args.elections)
    @marshal_with(schemas.ElectionSummarySchema())
    def get(self, **kwargs):
        utils.check_election_arguments(kwargs)
        aggregates = self._get_aggregates(kwargs).subquery()
        expenditures = self._get_expenditures(kwargs).subquery()
        return db.session.query(
            aggregates.c.count,
            aggregates.c.receipts,
            aggregates.c.disbursements,
            expenditures.c.independent_expenditures,
        ).first()._asdict()

    def _get_aggregates(self, kwargs):
        totals_model = office_totals_map[kwargs['office']]
        aggregates = CandidateHistory.query.with_entities(
            sa.func.count(sa.distinct(CandidateHistory.candidate_id)).label('count'),
            sa.func.sum(totals_model.receipts).label('receipts'),
            sa.func.sum(totals_model.disbursements).label('disbursements'),
        )
        aggregates = join_candidate_totals(aggregates, kwargs, totals_model)
        aggregates = filter_candidate_totals(aggregates, kwargs, totals_model)
        return aggregates

    def _get_expenditures(self, kwargs):
        expenditures = db.session.query(
            sa.func.sum(ScheduleEByCandidate.total).label('independent_expenditures'),
        ).select_from(
            CandidateHistory
        ).join(
            ScheduleEByCandidate,
            sa.and_(
                CandidateHistory.candidate_id == ScheduleEByCandidate.candidate_id,
                ScheduleEByCandidate.cycle == kwargs['cycle'],
            ),
        )
        expenditures = filter_candidates(expenditures, kwargs)
        return expenditures

election_durations = {
    'senate': 6,
    'president': 4,
    'house': 2,
}


def join_candidate_totals(query, kwargs, totals_model):
    return query.outerjoin(
        CandidateCommitteeLink,
        sa.and_(
            CandidateHistory.candidate_id == CandidateCommitteeLink.candidate_id,
            CandidateHistory.two_year_period == CandidateCommitteeLink.fec_election_year,
        )
    ).outerjoin(
        totals_model,
        sa.and_(
            CandidateCommitteeLink.committee_id == totals_model.committee_id,
            CandidateCommitteeLink.fec_election_year == totals_model.cycle,
        )
    )


def filter_candidates(query, kwargs):
    duration = (
        election_durations.get(kwargs['office'], 2)
        if kwargs.get('election_full')
        else 2
    )
    query = query.filter(
        CandidateHistory.two_year_period <= kwargs['cycle'],
        CandidateHistory.two_year_period > (kwargs['cycle'] - duration),
        # CandidateHistory.cycles.any(kwargs['cycle']),
        CandidateHistory.candidate_election_year + (CandidateHistory.candidate_election_year % 2) == kwargs['cycle'],
        CandidateHistory.office == kwargs['office'][0].upper(),

    )
    if kwargs.get('state'):
        query = query.filter(CandidateHistory.state == kwargs['state'])
    if kwargs.get('district'):
        query = query.filter(CandidateHistory.district == kwargs['district'])
    return query


def filter_candidate_totals(query, kwargs, totals_model):
    query = filter_candidates(query, kwargs)
    query = query.filter(
        CandidateHistory.candidate_inactive == False,  # noqa
        # CandidateCommitteeLink.committee_designation.in_(['P', 'A']),
    ).distinct()
    return query