okfn-brasil/serenata-de-amor

View on GitHub
rosie/rosie/chamber_of_deputies/classifiers/meal_price_outlier_classifier.py

Summary

Maintainability
A
35 mins
Test Coverage
import unicodedata

import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin
from sklearn.cluster import KMeans


class MealPriceOutlierClassifier(TransformerMixin):
    """
    Meal Price Outlier classifier.

    Dataset
    -------
    applicant_id : string column
        A personal identifier code for every person making expenses.

    category : category column
        Category of the expense. The model will be applied just in rows where
        the value is equal to "Meal".

    net_value : float column
        The value of the expense.

    recipient_id : string column
        A CNPJ (Brazilian company ID) or CPF (Brazilian personal tax ID).
    """

    HOTEL_REGEX = r'hote(?:(?:ls?)|is)'
    CLUSTER_KEYS = ['mean', 'std']
    COLS = ['applicant_id',
            'category',
            'net_value',
            'recipient',
            'recipient_id']

    def fit(self, X):
        _X = X[self.__applicable_rows(X)]
        companies = _X.groupby('recipient_id').apply(self.__company_stats) \
            .reset_index()
        companies = companies[self.__applicable_company_rows(companies)]

        self.cluster_model = KMeans(n_clusters=3)
        self.cluster_model.fit(companies[self.CLUSTER_KEYS])
        companies['cluster'] = self.cluster_model.predict(companies[self.CLUSTER_KEYS])
        self.clusters = companies.groupby('cluster') \
            .apply(self.__cluster_stats) \
            .reset_index()
        self.clusters['threshold'] = \
            self.clusters['mean'] + 4 * self.clusters['std']
        return self

    def transform(self, X=None):
        pass

    def predict(self, X):
        _X = X[self.COLS].copy()
        companies = _X[self.__applicable_rows(_X)] \
            .groupby('recipient_id').apply(self.__company_stats) \
            .reset_index()
        companies['cluster'] = \
            self.cluster_model.predict(companies[self.CLUSTER_KEYS])
        companies = pd.merge(companies,
                             self.clusters[['cluster', 'threshold']],
                             how='left')
        _X = pd.merge(_X, companies[['recipient_id', 'threshold']], how='left')
        known_companies = companies[self.__applicable_company_rows(companies)]
        known_thresholds = known_companies \
            .groupby('recipient_id') \
            .apply(lambda x: x['mean'] + 3 * x['std']) \
            .reset_index() \
            .rename(columns={0: 'cnpj_threshold'})
        _X = pd.merge(_X, known_thresholds, how='left')
        if 'cnpj_threshold' in _X.columns:
            _X.loc[_X['cnpj_threshold'].notnull(),
                   'threshold'] = _X['cnpj_threshold']
        _X['y'] = 1
        is_outlier = self.__applicable_rows(_X) & \
            _X['threshold'].notnull() & \
            (_X['net_value'] > _X['threshold'])
        _X.loc[is_outlier, 'y'] = -1
        return _X['y']

    def __applicable_rows(self, X):
        return (X['category'] == 'Meal') & \
            (X['recipient_id'].str.len() == 14) & \
            (~X['recipient'].fillna('').apply(self.__normalize_string).str.contains(self.HOTEL_REGEX))

    def __applicable_company_rows(self, companies):
        return (companies['congresspeople'] > 3) & (companies['records'] > 20)

    def __company_stats(self, X):
        stats = {'mean': np.mean(X['net_value']),
                 'std': np.std(X['net_value']),
                 'congresspeople': len(np.unique(X['applicant_id'])),
                 'records': len(X)}
        return pd.Series(stats)

    def __cluster_stats(self, X):
        stats = {'mean': np.mean(X['mean']),
                 'std': np.mean(X['std'])}
        return pd.Series(stats)

    def __normalize_string(self, string):
        nfkd_form = unicodedata.normalize('NFKD', string.lower())
        return nfkd_form.encode('ASCII', 'ignore').decode('utf-8')