# src/tests/architect_tests/test_integration.py
import os
from datetime import datetime
from tempfile import TemporaryDirectory
import yaml
import testing.postgresql
from triage import create_engine
from triage.component.results_schema import Base
from triage.component.timechop import Timechop
from triage.component.architect.features import (
FeatureGenerator,
FeatureDictionaryCreator,
FeatureGroupCreator,
FeatureGroupMixer,
)
from triage.component.architect.label_generators import LabelGenerator
from triage.component.architect.entity_date_table_generators import EntityDateTableGenerator
from triage.component.architect.planner import Planner
from triage.component.architect.builders import MatrixBuilder
from triage.component.catwalk.storage import ProjectStorage
from tests.utils import sample_config
def _create_and_populate(db_engine, create_statement, insert_statement, rows):
    # Create one source table and insert its fixture rows, one parameterized
    # execute() call per row (mirrors the original per-row insert pattern).
    db_engine.execute(create_statement)
    for row in rows:
        db_engine.execute(insert_statement, row)


def populate_source_data(db_engine):
    """Create and fill the three source tables the integration test reads.

    Builds ``cat_complaints`` and ``dog_complaints`` (entity_id, as_of_date,
    sightings count) and an ``events`` table (entity_id, binary outcome,
    outcome_date) with a small fixed fixture covering three entities.

    Args:
        db_engine: engine-like object; only its ``execute`` method is used.
    """
    # NOTE: the cat and dog fixtures are value-for-value identical on purpose;
    # two separate tables exist so the test exercises two feature aggregations.
    cat_complaints = [
        (1, "2010-10-01", 5),
        (1, "2011-10-01", 4),
        (1, "2011-11-01", 4),
        (1, "2011-12-01", 4),
        (1, "2012-02-01", 5),
        (1, "2012-10-01", 4),
        (1, "2013-10-01", 5),
        (2, "2010-10-01", 5),
        (2, "2011-10-01", 5),
        (2, "2011-11-01", 4),
        (2, "2011-12-01", 4),
        (2, "2012-02-01", 6),
        (2, "2012-10-01", 5),
        (2, "2013-10-01", 6),
        (3, "2010-10-01", 5),
        (3, "2011-10-01", 3),
        (3, "2011-11-01", 4),
        (3, "2011-12-01", 4),
        (3, "2012-02-01", 4),
        (3, "2012-10-01", 3),
        (3, "2013-10-01", 4),
    ]
    dog_complaints = [
        (1, "2010-10-01", 5),
        (1, "2011-10-01", 4),
        (1, "2011-11-01", 4),
        (1, "2011-12-01", 4),
        (1, "2012-02-01", 5),
        (1, "2012-10-01", 4),
        (1, "2013-10-01", 5),
        (2, "2010-10-01", 5),
        (2, "2011-10-01", 5),
        (2, "2011-11-01", 4),
        (2, "2011-12-01", 4),
        (2, "2012-02-01", 6),
        (2, "2012-10-01", 5),
        (2, "2013-10-01", 6),
        (3, "2010-10-01", 5),
        (3, "2011-10-01", 3),
        (3, "2011-11-01", 4),
        (3, "2011-12-01", 4),
        (3, "2012-02-01", 4),
        (3, "2012-10-01", 3),
        (3, "2013-10-01", 4),
    ]
    events = [
        (1, 1, "2011-01-01"),
        (1, 1, "2011-06-01"),
        (1, 1, "2011-09-01"),
        (1, 1, "2012-01-01"),
        (1, 1, "2012-01-10"),
        (1, 1, "2012-06-01"),
        (1, 1, "2013-01-01"),
        (1, 0, "2014-01-01"),
        (1, 1, "2015-01-01"),
        (2, 1, "2011-01-01"),
        (2, 1, "2011-06-01"),
        (2, 1, "2011-09-01"),
        (2, 1, "2012-01-01"),
        (2, 1, "2013-01-01"),
        (2, 1, "2014-01-01"),
        (2, 1, "2015-01-01"),
        (3, 0, "2011-01-01"),
        (3, 0, "2011-06-01"),
        (3, 0, "2011-09-01"),
        (3, 0, "2012-01-01"),
        (3, 0, "2013-01-01"),
        (3, 1, "2014-01-01"),
        (3, 0, "2015-01-01"),
    ]
    _create_and_populate(
        db_engine,
        """create table cat_complaints (
        entity_id int,
        as_of_date date,
        cat_sightings int
        )""",
        "insert into cat_complaints values (%s, %s, %s)",
        cat_complaints,
    )
    _create_and_populate(
        db_engine,
        """create table dog_complaints (
        entity_id int,
        as_of_date date,
        dog_sightings int
        )""",
        "insert into dog_complaints values (%s, %s, %s)",
        dog_complaints,
    )
    _create_and_populate(
        db_engine,
        """create table events (
        entity_id int,
        outcome int,
        outcome_date date
        )""",
        "insert into events values (%s, %s, %s)",
        events,
    )
