.github/scripts/mr_publish_results.py

# Send model regression test results to Datadog with a summary of all test results.
# Also write them into a report file.
import copy
import datetime
import json
import os
from typing import Any, Dict, List, Text, Tuple

from datadog_api_client.v1 import ApiClient, Configuration
from datadog_api_client.v1.api.metrics_api import MetricsApi
from datadog_api_client.v1.model.metrics_payload import MetricsPayload
from datadog_api_client.v1.model.point import Point
from datadog_api_client.v1.model.series import Series

DD_ENV = "rasa-regression-tests"
DD_SERVICE = "rasa"
METRIC_RUNTIME_PREFIX = "rasa.perf.benchmark."
METRIC_ML_PREFIX = "rasa.perf.ml."
CONFIG_REPOSITORY = "training-data"

TASK_MAPPING = {
    "intent_report.json": "intent_classification",
    "CRFEntityExtractor_report.json": "entity_prediction",
    "DIETClassifier_report.json": "entity_prediction",
    "response_selection_report.json": "response_selection",
    "story_report.json": "story_prediction",
}
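# TASK_MAPPING associates an evaluation report file name with the task label used
# in metric names and in the summary report file.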

METRICS = {
    "test_run_time": "TEST_RUN_TIME",
    "train_run_time": "TRAIN_RUN_TIME",
    "total_run_time": "TOTAL_RUN_TIME",
}

MAIN_TAGS = {
    "config": "CONFIG",
    "dataset": "DATASET_NAME",
}

OTHER_TAGS = {
    "config_repository_branch": "DATASET_REPOSITORY_BRANCH",
    "dataset_commit": "DATASET_COMMIT",
    "accelerator_type": "ACCELERATOR_TYPE",
    "type": "TYPE",
    "index_repetition": "INDEX_REPETITION",
    "host_name": "HOST_NAME",
}

GIT_RELATED_TAGS = {
    "pr_id": "PR_ID",
    "pr_url": "PR_URL",
    "github_event": "GITHUB_EVENT_NAME",
    "github_run_id": "GITHUB_RUN_ID",
    "github_sha": "GITHUB_SHA",
    "workflow": "GITHUB_WORKFLOW",
}
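# METRICS and the *_TAGS dicts above map a metric or tag name to the environment
# variable that holds its value; create_dict_of_env() below resolves them at runtime.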


def create_dict_of_env(name_to_env: Dict[Text, Text]) -> Dict[Text, Text]:
    return {name: os.environ[env_var] for name, env_var in name_to_env.items()}
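# Illustrative use (values are hypothetical): with CONFIG="my-config" and
# DATASET_NAME="my-dataset" exported, create_dict_of_env(MAIN_TAGS) returns
# {"config": "my-config", "dataset": "my-dataset"}.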


def _get_is_external_and_dataset_repository_branch() -> Tuple[bool, Text]:
    is_external = os.environ["IS_EXTERNAL"]
    dataset_repository_branch = os.environ["DATASET_REPOSITORY_BRANCH"]
    if is_external.lower() in ("yes", "true", "t", "1"):
        is_external_flag = True
        dataset_repository_branch = os.environ["EXTERNAL_DATASET_REPOSITORY_BRANCH"]
    else:
        is_external_flag = False
    return is_external_flag, dataset_repository_branch
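# Note: IS_EXTERNAL is read as a string, so e.g. IS_EXTERNAL="true" switches the
# branch to EXTERNAL_DATASET_REPOSITORY_BRANCH, while any other value (e.g. "false")
# keeps DATASET_REPOSITORY_BRANCH.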


def prepare_datasetrepo_and_external_tags() -> Dict[Text, Any]:
    is_external, dataset_repo_branch = _get_is_external_and_dataset_repository_branch()
    return {
        "dataset_repository_branch": dataset_repo_branch,
        "external_dataset_repository": is_external,
    }


def prepare_dsrepo_and_external_tags_as_str() -> Dict[Text, Text]:
    return {
        "dataset_repository_branch": os.environ["DATASET_REPOSITORY_BRANCH"],
        "external_dataset_repository": os.environ["IS_EXTERNAL"],
    }


def transform_to_seconds(duration: Text) -> float:
    """Transform string (with hours, minutes, and seconds) to seconds.

    Args:
        duration: Examples: '1m27s', '1m27.3s', '27s', '1h27s', '1h1m27s'

    Raises:
        Exception: If the input is not supported.

    Returns:
        Duration converted in seconds.
    """
    h_split = duration.split("h")
    if len(h_split) == 1:
        rest = h_split[0]
        hours = 0
    else:
        hours = int(h_split[0])
        rest = h_split[1]
    m_split = rest.split("m")
    if len(m_split) == 2:
        minutes = int(m_split[0])
        seconds = float(m_split[1].rstrip("s"))
    elif len(m_split) == 1:
        minutes = 0
        seconds = float(m_split[0].rstrip("s"))
    else:
        raise Exception(f"Unsupported duration: {duration}")
    overall_seconds = hours * 60 * 60 + minutes * 60 + seconds
    return overall_seconds
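# Illustrative conversions for the formats listed in the docstring:
#   "27s"     -> 27.0
#   "1m27.3s" -> 87.3
#   "1h1m27s" -> 3687.0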


def prepare_ml_metric(result: Dict[Text, Any]) -> Dict[Text, float]:
    """Converts a nested result dict into a list of metrics.

    Args:
        result: Example
            {'accuracy': 1.0,
             'weighted avg': {
                'precision': 1.0, 'recall': 1.0, 'f1-score': 1.0, 'support': 28
             }
            }

    Returns:
        Dict of metric name and metric value
    """
    metrics_ml = {}
    result = copy.deepcopy(result)
    result.pop("file_name", None)
    task = result.pop("task", None)

    for metric_name, metric_value in result.items():
        if isinstance(metric_value, float):
            metric_full_name = f"{task}.{metric_name}"
            metrics_ml[metric_full_name] = float(metric_value)
        elif isinstance(metric_value, dict):
            for mname, mval in metric_value.items():
                metric_full_name = f"{task}.{metric_name}.{mname}"
                metrics_ml[metric_full_name] = float(mval)
        else:
            raise Exception(
                f"metric_value {metric_value} has "
                f"unexpected type {type(metric_value)}"
            )
    return metrics_ml
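# Illustrative example (assuming task == "intent_classification"): the docstring's
# input flattens to
#   {"intent_classification.accuracy": 1.0,
#    "intent_classification.weighted avg.precision": 1.0,
#    "intent_classification.weighted avg.recall": 1.0,
#    "intent_classification.weighted avg.f1-score": 1.0,
#    "intent_classification.weighted avg.support": 28.0}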


def prepare_ml_metrics(results: List[Dict[Text, Any]]) -> Dict[Text, float]:
    metrics_ml = {}
    for result in results:
        new_metrics_ml = prepare_ml_metric(result)
        metrics_ml.update(new_metrics_ml)

    return metrics_ml


def prepare_datadog_tags() -> List[Text]:
    tags = {
        "env": DD_ENV,
        "service": DD_SERVICE,
        "branch": os.environ["BRANCH"],
        "config_repository": CONFIG_REPOSITORY,
        **prepare_dsrepo_and_external_tags_as_str(),
        **create_dict_of_env(MAIN_TAGS),
        **create_dict_of_env(OTHER_TAGS),
        **create_dict_of_env(GIT_RELATED_TAGS),
    }
    tags_list = [f"{k}:{v}" for k, v in tags.items()]
    return tags_list
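# Illustrative output (branch and dataset values are hypothetical):
#   ["env:rasa-regression-tests", "service:rasa", "branch:main",
#    "config_repository:training-data", "dataset:my-dataset", ...]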


def send_to_datadog(results: List[Dict[Text, Any]]) -> None:
    """Sends metrics to datadog."""
    # Prepare
    tags_list = prepare_datadog_tags()
    timestamp = datetime.datetime.now().timestamp()
    series = []

    # Send metrics about runtime
    metrics_runtime = create_dict_of_env(METRICS)
    for metric_name, metric_value in metrics_runtime.items():
        overall_seconds = transform_to_seconds(metric_value)
        series.append(
            Series(
                metric=f"{METRIC_RUNTIME_PREFIX}{metric_name}.gauge",
                type="gauge",
                points=[Point([timestamp, overall_seconds])],
                tags=tags_list,
            )
        )

    # Send metrics about ML model performance
    metrics_ml = prepare_ml_metrics(results)
    for metric_name, metric_value in metrics_ml.items():
        series.append(
            Series(
                metric=f"{METRIC_ML_PREFIX}{metric_name}.gauge",
                type="gauge",
                points=[Point([timestamp, float(metric_value)])],
                tags=tags_list,
            )
        )

    body = MetricsPayload(series=series)
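    # Assumption: Datadog credentials (e.g. DD_API_KEY) are supplied via the
    # environment and picked up by Configuration(); nothing in this script sets
    # them explicitly.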
    with ApiClient(Configuration()) as api_client:
        api_instance = MetricsApi(api_client)
        response = api_instance.submit_metrics(body=body)
        if response.get("status") != "ok":
            print(response)


def read_results(file: Text) -> Dict[Text, Any]:
    with open(file) as json_file:
        data = json.load(json_file)

        keys = [
            "accuracy",
            "weighted avg",
            "macro avg",
            "micro avg",
            "conversation_accuracy",
        ]
        result = {key: data[key] for key in keys if key in data}

    return result
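# Illustrative return value for an intent report (numbers are hypothetical):
#   {"accuracy": 0.93,
#    "weighted avg": {"precision": 0.93, "recall": 0.93, "f1-score": 0.93, "support": 42},
#    "macro avg": {...}}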


def get_result(file_name: Text, file: Text) -> Dict[Text, Any]:
    result = read_results(file)
    result["file_name"] = file_name
    result["task"] = TASK_MAPPING[file_name]
    return result


def send_all_to_datadog() -> None:
    results = []
    for dirpath, dirnames, files in os.walk(os.environ["RESULT_DIR"]):
        for f in files:
            if any(f.endswith(valid_name) for valid_name in TASK_MAPPING.keys()):
                result = get_result(f, os.path.join(dirpath, f))
                results.append(result)
    send_to_datadog(results)


def generate_json(file: Text, task: Text, data: Dict[Text, Any]) -> Dict[Text, Any]:
    config = os.environ["CONFIG"]
    dataset = os.environ["DATASET_NAME"]

    if dataset not in data:
        data = {dataset: {config: []}, **data}
    elif config not in data[dataset]:
        data[dataset] = {config: [], **data[dataset]}

    assert len(data[dataset][config]) <= 1

    data[dataset][config] = [
        {
            "config_repository": CONFIG_REPOSITORY,
            **prepare_datasetrepo_and_external_tags(),
            **create_dict_of_env(METRICS),
            **create_dict_of_env(OTHER_TAGS),
            **(data[dataset][config][0] if data[dataset][config] else {}),
            task: read_results(file),
        }
    ]
    return data
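# Illustrative shape of the summary data after one call (names and values are
# hypothetical):
#   {"my-dataset": {"my-config": [{
#       "config_repository": "training-data",
#       "dataset_repository_branch": "main",
#       "external_dataset_repository": False,
#       "test_run_time": "1m27s",
#       ...,
#       "intent_classification": {"accuracy": 0.93, ...}
#   }]}}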


def create_report_file() -> None:
    data = {}
    for dirpath, dirnames, files in os.walk(os.environ["RESULT_DIR"]):
        for f in files:
            if f not in TASK_MAPPING.keys():
                continue

            data = generate_json(os.path.join(dirpath, f), TASK_MAPPING[f], data)

    with open(os.environ["SUMMARY_FILE"], "w") as f:
        json.dump(data, f, sort_keys=True, indent=2)


if __name__ == "__main__":
    send_all_to_datadog()
    create_report_file()