RadarParlamentar-MES2017-1/radar

View on GitHub
radar_parlamentar/analises/analise.py

Summary

Maintainability
F
3 days
Test Coverage
# coding=utf8

# Copyright (C) 2012, Leonardo Leite, Saulo Trento, Diego Rabatone,
# Guilherme Januário
#
# This file is part of Radar Parlamentar.
#
# Radar Parlamentar is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Radar Parlamentar is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radar Parlamentar.  If not, see <http://www.gnu.org/licenses/>.


"""Módulo analise"""


from math import hypot, atan2, pi
from .models import AnalisePeriodo, AnaliseTemporal
from modelagem import models
from modelagem import utils
from analises import filtro
import logging
import numpy
from . import pca
import copy
# import time # timetrack

logger = logging.getLogger("radar")


class AnalisadorTemporal:

    """O AnalisadorTemporal cria objetos do tipo AnaliseTemporal, o qual
    contém uma lista de objetos AnalisePeriodo.

    Uma análise de um período é uma análise de componentes principais dos
    votos de um dado período, por exemplo do ano de 2010. Para fazer um gráfico
    animado, é preciso fazer análises de dois ou mais períodos consecutivos,
    por exemplo 2010, 2011 e 2012, e rotacionar adequadamente os resultados
    para que os partidos globalmente caminhem o mínimo possível de um lado para
    o outro (vide algoritmo de rotação).

    A classe AnalisadorTemporal tem métodos para criar os objetos
    AnalisadorPeriodo e fazer as análises.

    Atributos:
        data_inicio e data_fim -- strings no formato 'aaaa-mm-dd'.
        analises_periodo -- lista de objetos da classe AnalisePeriodo
        palavras_chave -- lista de strings para serem utilizadas na filtragem
        de votações
        votacoes -- lista de objetos do tipo Votacao para serem usados
        na análise se não for especificado, procura votações na base de dados
        de acordo data_inicio, data_fim e palavras_chave.
    """

    def __init__(self, casa_legislativa, periodicidade,
                 palavras_chave=[], votacoes=[]):
        self.casa_legislativa = casa_legislativa
        recuperador_votacoes = utils.PeriodosRetriever(
            self.casa_legislativa, periodicidade)
        self.periodos = recuperador_votacoes.get_periodos()
        self.ini = self.periodos[0].ini
        self.fim = self.periodos[len(self.periodos) - 1].fim
        self.periodicidade = periodicidade
        self.analises_periodo = []
        self.palavras_chave = palavras_chave
        self.votacoes = []
        self.total_votacoes = 0
        self.json = ""
        self.chefes_executivos = []

    def get_analise_temporal(self):
        """Retorna instância de AnaliseTemporal"""
        if not self.analises_periodo:
            self._faz_analises()
        analise_temporal = AnaliseTemporal()
        analise_temporal.casa_legislativa = self.casa_legislativa
        analise_temporal.periodicidade = self.periodicidade
        analise_temporal.analises_periodo = self.analises_periodo
        analise_temporal.votacoes = self.votacoes
        analise_temporal.chefes_executivos = self.chefes_executivos
        analise_temporal.total_votacoes = self.total_votacoes
        analise_temporal.palavras_chaves = self.palavras_chave
        return analise_temporal

    def _faz_analises(self):
        """Método da classe AnalisadorTemporal que cria os objetos
        AnalisadorPeriodo e faz as análises."""

        for periodo in self.periodos:
            logger.info("Analisando periodo %s a %s." %
                        (str(periodo.ini), str(periodo.fim)))
            analisadorPeriodo = AnalisadorPeriodo(self.casa_legislativa,
                                                  periodo, self.votacoes,
                                                  self.palavras_chave)
            if analisadorPeriodo.votacoes:
                logger.info("O periodo possui %d votações." %
                            len(analisadorPeriodo.votacoes))
                analisePeriodo = analisadorPeriodo.analisa()
                self.analises_periodo.append(analisePeriodo)
                self.total_votacoes += len(analisadorPeriodo.votacoes)
            else:
                logger.info("O periodo não possui nenhuma votação.")

        # Rotaciona/espelha cada análise baseado em sua análise anterior
        logger.info("Rotacionando...")
        # a partir da segunda analise
        for i in range(1, len(self.analises_periodo)):
            rotacionador = Rotacionador(
                self.analises_periodo[i], self.analises_periodo[i - 1])
            analiseRotacionada = rotacionador.espelha_ou_roda()
            self.analises_periodo[i] = analiseRotacionada
        logger.info("Rotacionado")

    def votacoes_filtradas(self):
        votacoes_filtradas = []
        for periodo in self.periodos:
            analisadorPeriodo = AnalisadorPeriodo(self.casa_legislativa,
                                                  periodo, self.votacoes,
                                                  self.palavras_chave)
            votacoes_filtradas.extend(analisadorPeriodo._inicializa_votacoes())
        return votacoes_filtradas