def basic_integration_test(
    cohort_names,
    feature_group_create_rules,
    feature_group_mix_rules,
    expected_matrix_multiplier,
    expected_group_lists,
):
    """Drive the architect pipeline end-to-end and assert on its output.

    Spins up a throwaway Postgres, populates source tables, then runs the
    full chain: timechop -> cohort table -> labels -> feature aggregation
    and imputation -> feature grouping/mixing -> matrix planning -> matrix
    building, and finally checks that the expected number of matrix (and
    metadata) files were produced with the expected feature-group lists.

    Args:
        cohort_names (list): cohort names passed through to the Planner.
        feature_group_create_rules (dict): rules for FeatureGroupCreator,
            e.g. {"prefix": ["cat", "dog"]}.
        feature_group_mix_rules (list): strategy names for FeatureGroupMixer,
            e.g. ["all"] or ["leave-one-out", "all"].
        expected_matrix_multiplier (int): factor by which feature grouping is
            expected to multiply timechop's train/test matrix count.
        expected_group_lists (list): expected "feature_groups" metadata lists,
            compared as an unordered set of tuples.
    """
    with testing.postgresql.Postgresql() as postgresql:
        db_engine = create_engine(postgresql.url())
        # results schema must exist before the builder records matrices
        Base.metadata.create_all(db_engine)
        populate_source_data(db_engine)
        with TemporaryDirectory() as temp_dir:
            # Yearly model updates over 2011-2014 with 6-month label windows;
            # these dates are chosen to line up with populate_source_data.
            chopper = Timechop(
                feature_start_time=datetime(2010, 1, 1),
                feature_end_time=datetime(2014, 1, 1),
                label_start_time=datetime(2011, 1, 1),
                label_end_time=datetime(2014, 1, 1),
                model_update_frequency="1year",
                training_label_timespans=["6months"],
                test_label_timespans=["6months"],
                training_as_of_date_frequencies="1day",
                test_as_of_date_frequencies="3months",
                max_training_histories=["1months"],
                test_durations=["1months"],
            )
            # Cohort = every entity that ever appears in the events table.
            entity_date_table_generator = EntityDateTableGenerator(
                db_engine=db_engine,
                entity_date_table_name="cohort_abcd",
                query="select distinct(entity_id) from events"
            )
            # NOTE(review): the "query" argument to sample_config looks like a
            # config-selection key defined in tests.utils — confirm there.
            label_generator = LabelGenerator(
                db_engine=db_engine, query=sample_config("query")[
                    "label_config"]["query"]
            )
            feature_generator = FeatureGenerator(
                db_engine=db_engine, features_schema_name="features", replace=True
            )
            feature_dictionary_creator = FeatureDictionaryCreator(
                db_engine=db_engine, features_schema_name="features"
            )
            feature_group_creator = FeatureGroupCreator(
                feature_group_create_rules)
            feature_group_mixer = FeatureGroupMixer(feature_group_mix_rules)
            project_storage = ProjectStorage(temp_dir)
            planner = Planner(
                feature_start_time=datetime(2010, 1, 1),
                label_names=["outcome"],
                label_types=["binary"],
                cohort_names=cohort_names,
                user_metadata={},
            )
            builder = MatrixBuilder(
                engine=db_engine,
                db_config={
                    "features_schema_name": "features",
                    "labels_schema_name": "public",
                    "labels_table_name": "labels",
                    "cohort_table_name": "cohort_abcd",
                },
                experiment_hash=None,
                matrix_storage_engine=project_storage.matrix_storage_engine(),
                replace=True,
            )
            # chop time
            split_definitions = chopper.chop_time()
            # one train matrix per split plus each split's test matrices
            num_split_matrices = sum(
                1 + len(split["test_matrices"]) for split in split_definitions
            )
            # generate as_of_times for feature/label/state generation
            all_as_of_times = []
            for split in split_definitions:
                all_as_of_times.extend(split["train_matrix"]["as_of_times"])
                for test_matrix in split["test_matrices"]:
                    all_as_of_times.extend(test_matrix["as_of_times"])
            # dedupe: the same as_of_time can appear in multiple splits
            all_as_of_times = list(set(all_as_of_times))
            # generate entity_date state table
            entity_date_table_generator.generate_entity_date_table(
                as_of_dates=all_as_of_times)
            # create labels table
            label_generator.generate_all_labels(
                labels_table="labels",
                as_of_dates=all_as_of_times,
                label_timespans=["6months"],
            )
            # create feature table tasks
            # we would use FeatureGenerator#create_all_tables but want to use
            # the tasks dict directly to create a feature dict
            aggregations = feature_generator.aggregations(
                feature_aggregation_config=[
                    {
                        "prefix": "cat",
                        "from_obj": "cat_complaints",
                        "knowledge_date_column": "as_of_date",
                        "aggregates": [
                            {
                                "quantity": "cat_sightings",
                                "metrics": ["count", "avg"],
                                "imputation": {"all": {"type": "mean"}},
                            }
                        ],
                        "intervals": ["1y"],
                    },
                    {
                        "prefix": "dog",
                        "from_obj": "dog_complaints",
                        "knowledge_date_column": "as_of_date",
                        # per-metric imputation defaults, in contrast to the
                        # per-aggregate "all" rule used for the cat features
                        "aggregates_imputation": {
                            "count": {"type": "constant", "value": 7},
                            "sum": {"type": "mean"},
                            "avg": {"type": "zero"},
                        },
                        "aggregates": [
                            {"quantity": "dog_sightings",
                                "metrics": ["count", "avg"]}
                        ],
                        "intervals": ["1y"],
                    },
                ],
                feature_dates=all_as_of_times,
                state_table=entity_date_table_generator.entity_date_table_name,
            )
            feature_table_agg_tasks = feature_generator.generate_all_table_tasks(
                aggregations, task_type="aggregation"
            )
            # create feature aggregation tables
            feature_generator.process_table_tasks(feature_table_agg_tasks)
            # imputation tasks must run after aggregation tables exist
            feature_table_imp_tasks = feature_generator.generate_all_table_tasks(
                aggregations, task_type="imputation"
            )
            # create feature imputation tables
            feature_generator.process_table_tasks(feature_table_imp_tasks)
            # build feature dictionaries from feature tables and
            # subsetting config
            master_feature_dict = feature_dictionary_creator.feature_dictionary(
                feature_table_names=feature_table_imp_tasks.keys(),
                index_column_lookup=feature_generator.index_column_lookup(
                    aggregations),
            )
            feature_dicts = feature_group_mixer.generate(
                feature_group_creator.subsets(master_feature_dict)
            )
            # figure out what matrices need to be built
            _, matrix_build_tasks = planner.generate_plans(
                split_definitions, feature_dicts
            )
            # go and build the matrices
            builder.build_all_matrices(matrix_build_tasks)
            # super basic assertion: did matrices we expect get created?
            matrices_records = list(
                db_engine.execute(
                    """select matrix_uuid, num_observations, matrix_type
                    from triage_metadata.matrices
                    """
                )
            )
            matrix_directory = os.path.join(temp_dir, "matrices")
            matrices = [path for path in os.listdir(
                matrix_directory) if ".csv" in path]
            metadatas = [
                path for path in os.listdir(matrix_directory) if ".yaml" in path
            ]
            # feature grouping multiplies timechop's matrix count
            assert len(matrices) == num_split_matrices * \
                expected_matrix_multiplier
            assert len(metadatas) == num_split_matrices * \
                expected_matrix_multiplier
            # every matrix on disk should also be recorded in the database
            assert len(matrices) == len(matrices_records)
            feature_group_name_lists = []
            for metadata_path in metadatas:
                with open(os.path.join(matrix_directory, metadata_path)) as f:
                    # yaml.Loader is acceptable here: these metadata files
                    # were just written by the builder itself, not user input
                    metadata = yaml.load(f, Loader=yaml.Loader)
                    feature_group_name_lists.append(metadata["feature_groups"])
            for matrix_uuid, num_observations, matrix_type in matrices_records:
                assert matrix_uuid in matrix_build_tasks  # the hashes of the matrices
                assert type(num_observations) is int
                assert matrix_type == matrix_build_tasks[matrix_uuid]["matrix_type"]

            # order-insensitive comparison of the feature-group name lists
            def deep_unique_tuple(l):
                return set([tuple(i) for i in l])

            assert deep_unique_tuple(feature_group_name_lists) == deep_unique_tuple(
                expected_group_lists
            )
