PiePline/piepline

piepline/monitoring/monitors.py

import json
import os
from abc import ABCMeta, abstractmethod
from typing import List

import numpy as np

from piepline.utils.fsm import FolderRegistrable, FileStructManager
from piepline.train_config.metrics import MetricsGroup, AbstractMetric

__all__ = ['AbstractMonitor', 'AbstractLossMonitor', 'AbstractMetricsMonitor', 'ConsoleLossMonitor', 'FileLogMonitor']


class AbstractMonitor(metaclass=ABCMeta):
    def __init__(self):
        self._epoch_num = 0

    def set_epoch_num(self, epoch_num: int) -> None:
        """
        Set current epoch num

        :param epoch_num: num of current epoch
        """
        self._epoch_num = epoch_num

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


class AbstractLossMonitor(AbstractMonitor, metaclass=ABCMeta):
    """
    Basic class for every loss monitor
    """

    def update_losses(self, losses: dict) -> None:
        """
        Update losses on the monitor

        :param losses: dict of loss values keyed by the name of the train pipeline stage (e.g. ``train``, ``validation``)
        """
        pass

    @staticmethod
    def _iterate_by_losses(losses: dict, callback: callable) -> None:
        """
        Internal method that unifies iteration over the losses dict

        :param losses: dict of losses
        :param callback: callable invoked for every loss entry with its name and values: ``callback(name: str, values: np.ndarray)``
        """
        for m, v in losses.items():
            callback(m, v)


class AbstractMetricsMonitor(AbstractMonitor, metaclass=ABCMeta):
    """
    Basic class for every metrics monitor
    """

    def update_metrics(self, metrics: dict) -> None:
        """
        Update metrics on the monitor

        :param metrics: metrics dict with keys 'metrics' and 'groups'
        """
        for metric in metrics['metrics']:
            self._process_metric([], metric)

        for metrics_group in metrics['groups']:
            for metric in metrics_group.metrics():
                self._process_metric([metrics_group], metric)
            for group in metrics_group.groups():
                for metric in group.metrics():
                    self._process_metric([metrics_group, group], metric)
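
    # Note on the expected input (derived from the loops above): ``update_metrics`` receives a dict of the form
    #     {'metrics': [<AbstractMetric>, ...], 'groups': [<MetricsGroup>, ...]}
    # where each group may contain its own metrics and one level of nested sub-groups.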

    @abstractmethod
    def _process_metric(self, path: List[MetricsGroup], metric: 'AbstractMetric'):
        """
        Internal method for processing one metric

        :param path: list of parent metrics groups from the root to the current metric (not including the metric itself)
        :param metric: the metric to process
        """


class ConsoleLossMonitor(AbstractLossMonitor):
    """
    Monitor that writes loss values to the console.

    Output looks like: ``Epoch: [#]; train: [0.45, 0.03]; validation: [0.51, 0.02]``. The two numbers for each stage are the
    [median, std] of that stage's loss values
    """

    class ResStr:
        def __init__(self, start: str):
            self.res = start

        def append(self, string: str):
            self.res += string

        def __str__(self):
            return self.res[:-1]

    def update_losses(self, losses: dict) -> None:
        def on_loss(name: str, values: np.ndarray, string) -> None:
            string.append(" {}: [{:.4f}, {:.4f}];".format(name, np.median(values), np.std(values)))

        res_string = self.ResStr("Epoch: [{}];".format(self._epoch_num))
        self._iterate_by_losses(losses, lambda m, v: on_loss(m, v, res_string))
        print(res_string)
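

# A minimal usage sketch for ConsoleLossMonitor (not part of the original file; the function name and the
# loss arrays are hypothetical and shown only for illustration):
def _example_console_loss_monitor() -> None:
    monitor = ConsoleLossMonitor()
    monitor.set_epoch_num(3)
    # keys are train pipeline stage names, values are the losses collected during that stage
    monitor.update_losses({'train': np.array([0.52, 0.48, 0.45]),
                           'validation': np.array([0.55, 0.51])})
    # prints something like: Epoch: [3]; train: [0.4800, 0.0287]; validation: [0.5300, 0.0200]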


class FileLogMonitor(AbstractMetricsMonitor, AbstractLossMonitor, FolderRegistrable):
    """
    Monitor used for logging metrics to files. It writes the full log and can also write the latest metric values to a
    separate file if required

    Output is stored in ``<base_dir_path>/monitors/metrics_log``: per-metric history as CSV files, metadata and final
    metrics as JSON

    :param fsm: :class:`FileStructManager` object
    """

    def __init__(self, fsm: FileStructManager):
        super().__init__()

        self._fsm = fsm
        self._fsm.register_dir(self)
        self._files = {}
        self._meta_file = None
        self._final_file = None

    def _process_metric(self, path: List[MetricsGroup], metric: 'AbstractMetric'):
        file_log_dir = self._fsm.get_path(self, create_if_non_exists=True, check=False)

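        # build the output sub-directory from the chain of parent metrics groups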
        intermediate_path = ''
        for p in path:
            intermediate_path = os.path.join(intermediate_path, p.name())
        cur_dir = os.path.join(file_log_dir, intermediate_path)

        if not os.path.exists(cur_dir):
            os.makedirs(cur_dir)

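        # append the epoch number and the current metric value to the per-metric CSV log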
        metric_value = metric.get_value()

        cur_file_path = os.path.join(cur_dir, metric.name() + '.csv')
        with open(cur_file_path, 'a') as out:
            out.write("{}, {}\n".format(self._epoch_num, metric_value))

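        # if write_final_metrics() was enabled, also keep the latest value of every metric in a single JSON file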
        if self._final_file is not None:
            final_file_path = os.path.join(file_log_dir, self._final_file)
            final_data = {}
            if os.path.exists(final_file_path):
                with open(final_file_path, 'r') as final_file:
                    final_data = json.load(final_file)
            final_data['/'.join([p.name() for p in path] + [metric.name()])] = metric_value
            with open(final_file_path, 'w') as final_file:
                json.dump(final_data, final_file)

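        # register the metric in meta.json the first time it is seen, so its log can be located later (see load())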
        if cur_file_path not in self._files:
            self._files[cur_file_path] = {'name': metric.name(), 'path': [p.name() for p in path]}

            if self._meta_file is None:
                self._meta_file = os.path.join(file_log_dir, 'meta.json')

            with open(self._meta_file, 'w') as meta_out:
                json.dump(self._files, meta_out)

    def load(self) -> dict:
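        """
        Load the previously written metric logs from disk

        :return: dict of numpy arrays keyed by the entries of ``meta.json``
        """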
        cur_dir = self._fsm.get_path(self, create_if_non_exists=False, check=True)
        with open(os.path.join(cur_dir, 'meta.json'), 'r') as meta_file:
            meta = json.load(meta_file)

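        # read each logged series back as a numpy array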
        res = {}
        for path in meta:
            cur_path = os.path.join(cur_dir, path)
            for f in os.listdir(cur_path):
                res[path] = np.loadtxt(os.path.join(cur_path, f), delimiter=',')

        return res

    def write_final_metrics(self, path: str = 'metrics.json') -> 'FileLogMonitor':
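        """
        Enable writing the latest value of every metric to a separate JSON file

        :param path: name of the file, relative to the monitor's output directory
        :return: self
        """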
        self._final_file = path
        return self

    def get_dir(self) -> str:
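        """
        Get the path of the directory where this monitor writes its files
        """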
        return self._fsm.get_path(self, create_if_non_exists=False, check=False)

    def _get_gir(self) -> str:
        return os.path.join('monitors', 'metrics_log')

    def _get_name(self) -> str:
        return 'FileLogMonitor'
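

# A minimal wiring sketch for FileLogMonitor (not part of the original file). It assumes a FileStructManager
# constructed from a base directory and an ``is_continue`` flag; the directory name and the ``metrics_dict``
# placeholder are hypothetical.
def _example_file_log_monitor(metrics_dict: dict) -> None:
    fsm = FileStructManager(base_dir='./experiment', is_continue=False)
    monitor = FileLogMonitor(fsm).write_final_metrics('metrics.json')
    monitor.set_epoch_num(0)
    # ``metrics_dict`` is expected in the {'metrics': [...], 'groups': [...]} form described above
    monitor.update_metrics(metrics_dict)
    print('logs written to', monitor.get_dir())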