class AnalisadorPeriodo:

    def __init__(self, casa_legislativa, periodo,
                 votacoes=[], palavras_chave=[]):
        """Argumentos:
            casa_legislativa -- objeto do tipo CasaLegislativa;
            somente votações desta casa serão analisados.
            periodo -- objeto do tipo PeriodoCasaLegislativa;
                       sem periodo, a análise é feita sobre todas as votações.
            votacoes -- lista de objetos do tipo Votacao para serem usados na
            análise se não for especificado, procura votações na base de dados
                        de acordo data_inicio, data_fim e palavras_chave.
            palavras_chave -- lista de strings para serem usadas na filtragem
            das votações
        """
        self.casa_legislativa = casa_legislativa
        self.periodo = periodo
        self.ini = periodo.ini if periodo is not None else None
        self.fim = periodo.fim if periodo is not None else None
        self.partidos = self.casa_legislativa.partidos()
        self.parlamentares = self.casa_legislativa.parlamentares()
        self.votacoes = votacoes
        self.palavras_chave = palavras_chave
        if not self.votacoes:
            self._inicializa_votacoes()

        self.num_votacoes = len(self.votacoes)
        self.analise_ja_feita = False  # quando a analise for feita, vale True.
        # em graus, eventual rotação feita por self.espelha_ou_roda()
        self.theta = 0

        # calculados por self._inicializa_vetores():
        self.vetores_votacao = []
        self.vetores_presencas = []
        self.tamanhos_partidos = {}
        self.coordenadas_partidos = {}

        # array de partido.nome's, um por parlamentar
        self.partido_do_parlamentar = []

        # parlamentar.id => {True,False},
        # sendo True se estava presente no periodo.
        self.presencas_parlamentares = {}

        # partido.nome => lista de parlamentares do partido
        # (independente de periodo).
        self.parlamentares_por_partido = {}

        self.pca_parlamentares = None
        self.coordenadas_parlamentares = {}

        # lista de chefes_executivos
        self.chefes_executivos = self._inicializa_chefes_executivo()

    def _inicializa_votacoes(self):
        """Pega votações deste período no banco de dados filtrando por palavras
        chave e seta a lista self.votacoes"""
        filtro_votacao = filtro.FiltroVotacao(
            self.casa_legislativa, self.periodo, self.palavras_chave)
        self.votacoes = filtro_votacao.filtra_votacoes()
        return self.votacoes

    def _inicializa_chefes_executivo(self):
        """Pega chefes executivo deste período no banco de dados filtrando pela casa
        legislativa e seta a lista self.chefes_executivo"""
        filtro_chefe = filtro.FiltroChefesExecutivo(
            self.casa_legislativa, self.periodo)
        chefes_executivos = filtro_chefe.filtra_chefes_executivo()
        return chefes_executivos

    def analisa(self):
        """Retorna instância de AnalisePeriodo"""
        self._calcula_parlamentares_2d()
        self._analisa_partidos()
        analisePeriodo = AnalisePeriodo()
        analisePeriodo.casa_legislativa = self.casa_legislativa
        analisePeriodo.periodo = self.periodo
        analisePeriodo.partidos = self.partidos
        analisePeriodo.votacoes = self.votacoes
        analisePeriodo.num_votacoes = self.num_votacoes
        analisePeriodo.pca = self.pca
        analisePeriodo.tamanhos_partidos = self.tamanhos_partidos
        analisePeriodo.coordenadas_parlamentares = \
            self.coordenadas_parlamentares
        analisePeriodo.coordenadas_partidos = self.coordenadas_partidos
        analisePeriodo.parlamentares_por_partido = \
            self.parlamentares_por_partido
        analisePeriodo.chefes_executivos = self.chefes_executivos
        return analisePeriodo

    def _inicializa_vetores(self):
        construtorMatrizes = ConstrutorDeMatrizesDeDados(
            self.votacoes, self.partidos, self.parlamentares)
        construtorMatrizes.gera_matrizes()
        self.vetores_votacao = construtorMatrizes.matriz_votacoes
        self.vetores_presencas = construtorMatrizes.matriz_presencas
        self.partido_do_parlamentar = construtorMatrizes.partido_do_parlamentar

    def _calcula_parlamentares_2d(self):
        """Retorna mapa com as coordenadas de parlamentares no plano 2D formado
        pelas duas primeiras componentes principais.

        A chave do mapa é o id do parlamentar (int) e o valor é uma lista
        de duas posições [x,y].
        """
        if not self.analise_ja_feita:
            self.coordenadas_parlamentares = self._pca_parlamentares()
            if self.num_votacoes > 1:
                for partido in list(self.coordenadas_parlamentares.keys()):
                    self.coordenadas_parlamentares[partido] = (
                        self.coordenadas_parlamentares[partido])[0:2]
            # se só tem 1 votação, só tem 1 C.P. Jogar tudo zero na segunda CP.
            elif self.num_votacoes == 1:
                for partido in list(self.coordenadas_parlamentares.keys()):
                    self.coordenadas_parlamentares[partido] = numpy.array(
                        [(self.coordenadas_parlamentares[partido])[0], 0.])
            # Zero votações no período. Os partidos são todos iguais. Tudo
            # zero.
            else:
                for parlamentar in list(self.coordenadas_parlamentares.keys()):
                    self.coordenadas_parlamentares[
                        parlamentar] = numpy.array([0., 0.])
        return self.coordenadas_parlamentares

    def _pca_parlamentares(self):
        """Roda a análise de componentes principais por parlamentares.

        Retorna um dicionário no qual as chaves são os ids dos parlamentares
        e o valor de cada chave é um vetor com as n dimensões da análise pca
        """
        if not self.pca_parlamentares:
            if not self.vetores_votacao:
                self._inicializa_vetores()
            ids_parlamentares_presentes = \
                self._listar_indices_de_parlamentares_presentes()
            matriz = self.vetores_votacao
            # exclui parlamentares ausentes em todas as votações do período
            matriz = matriz[ids_parlamentares_presentes, :]
            matriz = matriz - matriz.mean(axis=0)  # centraliza dados
            self.pca = pca.PCA(matriz, fraction=1)  # faz o pca
            self._preenche_pca_de_parlamentares_nulos(
                ids_parlamentares_presentes)
            logger.info("PCA terminada com sucesso. ini=%s, fim=%s" %
                        (str(self.ini), str(self.fim)))
        # Criar dicionario a ser retornado:
        dicionario = {}
        for parlamentar, vetor in zip(self.parlamentares, self.pca.U):
            dicionario[parlamentar.id] = vetor
        return dicionario

    def _listar_indices_de_parlamentares_presentes(self):
        return self.vetores_presencas.sum(axis=1).nonzero()[0].tolist()

    def _preenche_pca_de_parlamentares_nulos(self, ipnn):
        """Recupera parlamentares ausentes no período, atribuindo NaN em todas
        as dimensões no espaço das componentes principais"""
        U2 = self.pca.U.copy()  # Salvar resultado da pca em U2
        matriz_de_nans = numpy.zeros(
            (len(self.parlamentares), self.num_votacoes)) * numpy.nan
        self.pca.U = matriz_de_nans
        ip = -1
        ipnn2 = -1
        for p in self.parlamentares:
            ip += 1
            if ip in ipnn:  # Se este parlamentar for não nulo
                ipnn2 += 1
                cpmaximo = U2.shape[1]
                # colocar nesta linha os valores que eu salvei antes em U2
                self.pca.U[ip, 0:cpmaximo] = U2[ipnn2, :]
                # aproveitar para preencher presencas_parlamentares
                # (parlamentar.id => True / False)
                self.presencas_parlamentares[p.id] = True
            else:
                self.pca.U[ip, :] = numpy.zeros(
                    (1, self.num_votacoes)) * numpy.NaN
                self.presencas_parlamentares[p.id] = False

    def _analisa_partidos(self):
        coordenadas_parlamentares = self.pca.U[:, 0:2]
        if coordenadas_parlamentares.shape[1] == 1:
            coordenadas_parlamentares = numpy.append(
                coordenadas_parlamentares, numpy.zeros(
                    [len(coordenadas_parlamentares), 1]), 1)
        analisador_partidos = AnalisadorPartidos(
            coordenadas_parlamentares, self.parlamentares, self.partidos,
            self.vetores_presencas, self.partido_do_parlamentar)
        analisador_partidos.analisa_partidos()
        self.coordenadas_partidos = analisador_partidos.coordenadas_partidos
        self.tamanhos_partidos = analisador_partidos.tamanhos_partidos
        self.parlamentares_por_partido = \
            analisador_partidos.parlamentares_por_partido


