src/tests/catwalk_tests/test_baselines.py

import numpy as np
import pandas as pd
import pandas.api.types as ptypes

import pytest
from unittest import TestCase
from numpy.testing import assert_array_equal

from triage.component.catwalk.baselines.rankers import PercentileRankOneFeature
from triage.component.catwalk.baselines.rankers import BaselineRankMultiFeature
from triage.component.catwalk.baselines.thresholders import SimpleThresholder
from triage.component.catwalk.baselines.thresholders import get_operator_method
from triage.component.catwalk.baselines.thresholders import OPERATOR_METHODS
from triage.component.catwalk.exceptions import BaselineFeatureNotInMatrix


@pytest.fixture(scope="class")
def data(request):
    X_train = pd.DataFrame( 
        {
            "x1": [0, 1, 2, 56, 25, 8, -3, 89],
            "x2": [0, 23, 1, 6, 5, 3, 18, 7],
            "x3": [1, 12, 5, -6, 2, 5, 3, -3],
            "x4": [6, 13, 4, 5, 35, 6, 43, 74],
            "x6": [400, 400, 400, 300, 300, 300, 300, 300],
            "x7": [1, 12, 12, -6, -6, 5, 5, -3],
        }
    )
    y_train = [0, 1, 0, 1, 1, 1, 3, 0]
    X_test = pd.DataFrame(
        {
            "x1": [4, 14, 0, 6, 25, 8, -3, 4],
            "x2": [6, -1, 1, 24, 5, 3, 18, 39],
            "x3": [1, 7, 4, 57, 2, 5, 3, 2],
            "x4": [7, 3, 6, 39, 35, 6, 43, -6],
            "x6": [305, 305, 305, 305, 401, 401, 401, 401],
            "x7": [1, 7, 7, 57, 2, 5, 3, 2],
        }
    )
    y_test = [1, 3, 0, 0, 0, 0, 0, 1]

    request.cls.data = {
        "X_train": X_train,
        "X_test": X_test,
        "y_train": y_train,
        "y_test": y_test,
    }

@pytest.fixture(scope="class")
def rules(request):
    request.cls.rules = ["x1 > 0", "x2 <= 1"]

def predict_proba_deprecated(ranker, x):
    """Reference implementation of the deprecated rank-scoring logic.

    Returns an (n_samples, 2) array in which column 1 holds rank scores
    normalized to the 0-1 range and column 0 holds their complements.
    """
    # reduce x to the selected set of features
    x = x[ranker.all_feature_names].reset_index(drop=True)

    x = x.sort_values(ranker.all_feature_names, ascending=ranker.all_sort_directions)

    # initialize curr_rank to -1 so the first record will have rank 0 (hence "score"
    # will range from 0 to 1)
    ranks = []
    curr_rank = -1
    prev = []

    # calculate ranks over sorted records, giving ties the same rank
    for rec in x.values:
        if not np.array_equal(prev, rec):
            curr_rank += 1
        ranks.append(curr_rank)
        prev = rec

    # normalize to 0 to 1 range
    x['score'] = [r / max(ranks) for r in ranks]

    # reset back to original sort order, calculate "score" for "0 class"
    scores_1 = x.sort_index()['score'].values
    scores_0 = np.array([1 - s for s in scores_1])

    return np.array([scores_0, scores_1]).transpose()
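
# A small smoke test of the deprecated helper above. This is a minimal sketch:
# the SimpleNamespace stand-in assumes the helper needs nothing beyond the
# all_feature_names and all_sort_directions attributes and is not a real
# catwalk ranker.
def test_predict_proba_deprecated_smoke():
    from types import SimpleNamespace

    fake_ranker = SimpleNamespace(all_feature_names=["x1"], all_sort_directions=[True])
    demo = pd.DataFrame({"x1": [3, 1, 1, 2]})
    result = predict_proba_deprecated(fake_ranker, demo)

    # tied rows (the two 1s) share a score, and scores span 0 to 1
    assert_array_equal(result[:, 1], np.array([1.0, 0.0, 0.0, 0.5]))
    # column 0 is the complement of column 1
    assert_array_equal(result[:, 0], 1 - result[:, 1])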

def scores_align_with_ranks(expected_ranks, returned_scores):
    '''
    Helper function to check that scores align with ranks
    correctly for the ranking baselines (e.g., higher ranks
    get higher scores and ties have the same score)
    '''
    df = pd.DataFrame({
        'rank': expected_ranks,
        'score': returned_scores
    }).sort_values('rank', ascending=True)

    curr_rank = None
    curr_score = None

    # Loop through the sorted records to check for any inconsistencies
    for ix, rec in df.iterrows():
        if curr_rank is None:
            curr_rank = rec['rank']
            curr_score = rec['score']
            continue

        if rec['rank'] < curr_rank:
            raise RuntimeError('Something has gone wrong with df.sort_values!')
        elif rec['rank'] == curr_rank and rec['score'] != curr_score:
            return False
        elif rec['rank'] > curr_rank and rec['score'] <= curr_score:
            return False

        curr_rank = rec['rank']
        curr_score = rec['score']

    # If we got through the loop without any issues, return True
    return True


def test_scores_align_with_ranks():
    # correct, no ties
    assert scores_align_with_ranks([1,2,3], [0,0.5,1.0])
    # correct, with ties
    assert scores_align_with_ranks([1,2,2,3], [0,0.5,0.5,1.0])
    # incorrect, no ties
    assert not scores_align_with_ranks([1,2,3], [1.0,0.5,0.8])
    # ties with different scores
    assert not scores_align_with_ranks([1,2,2,3], [0,0.5,0.7,1.0])



@pytest.mark.usefixtures("data")
class TestRankOneFeature(TestCase):

    def test_fit(self):
        ranker = PercentileRankOneFeature(feature="x3")
        assert ranker.feature_importances_ is None
        ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
        np.testing.assert_array_equal(
            ranker.feature_importances_, np.array([0, 0, 1, 0, 0, 0])
        )

    def test_ranking_on_unavailable_feature_raises_error(self):
        ranker = PercentileRankOneFeature(feature="x5")
        with self.assertRaises(BaselineFeatureNotInMatrix):
            ranker.fit(x=self.data["X_train"], y=self.data["y_train"])

    def test_predict_proba(self):
        for direction_value in [True, False]:
            ranker = PercentileRankOneFeature(feature="x3", low_value_high_score=direction_value)
            ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
            results = ranker.predict_proba(self.data["X_test"])
            # rows 4 and 7 of X_test are tied (both have x3 == 2) and share a rank
            if direction_value:
                expected_ranks = [6, 1, 3, 0, 5, 2, 4, 5]
            else:
                expected_ranks = [0, 5, 3, 6, 1, 4, 2, 1]

            assert scores_align_with_ranks(expected_ranks, results[:,1])


@pytest.mark.usefixtures("data")
class TestRankMultiFeature(TestCase):
    def test_fit(self):
        rules = {'feature': 'x3', 'low_value_high_score': False}
        ranker = BaselineRankMultiFeature(rules=rules)
        assert ranker.feature_importances_ is None
        ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
        np.testing.assert_array_equal(
            ranker.feature_importances_, np.array([0, 0, 1, 0, 0, 0])
        )

    def test_ranking_on_unavailable_feature_raises_error(self):
        rules = [{'feature': 'x5', 'low_value_high_score': False}]
        ranker = BaselineRankMultiFeature(rules=rules)
        with self.assertRaises(BaselineFeatureNotInMatrix):
            ranker.fit(x=self.data["X_train"], y=self.data["y_train"])

    def test_predict_proba_one_feature(self):
        for direction_value in [True, False]:
            rules = {'feature': 'x3', 'low_value_high_score': direction_value}
            ranker = BaselineRankMultiFeature(rules=rules)
            ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
            results = ranker.predict_proba(self.data["X_test"])
            if direction_value:
                expected_ranks = [6, 1, 3, 0, 5, 2, 4, 5]
            else:
                expected_ranks = [0, 5, 3, 6, 1, 4, 2, 1]

            assert scores_align_with_ranks(expected_ranks, results[:,1])

    def test_predict_proba_multi_feature(self):
        rules = [
            {'feature': 'x3', 'low_value_high_score': True},
            {'feature': 'x2', 'low_value_high_score': False}
        ]

        ranker = BaselineRankMultiFeature(rules=rules)
        ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
        results = ranker.predict_proba(self.data["X_test"])

        expected_ranks = [7, 1, 3, 0, 5, 2, 4, 6]
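        # (ranking is primarily on x3, where low values score high; the x3 tie
        # between rows 4 and 7, both x3 == 2, is broken by x2, where high values
        # score high, so row 7 with x2 == 39 outranks row 4 with x2 == 5)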

        assert scores_align_with_ranks(expected_ranks, results[:,1])

    def test_predict_proba_no_ties(self):
        for direction_value in [True, False]:
            rules = [{'feature': 'x2', 'low_value_high_score': direction_value}]

            ranker = BaselineRankMultiFeature(rules=rules)
            ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
            results_new = ranker.predict_proba(self.data["X_test"])
            results_deprecated = predict_proba_deprecated(ranker, self.data["X_test"])

            assert_array_equal(results_new, results_deprecated)


    def test_predict_proba_half_ties(self):
        for direction_value in [True, False]:
            rules = [{'feature': 'x6', 'low_value_high_score': direction_value}]

            ranker = BaselineRankMultiFeature(rules=rules)
            ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
            results_new = ranker.predict_proba(self.data["X_test"])
            results_deprecated = predict_proba_deprecated(ranker, self.data["X_test"])

            assert_array_equal(results_new, results_deprecated)


    def test_predict_proba_some_ties(self):
        for direction_value in [True, False]:
            rules = [{'feature': 'x7', 'low_value_high_score': direction_value}]

            ranker = BaselineRankMultiFeature(rules=rules)
            ranker.fit(x=self.data["X_train"], y=self.data["y_train"])
            results_new = ranker.predict_proba(self.data["X_test"])
            results_deprecated = predict_proba_deprecated(ranker, self.data["X_test"])

            assert_array_equal(results_new, results_deprecated)


@pytest.mark.parametrize('operator', OPERATOR_METHODS.keys())
def test_get_operator_method(operator):
    series = pd.Series([1, 2, 3, 4, 5])
    pd_operator = get_operator_method(operator)
    result = getattr(series, pd_operator)(series)
    assert ptypes.is_bool_dtype(result)


@pytest.mark.usefixtures("data", "rules")
class TestSimpleThresholder(TestCase):
    def test__convert_string_rule_to_dict(self):
        thresholder = SimpleThresholder(self.rules, "or")
        results = thresholder._convert_string_rule_to_dict(self.rules[0])
        expected_results = {"feature_name": "x1", "operator": "gt", "threshold": 0}
        assert results == expected_results
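        # the second fixture rule, "x2 <= 1", would convert analogously
        # (feature_name "x2", threshold 1, and presumably the "le" operator)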

    def test_all_feature_names_property(self):
        thresholder = SimpleThresholder(self.rules, "or")
        assert thresholder.all_feature_names == ["x1", "x2"]

    def test_fit(self):
        thresholder = SimpleThresholder(self.rules, "or")
        assert thresholder.feature_importances_ is None
        thresholder.fit(x=self.data["X_train"], y=self.data["y_train"])
        np.testing.assert_array_equal(
            thresholder.feature_importances_, np.array([1, 1, 0, 0, 0, 0])
        )

    def test_rule_with_unavailable_feature_raises_error(self):
        # copy so the shared class-scoped rules fixture is not mutated for other tests
        rules = self.rules + ["x5 == 3"]
        thresholder = SimpleThresholder(rules, "or")
        with self.assertRaises(BaselineFeatureNotInMatrix):
            thresholder.fit(x=self.data["X_train"], y=self.data["y_train"])

    def test_predict_proba(self):
        for logical_operator in ["and", "or"]:
            thresholder = SimpleThresholder(self.rules, logical_operator)
            thresholder.fit(x=self.data["X_train"], y=self.data["y_train"])
            results = thresholder.predict_proba(self.data["X_test"])
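            # with rules "x1 > 0" and "x2 <= 1", only row 1 of X_test
            # (x1 == 14, x2 == -1) satisfies both rules, while every row
            # except row 6 (x1 == -3, x2 == 18) satisfies at least one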
            if logical_operator == "and":
                expected_results = np.array(
                    [[0, 1, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0, 0]]
                ).transpose()
            elif logical_operator == "or":
                expected_results = np.array(
                    [[1, 1, 1, 1, 1, 1, 0, 1], [1, 1, 1, 1, 1, 1, 0, 1]]
                ).transpose()
            np.testing.assert_array_equal(results, expected_results)