okfn-brasil/serenata-de-amor

View on GitHub
rosie/rosie/chamber_of_deputies/adapter.py

Summary

Maintainability
A
0 mins
Test Coverage
import logging
import os
from datetime import date
from pathlib import Path
from re import match

import numpy as np
import pandas as pd
from urllib.error import HTTPError

from serenata_toolbox.chamber_of_deputies.reimbursements import Reimbursements
from serenata_toolbox.datasets import fetch


class Adapter:

    STARTING_YEAR = 2009
    COMPANIES_DATASET = '2016-09-03-companies.xz'
    REIMBURSEMENTS_PATTERN = r'reimbursements-\d{4}\.csv'
    RENAME_COLUMNS = {
        'subquota_description': 'category',
        'total_net_value': 'net_value',
        'cnpj_cpf': 'recipient_id',
        'supplier': 'recipient'
    }
    DTYPE = {
        'applicant_id': np.str,
        'cnpj_cpf': np.str,
        'congressperson_id': np.str,
        'subquota_number': np.str
    }

    def __init__(self, path):
        self.path = path
        self.log = logging.getLogger(__name__)

    @property
    def dataset(self):
        self.update_datasets()
        df = self.reimbursements.merge(
            self.companies,
            how='left',
            left_on='cnpj_cpf',
            right_on='cnpj'
        )
        self.prepare_dataset(df)
        self.log.info('Dataset ready! Rosie starts her analysis now :)')
        return df

    @property
    def companies(self):
        self.log.info('Loading companies')
        path = Path(self.path) / self.COMPANIES_DATASET
        df = pd.read_csv(path, dtype={'cnpj': np.str}, low_memory=False)
        df['cnpj'] = df['cnpj'].str.replace(r'\D', '')
        return df

    @property
    def reimbursements(self):
        df = pd.DataFrame()
        paths = (
            str(path) for path in Path(self.path).glob('*.csv')
            if match(self.REIMBURSEMENTS_PATTERN, path.name)
        )

        for path in paths:
            self.log.info(f'Loading reimbursements from {path}')
            year_df = pd.read_csv(path, dtype=self.DTYPE, low_memory=False)
            df = df.append(year_df)

        return df

    def update_datasets(self):
        self.update_companies()
        self.update_reimbursements()

    def update_companies(self):
        self.log.info('Updating companies')
        os.makedirs(self.path, exist_ok=True)
        fetch(self.COMPANIES_DATASET, self.path)

    def update_reimbursements(self, years=None):
        if not years:
            next_year = date.today().year + 1
            years = range(self.STARTING_YEAR, next_year)

        for year in years:
            self.log.info(f'Updating reimbursements from {year}')
            try:
                Reimbursements(year, self.path)()
            except HTTPError as e:
                self.log.error(f'Could not update Reimbursements from year {year}: {e} - {e.filename}')

    def prepare_dataset(self, df):
        self.rename_categories(df)
        self.coerce_dates(df)
        self.rename_columns(df)

    def rename_columns(self, df):
        self.log.info('Renaming columns to Serenata de Amor standard')
        df.rename(columns=self.RENAME_COLUMNS, inplace=True)

    def rename_categories(self, df):
        self.log.info('Categorizing reimbursements')

        # There's no documented type for `3`, `4` and `5`, thus we assume it's
        # an input error until we hear back from Chamber of Deputies
        types = ('bill_of_sale', 'simple_receipt', 'expense_made_abroad')
        converters = {number: None for number in range(3, 6)}
        df['document_type'].replace(converters, inplace=True)
        df['document_type'] = df['document_type'].astype('category')
        df['document_type'].cat.rename_categories(types, inplace=True)

        # Some classifiers expect a more broad category name for meals
        rename = {'Congressperson meal': 'Meal'}
        df['subquota_description'] = df['subquota_description'].replace(rename)
        df['is_party_expense'] = df['congressperson_id'].isnull()

    def coerce_dates(self, df):
        for field, fmt in (('issue_date', '%Y-%m-%d'), ('situation_date', '%d/%m/%Y')):
            self.log.info(f'Coercing {field} column to date data type')
            df[field] = pd.to_datetime(df[field], format=fmt, errors='coerce')