class ConstrutorDeMatrizesDeDados:

    def __init__(self, votacoes, partidos, parlamentares):
        self.votacoes = votacoes
        self.partidos = partidos
        self.parlamentares = parlamentares
        self.matriz_votacoes = numpy.zeros(
            (len(self.parlamentares), len(self.votacoes)))
        self.matriz_presencas = numpy.zeros(
            (len(self.parlamentares), len(self.votacoes)))
        # array de partido.nome's, um por parlamentar
        self.partido_do_parlamentar = []
        # chave eh nome do partido, e valor eh VotoPartido
        self._dic_partido_votos = {}
        self._dic_parlamentares_votos = {}  # parlamentar.id => voto.opcao

    def gera_matrizes(self):
        """Cria duas matrizes:
            matriz_votacoes -- de votações (por parlamentares),
            matriz_presencas -- presenças de parlamentares

        Os valores possíveis na matriz de votações são:
        -1 (não), 0 (abtencão/falta) e 1 (sim).
        Os valores possíveis na matriz de presenças são:
        0 (falta) e 1 (presente).
        As linhas indexam parlamentares. As colunas indexam as votações.
        A ordenação das linhas segue a ordem de self.partidos ou
        self.parlamentares, e a ordenação das colunas segue a ordem
        de self.votacoes.

        Retorna matriz_votacoes
        """
        iv = -1  # índice votação
        for votacao in self.votacoes:
            iv += 1
            self._construtor_dicionario_parlamentares_votos(votacao)
            self._preenche_matrizes(votacao, iv)
        return self.matriz_votacoes

    def _construtor_dicionario_parlamentares_votos(self, votacao):
        # com o "select_related" fazemos uma query eager
        votos = votacao.voto_set.select_related(
            'opcao', 'parlamentar__id').all()
        for voto in votos:
            self._dic_parlamentares_votos[voto.parlamentar.id] = voto.opcao

    def _preenche_matrizes(self, votacao, iv):
        ip = -1  # indice parlamentares
        for parlamentar in self.parlamentares:
            ip += 1
            self.partido_do_parlamentar.append(parlamentar.partido.nome)
            if parlamentar.id in self._dic_parlamentares_votos:
                opcao = self._dic_parlamentares_votos[parlamentar.id]
                self.matriz_votacoes[ip][iv] = \
                    self._converter_opcao_para_valor(opcao)
                if (opcao == models.AUSENTE):
                    self.matriz_presencas[ip][iv] = 0.
                else:
                    self.matriz_presencas[ip][iv] = 1.
            else:
                self.matriz_votacoes[ip][iv] = 0.
                self.matriz_presencas[ip][iv] = 0.

    def _converter_opcao_para_valor(self, opcao):
        if opcao == 'SIM':
            return 1.
        if opcao == 'NAO':
            return -1.
        return 0.


