okfn-brasil/serenata-de-amor

View on GitHub
research/src/get_family_names.py

Summary

Maintainability
A
0 mins
Test Coverage
import os
import re
import datetime

import requests
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup

DATE = datetime.date.today().strftime('%Y-%m-%d')
DATA_DIR = 'data'
PROCESSED_DATA_FILE = '{}-congressperson-relatives.xz'
PROCESSED_DATA_PATH = os.path.join(DATA_DIR, PROCESSED_DATA_FILE).format(DATE)
RAW_DATA_FILE = '{}-congressperson-relatives-raw.xz'
RAW_DATA_PATH = os.path.join(DATA_DIR, RAW_DATA_FILE).format(DATE)

write_csv_params = {
    'compression': 'xz',
    'encoding': 'utf-8',
    'index': False}


def format_string(s):
    return s.strip().replace(':', '')


def extract_contents_from_div(div):
    return list(map(format_string,
                    filter(lambda x: '\n' not in x,
                           div[0].strings)))


def convert_to_dict(contents):
    return dict(zip(contents[1:-2:2],
                    contents[2:-1:2]))


def is_single_word(s):
    return (' ' not in s)


def fix_when_theres_a_single_surname_after_the_split(names):
    pairs = [(i, name) for i, name
             in enumerate(names) if is_single_word(name)]
    while pairs:
        i, name = pairs.pop(0)
        if i == 0:
            continue
        names_to_join = names[i - 1:i + 1]
        name = ' e '.join(names_to_join)
        for n in names_to_join:
            names.remove(n)
        names.append(name)
        pairs = [(i, name) for i, name
                 in enumerate(names) if is_single_word(name)]
    return names


def split_names(s):
    names = s.split(' e ')
    names = fix_when_theres_a_single_surname_after_the_split(names)
    return names


def create_one_row_per_parent(df):
    result = []
    dict_df = df.to_dict(orient='records')[0]
    for name in dict_df['parents_list']:
        result.append({'id': dict_df['id'],
                       'relative_name': name,
                       'relationship': 'son_of'})
    return pd.DataFrame(data=result)


def get_all_congress_people_ids():
    print('Fetching all congresspeople ids', end='\r')
    ids_series = [read_csv(name)['congressperson_id']
                  for name in ['current-year', 'last-year', 'previous-years']]
    return list(pd.concat(ids_series).unique())


def find_newest_file(name):
    """
    Assuming that the files will be in the form of :
    yyyy-mm-dd-type_of_file.xz we can try to find the newest file
    based on the date, but if the file doesn't exist fallback to another
    date until all dates are exhausted
    """
    date_regex = re.compile('\d{4}-\d{2}-\d{2}')

    matches = (date_regex.findall(f) for f in os.listdir(DATA_DIR))
    dates = sorted(set([l[0] for l in matches if l]), reverse=True)
    for date in dates:
        filename = os.path.join(DATA_DIR, '{}-{}.xz'.format(date, name))
        if os.path.isfile(filename):
            return filename

    return None


def read_csv(name):
    filename = find_newest_file(name)
    if filename is None:
        raise TypeError('could not find the dataset for {}.'.format(name))

    return pd.read_csv(filename,
                       parse_dates=[16],
                       dtype={'document_id': np.str,
                              'congressperson_id': np.str,
                              'congressperson_document': np.str,
                              'term_id': np.str,
                              'cnpj_cpf': np.str,
                              'reimbursement_number': np.str})


def write_formatted_data(df):
    people_with_two_or_less_parents = (
        df
        [df.parents_list.apply(len) <= 2]
        [['id', 'parents_list']])

    final = (
        people_with_two_or_less_parents
        .groupby('id')
        .apply(create_one_row_per_parent)
        .reset_index(drop=True))

    final.rename(inplace=True, columns={'id': 'congressperson_id'})
    final.to_csv(PROCESSED_DATA_PATH, **write_csv_params)


def write_raw_data(df):
    people_with_more_than_two_parents = (
        df
        [df.parents_list.apply(len) > 2]
        [['id', 'Filiação']])

    people_with_more_than_two_parents.rename(inplace=True, columns={
        'id': 'congressperson_id',
        'Filiação': 'parents'})

    if len(people_with_more_than_two_parents):
        people_with_more_than_two_parents.to_csv(
            RAW_DATA_PATH,
            **write_csv_params
        )


def get_congresspeople_parents_names():
    url = 'http://www2.camara.leg.br/deputados/pesquisa/layouts_deputados_' \
          'biografia?pk={}'

    ids = get_all_congress_people_ids()

    dicts = []
    total = len(ids)
    for i, id in enumerate(ids):
        id = str(id).replace('\n', '').strip()
        try:
            data = requests.get(url.format(id)).content.decode('utf8')
            soup = BeautifulSoup(str(data), 'html.parser')
            bio_details = soup.findAll('div', {'class': 'bioDetalhes'})
            contents_bio_details = extract_contents_from_div(bio_details)
            dict_bio_details = convert_to_dict(contents_bio_details)
            dict_bio_details['id'] = id
            dicts.append(dict_bio_details)
        except IndexError:
            print('Could not parse data')

        msg = 'Processed {} out of {} ({:.2f}%)'
        print(msg.format(i, total, i / total * 100), end='\r')

    df = pd.DataFrame(data=dicts)
    df = df[df['Filiação'].notnull()]
    df = df[~(df['Filiação'].str.contains('Escolaridade'))]
    df['parents_list'] = df['Filiação'].apply(split_names)

    write_formatted_data(df)
    write_raw_data(df)


if __name__ == '__main__':
    get_congresspeople_parents_names()