def test_integration_simple():
    """Smoke-test the pipeline with one cohort and a single 'all' feature group."""
    # With only one cohort and one feature group, the number of matrices
    # should equal timechop's split output exactly (multiplier of 1).
    scenario = {
        "cohort_names": ["mycohort"],
        "feature_group_create_rules": {"prefix": ["cat", "dog"]},
        "feature_group_mix_rules": ["all"],
        "expected_matrix_multiplier": 1,
        "expected_group_lists": [["prefix: cat", "prefix: dog"]],
    }
    basic_integration_test(**scenario)
def test_integration_feature_grouping():
    """Exercise feature grouping: leave-one-out plus the 'all' group."""
    # Two prefixes under leave-one-out plus "all" yields three feature
    # groups (cat, dog, cat+dog), so each train/test split produces 3x
    # the matrices timechop alone would.
    scenario = {
        "cohort_names": ["mycohort"],
        "feature_group_create_rules": {"prefix": ["cat", "dog"]},
        "feature_group_mix_rules": ["leave-one-out", "all"],
        "expected_matrix_multiplier": 3,
        "expected_group_lists": [
            ["prefix: cat"],
            ["prefix: cat", "prefix: dog"],
            ["prefix: dog"],
        ],
    }
    basic_integration_test(**scenario)