class AnalisadorPartidos:

    """Analisa um partido em um período"""

    def __init__(self, coordenadas_parlamentares, parlamentares, partidos,
                 matriz_presencas, partido_do_parlamentar):
        self.coordenadas_parlamentares = coordenadas_parlamentares
        self.parlamentares = parlamentares
        self.partidos = partidos
        self.matriz_presencas = matriz_presencas
        self.partido_do_parlamentar = partido_do_parlamentar
        self.coordenadas_partidos = {}
        self.tamanhos_partidos = {}
        self.parlamentares_por_partido = {}

    def analisa_partidos(self):
        """Gera as seguintes saídas:
            self.coordenadas_partido # partido => [x,y]
            self.tamanhos_partidos # partido => int
            self.parlamentares_por_partido # partido => parlamentares
        """
        for ip in range(0, len(self.partidos)):
            indices_deste_partido = []
            for il in range(0, len(self.parlamentares)):
                if self.partido_do_parlamentar[il] == self.partidos[ip].nome:
                    indices_deste_partido.append(il)
            coordenadas_medias = self._media_sem_nans(
                self.coordenadas_parlamentares[indices_deste_partido, :])
            tamanho_partido = len(self.matriz_presencas[
                                  indices_deste_partido, :].sum(
                                  axis=1).nonzero()[0])
            self.tamanhos_partidos[self.partidos[ip]] = tamanho_partido
            self.parlamentares_por_partido[self.partidos[ip].nome] = [
                self.parlamentares[x] for x in indices_deste_partido]
            self.coordenadas_partidos[self.partidos[ip]] = coordenadas_medias

    def _media_sem_nans(self, array_numpy):
        """ Retorna média por colunas de uma array numpy,
        desconsiderando os nans."""
        mdat = numpy.ma.masked_array(array_numpy, numpy.isnan(array_numpy))
        mm = numpy.mean(mdat, axis=0)
        return mm.filled(numpy.nan)


