ErikGartner/hyperdock

View on GitHub
hyperdock/common/experiment.py

Summary

Maintainability
A
2 hrs
Test Coverage
import logging
from datetime import datetime
import os
import sys
import json

import docker
import requests
import schema

from .utils import try_key, slugify
from .stability import tryd

LOG_TAIL_ROWS = 100

SCHEMA_GRAPH = schema.Schema(
    [
        {
            "name": schema.And(str, len),
            "x_axis": str,
            "y_axis": str,
            "series": schema.Schema(
                [
                    {
                        "label": schema.And(str),
                        "x": schema.And(list, len),
                        "y": schema.And(list, len),
                    }
                ]
            ),
        }
    ]
)


class Experiment:
    """
    Class managing an experiment.
    """

    def __init__(self, job, worker_env, privileged=False):
        super().__init__()

        self.has_started = False
        self.id = job["_id"]
        self._queue_job = job
        self._logger = logging.getLogger("Experiment %s" % self.id)
        self._is_running = False
        self._result = {}
        self._graphs = []
        self._container = None
        self._volumes = []
        self._volume_root = None
        self._docker_client = docker.from_env()
        self._privileged = privileged
        self._worker_env = worker_env
        self._update = {}

    def start(self):
        """
        Start the experiment.
        """
        self._logger.info("Starting: {}".format(self._queue_job))

        if self.has_started:
            raise RuntimeError("This experiment has already been executed.")
        self.has_started = True

        self._is_running = True
        self._setup_volumes()
        image = self._queue_job["data"]["docker"]["image"]
        self._container = self._start_container(image)

        if self._container is None:
            self._is_running = False

    def stop(self):
        """
        Stops the experiment.
        """
        if self._is_running and self._container is not None:
            try:
                tryd(self._container.stop)
            except (docker.errors.APIError) as e:
                self._logger.error("Failed to stop container:\n%s" % e)
                self._result = {"status": "fail", "msg": e}
            self._fetch_result()

    def cleanup(self):
        """
        Stops and removes temporary files.
        """
        if self.is_running():
            self.stop()

        if self._container is not None:
            try:
                tryd(self._container.remove, force=True)
            except (docker.errors.APIError) as e:
                self._logger.error("Failed to remove container:\n%s" % e)
            self._container = None
            self._is_running = False

    def is_running(self):
        """
        Checks with underlying runner if the current experiment is running.
        Otherwise it could: not be started, finished or failed.
        """
        if self._is_running:
            self._is_running = self._check_container_running()

            if not self._is_running:
                # Container finished since last check, fetch the result.
                self._fetch_result()

        return self._is_running

    def get_result(self):
        """
        Returns the result, that is: state, loss, and any extra data.
        """
        if self.is_running():
            return {"state": "running"}
        else:
            return self._result

    def get_update(self):
        """
        Returns the latest update from the experiment.
        """
        if self._container is not None:
            try:
                logs = tryd(
                    self._container.logs, stdout=True, stderr=True, tail=LOG_TAIL_ROWS
                )
                if isinstance(logs, (bytes, bytearray)):
                    logs = logs.decode()
            except (docker.errors.APIError) as e:
                logs = "Failed to fetch logs."
                self._logger.warning("Failed to fetch logs: %s" % e)

            self._update = {
                "container": {
                    "name": self._container.name,
                    "id": self._container.short_id,
                    "long_id": self._container.id,
                    "logs": logs,
                    "results_folder": self._volume_root,
                    "graphs": self._read_graphs(),
                }
            }

        return self._update

    def _start_container(self, image):
        """
        Starts a docker container and returns its handle.
        """
        try:
            runtime = try_key(self._queue_job["data"], "", "docker", "runtime")
            environment = self._get_environment()
            volumes = self._volumes
            container = tryd(
                self._docker_client.containers.run,
                image=image,
                tty=False,
                detach=True,
                environment=self._get_environment(),
                runtime=runtime,
                log_config={"type": "json-file"},
                stdout=True,
                stderr=True,
                privileged=self._privileged,
                volumes=self._volumes,
                hostname=str(self.id),
            )
            self._logger.info(
                "Started container ({}, {}) with environment: {}, volumes: {}, privileged: {}".format(
                    container.name,
                    container.name,
                    environment,
                    volumes,
                    self._privileged,
                )
            )
            return container

        except (
            docker.errors.ContainerError,
            docker.errors.APIError,
            docker.errors.ImageNotFound,
        ) as e:
            self._logger.error("Failed to start container:\n%s" % e)
            self._result = {"state": "fail", "msg": str(e)}
            return None

    def _get_environment(self):
        """
        Creates the Target Image enviroment from the enviroment set by
        the worker and the job. Raises ValueError if job_spec contains
        the wrong format.
        """
        env = []
        job_env = try_key(self._queue_job["data"], [], "docker", "environment")
        if isinstance(job_env, list):
            env.extend(job_env)
        elif isinstance(job_env, dict):
            for k, v in job_env.items():
                env.append("%s=%s" % (k, v))
        else:
            raise ValueError("Invalid environment in job spec: %s" % job_env)
        env.extend(self._worker_env)
        return env

    def _check_container_running(self):
        """
        Checks if the container is running or not.
        """
        if self._container is None:
            return False
        else:
            try:
                tryd(self._container.reload)
            except (docker.errors.ContainerError, docker.errors.APIError) as e:
                self._logger.warning("Failed to get status of container: %s" % e)
                return False
            return self._container.status == "running"

    def _fetch_result(self):
        """
        After a container has finished. This method fetches the
        result from the container.
        """
        if self._container is None:
            self._result = {"state": "fail"}

        else:
            self._result = self._read_loss()
            self._graphs = self._read_graphs()
            self._read_docker_logs()

    def _read_loss(self):
        """
        Tries to read the loss from the experiment.
        """
        try:
            with open(os.path.join(self._volume_root, "loss.json"), "r") as f:
                loss = json.load(f)
            return loss
        except:
            self._logger.warning("Failed to read loss")
            return {"state": "fail", "msg": "Failed to read loss."}

    def _read_docker_logs(self):
        """
        Tries to write the docker container logs to the docker_logs.txt file.
        """
        try:
            docker_logs = tryd(self._container.logs, stdout=True, stderr=True)
            with open(os.path.join(self._volume_root, "docker_log.txt"), "wb") as f:
                f.write(docker_logs)
        except:
            self._logger.warning(
                "Failed to read/write docker logs: %s" % sys.exc_info()[0]
            )

    def _read_graphs(self):
        """
        Tries to read and validate the graph from the out folder.
        """
        graphs_path = os.path.join(self._volume_root, "graphs.json")
        try:
            with open(graphs_path, "r") as f:
                graphs = json.load(f)
            graphs = SCHEMA_GRAPH.validate(graphs)
        except:
            self._logger.debug(
                "Failed to read %s: %s" % (graphs_path, sys.exc_info()[0])
            )
            graphs = []
        return graphs

    def _setup_volumes(self):
        """
        Sets up the volumes on the host's end along with the paths for the
        volumes.
        """
        data = self._queue_job["data"]

        # Results folder path on host
        results_folder = try_key(data, "results", "volumes", "results")
        folder_name = datetime.utcnow().strftime("%Y-%m-%d_%H.%M.%S.%f")
        trial_name = slugify(self._queue_job["trial_name"])[:64]
        trial_folder = "{}-{}".format(trial_name, self._queue_job["trial"])
        volume_root = os.path.join(results_folder, trial_folder, folder_name)

        # Ensure path is absolute
        volume_root = os.path.abspath(volume_root)
        os.makedirs(volume_root, exist_ok=True)

        # Folder paths
        out_folder = os.path.join(volume_root, "out")
        in_file = os.path.join(volume_root, "params.json")
        loss_file = os.path.join(volume_root, "loss.json")
        docker_log = os.path.join(volume_root, "docker_log.txt")

        # Make empty foldes and files
        os.mkdir(out_folder)
        open(loss_file, "a").close()

        # Create params file
        with open(in_file, "w") as f:
            json.dump(self._queue_job["parameters"], f, indent=2)

        self._volumes = ["%s:/hyperdock" % volume_root]
        self._volume_root = volume_root

        # Get host data folder, ensure it is absolute
        host_data_folder = try_key(data, "", "volumes", "data")
        host_data_folder = os.path.abspath(host_data_folder)
        if host_data_folder is not "":
            self._volumes.append("%s:/data:ro" % host_data_folder)

        return self._volumes

    def __str__(self):
        return "Experiment: %s" % self._queue_job