class Rotacionador:

    def __init__(self, analisePeriodo, analisePeriodoReferencia):
        self.analisePeriodo = analisePeriodo
        self.analisePeriodoReferencia = analisePeriodoReferencia

    def _espelhar_coordenadas(self, lista_coordenadas):
        for indice, coords in list(lista_coordenadas.items()):
            lista_coordenadas[indice] = numpy.dot(
                coords, numpy.array([[-1., 0.], [0., 1.]]))

    def _rotacionar_coordenadas(self, theta, lista_coordenadas):
        for indice, coords in list(lista_coordenadas.items()):
            lista_coordenadas[indice] = numpy.dot(
                coords, self._gerar_matriz_rotacao(theta))

    def _energia(self, dados_fixos, dados_meus, por_partido,
                 graus=0, espelho=0):
        """Calcula energia envolvida no movimento entre dois instantes
        (fixo e meu), onde o meu é rodado (entre 0 e 360 graus),
        e primeiro eixo multiplicado por -1 se espelho=1.
        Ver pdf intitulado "Solução Analítica para o Problema de
        Rotação dos Eixos de Representação dos Partidos no Radar
        Parlamentar" (algoritmo_rotacao.pdf)."""
        e = 0
        dados_meus = dados_meus.copy()
        if espelho == 1:
            self._espelhar_coordenadas(dados_meus)
        if graus != 0:
            for partido, coords in list(dados_meus.items()):
                dados_meus[partido] = numpy.dot(
                    coords, self._gerar_matriz_rotacao(graus))

        if por_partido:
            for p in dados_meus:
                e += self._retornar_zero_se_nan(
                    numpy.dot(dados_fixos[p] - dados_meus[p],
                              dados_fixos[p] - dados_meus[p]) *
                    self.analisePeriodo.tamanhos_partidos[p])
        else:
            for l in dados_meus:
                e += self._retornar_zero_se_nan(
                    numpy.dot(dados_fixos[l] - dados_meus[
                        l], dados_fixos[l] - dados_meus[l]))
        return e

    def _polar(self, x, y, deg=0):        # radian if deg=0; degree if deg=1
        """
        Convert from rectangular (x,y) to polar (r,w)
        r = sqrt(x^2 + y^2)
        w = arctan(y/x) = [-\pi,\pi] = [-180,180]
        """
        if deg:
            return hypot(x, y), 180.0 * atan2(y, x) / pi
        else:
            return hypot(x, y), atan2(y, x)

    def _gerar_matriz_rotacao(self, graus):
        """ Retorna matriz de rotação 2x2 que roda os eixos em graus (0 a 360)
        no sentido anti-horário (como se os pontos girassem no sentido
        horário em torno de eixos fixos)."""
        graus = float(graus)
        rad = numpy.pi * graus / 180.
        c = numpy.cos(rad)
        s = numpy.sin(rad)
        return numpy.array([[c, -s], [s, c]])

    def _retornar_zero_se_nan(self, x):
        """Retorna zero sempre que x não for um número (NaN = Not a Number)
        Caso x seja um número, retorna x."""
        if numpy.isnan(x):
            return 0
        else:
            return x

    def espelha_ou_roda(self, por_partido=False, so_espelha=True):
        """Retorna nova AnalisePeriodo com coordenadas rotacionadas
        se por_partido = True:
        a operacao minimiza o quanto os partidos caminharam
        se por_partido = False:
        minimiza o quanto os parlamentares em si caminham
        se so_espelha = True:
        nao se faz rotacao, apenas espelha as componentes se necessario.
        """
        if por_partido:
            dados_meus = self.analisePeriodo.coordenadas_partidos
            dados_fixos = self.analisePeriodoReferencia.coordenadas_partidos
        else:
            dados_meus = self.analisePeriodo.coordenadas_parlamentares
            dados_fixos = \
                self.analisePeriodoReferencia.coordenadas_parlamentares
        epsilon = 0.001

        if not so_espelha:
            logger.info("Calculando ângulo teta 1 e ângulo teta 2...")
            numerador = 0
            denominador = 0
            for indice, coords in list(dados_meus.items()):
                meu_polar = self._polar(coords[0], coords[1], 0)
                alheio_polar = self._polar(
                    dados_fixos[indice][0], dados_fixos[indice][1], 0)
                tamanho = self.analisePeriodo.tamanhos_partidos[
                    indice] if por_partido else 1
                numerador += self._retornar_zero_se_nan(
                    tamanho * meu_polar[0] * alheio_polar[0] * numpy.sin(
                        alheio_polar[1]))
                denominador += self._retornar_zero_se_nan(
                    tamanho * meu_polar[0] * alheio_polar[0] * numpy.cos(
                        alheio_polar[1]))
            if denominador < epsilon and denominador > -epsilon:
                angulo_teta1 = 90
                angulo_teta2 = 270
            else:
                angulo_teta1 = numpy.arctan(
                    numerador / denominador) * 180 / 3.141592
                angulo_teta2 = angulo_teta1 + 180
            logger.info("angulo_teta 1 = " +
                        str(angulo_teta1) + "; angulo_teta2 = " +
                        str(angulo_teta2))
        else:
            angulo_teta1 = 0
            angulo_teta2 = 180

        ex = numpy.array([self._energia(dados_fixos, dados_meus, por_partido,
                          graus=angulo_teta1, espelho=0),
                          self._energia(dados_fixos, dados_meus, por_partido,
                          graus=angulo_teta2, espelho=0),
                          self._energia(dados_fixos, dados_meus, por_partido,
                          graus=angulo_teta1, espelho=1),
                          self._energia(dados_fixos, dados_meus, por_partido,
                          graus=angulo_teta2, espelho=1)])
        logger.info(ex)

        dados_partidos = self.analisePeriodo.coordenadas_partidos
        dados_parlamentares = self.analisePeriodo.coordenadas_parlamentares
        ganhou = ex.argmin()
        campeao = [0, 0]
        if ganhou >= 2:  # espelhar
            campeao[0] = 1
            self._espelhar_coordenadas(dados_partidos)
            self._espelhar_coordenadas(dados_parlamentares)
        if ganhou == 0 or ganhou == 2:  # girar de angulo_te1
            campeao[1] = angulo_teta1
        else:
            campeao[1] = angulo_teta2
        self._rotacionar_coordenadas(campeao[1], dados_partidos)
        self._rotacionar_coordenadas(campeao[1], dados_parlamentares)

        self.theta = campeao[1]
        logger.info("campeao = [espelha,theta] = " + str(campeao))

        analiseRotacionada = copy.copy(self.analisePeriodo)
        analiseRotacionada.coordenadas_partidos = dados_partidos
        analiseRotacionada.coordenadas_parlamentares = dados_parlamentares

        return analiseRotacionada