Aiscalate/aiscalator

View on GitHub
src/aiscalator/airflow/command.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
# Apache Software License 2.0
#
# Copyright (c) 2018, Christophe Duong
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Implementations of commands for Airflow
"""
import logging
import re
from grp import getgrgid
from os import listdir
from os import makedirs
from os import remove
from os import stat
from os import symlink
from os.path import abspath
from os.path import basename
from os.path import dirname
from os.path import exists
from os.path import isfile
from os.path import islink
from os.path import join
from os.path import realpath
from tempfile import TemporaryDirectory

from aiscalator.core import utils
from aiscalator.core.config import AiscalatorConfig
from aiscalator.core.log_regex_analyzer import LogRegexAnalyzer


def _docker_compose(conf: AiscalatorConfig,
                    extra_commands: list):
    """
    Run the docker-compose command

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    extra_commands : list
        list of sub-commands to run in docker-compose

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    dockerfile = join(conf.app_config_home(), "config",
                      conf.airflow_docker_compose_file())
    commands = ["docker-compose"]
    # Prepare a temp folder to run the command from
    with TemporaryDirectory(prefix="aiscalator_") as tmp:
        with open(join(tmp, ".env"), mode="w") as env_file:
            # concatenate all the env files into one
            for env in conf.user_env_file(conf.dag_field("definition.env")):
                if isfile(env):
                    with open(env, mode="r") as file:
                        for line in file:
                            env_file.write(line)
        utils.copy_replace(join(tmp, ".env"),
                           join(dirname(dockerfile), ".env"))
    commands += ["-f", dockerfile] + extra_commands
    logger.info("Running...: %s", " ".join(commands))
    utils.subprocess_run(commands, no_redirect=True)


def airflow_setup(conf: AiscalatorConfig,
                  config_home: str,
                  workspace: list,
                  append: bool = True):
    """
    Setup the airflow configuration files and environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    config_home : str
        path to the configuration home directory
    workspace : list
        List of path to directories to mount as volumes
        to airflow workers to use as workspaces
    append : bool
        flag to tell if workspace should be appended to
        the list in the config or replace it.

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    if config_home:
        makedirs(config_home, exist_ok=True)
        conf.redefine_app_config_home(config_home)
    ws_path = "airflow.setup.workspace_paths"
    if conf.app_config_has(ws_path):
        if append:
            workspace += conf.app_config()[ws_path]
    conf.redefine_airflow_workspaces(workspace)
    image = 'latest'
    if _docker_compose_grep(conf):
        image = _airflow_docker_build(conf)
        if not image:
            raise Exception("Failed to build docker image")
    src = utils.data_file("../config/docker/airflow/config/")
    dst = join(conf.app_config_home(), "config")
    logger.info("Generating a new configuration folder for aiscalator:\n\t%s",
                dst)
    makedirs(dst, exist_ok=True)
    makedirs(join(conf.app_config_home(), "dags"), exist_ok=True)
    makedirs(join(conf.app_config_home(), "pgdata"), exist_ok=True)
    makedirs(join(conf.app_config_home(), "workspace"), exist_ok=True)
    pattern = [
        r"(\s+)# - workspace #",
        "aiscalator/airflow:latest",
    ]
    workspace = []
    for line in conf.app_config()[ws_path]:
        host_src, container_dst = _split_workspace_string(conf, line)
        # bind the same path from host in the container (after creating a
        # symbolic link at container_dst path)
        workspace += [r"\1- " + host_src + ':' + host_src]
    workspace += [r"\1# - workspace #"]
    value = [
        "\n".join(workspace),
        "aiscalator/airflow:" + image,
    ]
    for file in listdir(src):
        utils.copy_replace(join(src, file),
                           join(dst, file),
                           pattern=pattern,
                           replace_value=value)


def airflow_up(conf: AiscalatorConfig):
    """
    Starts an airflow environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    if _docker_compose_grep(conf):
        if not _airflow_docker_build(conf):
            raise Exception("Failed to build docker image")
    _docker_compose(conf, ["up", "-d"])


def _docker_compose_grep(conf: AiscalatorConfig):
    """
    Checks if the docker-compose file is using the
    aiscalator/airflow docker image. In which case,
    we need to make sure that image is properly built
    and available.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    Returns
    -------
    bool
        Returns if aiscalator/airflow docker image is
        needed and should be built.
    """
    docker_compose_file = join(conf.app_config_home(), "config",
                               conf.airflow_docker_compose_file())
    pattern = re.compile(r"\s+image:\s+aiscalator/airflow")
    try:
        with open(docker_compose_file, "r") as file:
            for line in file:
                if re.match(pattern, line):
                    # docker compose needs the image
                    return True
    except FileNotFoundError:
        # no docker compose, default will need the image
        return True
    return False


def _airflow_docker_build(conf: AiscalatorConfig):
    """ Build the aiscalator/airflow image and return its ID."""
    logger = logging.getLogger(__name__)
    # TODO get airflow dockerfile from conf?
    conf.app_config_home()
    dockerfile_dir = utils.data_file("../config/docker/airflow")
    # TODO customize dockerfile with apt_packages, requirements etc
    docker_gid, docker_group = _find_docker_gid_group()
    commands = [
        "docker", "build",
        "--build-arg", "DOCKER_GID=" + str(docker_gid),
        "--build-arg", "DOCKER_GROUP=" + str(docker_group),
        "--rm", "-t", "aiscalator/airflow:latest",
        dockerfile_dir
    ]
    log = LogRegexAnalyzer(b'Successfully built ([a-zA-Z0-9]+)\n')
    logger.info("Running...: %s", " ".join(commands))
    utils.subprocess_run(commands, log_function=log.grep_logs)
    result = log.artifact()
    if result:
        # tag the image built with the sha256 of the dockerfile
        tag = utils.sha256(join(dockerfile_dir, 'Dockerfile'))[:12]
        commands = [
            "docker", "tag", result, "aiscalator/airflow:" + tag
        ]
        logger.info("Running...: %s", " ".join(commands))
        utils.subprocess_run(commands)
        return tag
    return None


def _find_docker_gid_group():
    """Returns the group ID and name owning the /var/run/docker.sock file"""
    stat_info = stat('/var/run/docker.sock')
    if stat_info:
        gid = stat_info.st_gid
        return gid, getgrgid(gid)[0]
    return None, None


def airflow_down(conf: AiscalatorConfig):
    """
    Stop an airflow environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    _docker_compose(conf, ["down"])


def airflow_cmd(conf: AiscalatorConfig, service="webserver", cmd=None):
    """
    Execute an airflow subcommand

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    service : string
        service name of the container where to run the command
    cmd : list
        subcommands to run
    """
    commands = [
        "run", "--rm", service,
    ]
    if cmd is not None:
        commands += cmd
    else:
        commands += ["airflow"]
    _docker_compose(conf, commands)


def airflow_edit(conf: AiscalatorConfig):
    """
    Starts an airflow environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    docker_image = _airflow_docker_build(conf)
    if docker_image:
        # TODO: shutdown other jupyter lab still running
        port = 10001
        notebook = basename(conf.dag_field('definition.code_path'))
        notebook, notebook_py = utils.notebook_file(notebook)
        commands = _prepare_docker_env(conf, [
            "aiscalator/airflow:" + docker_image, "bash",
            "/start-jupyter.sh",
            "/usr/local/airflow/work/" + notebook_py +
            ":/usr/local/airflow/dags/" + notebook_py
        ], port)
        return utils.wait_for_jupyter_lab(commands, logger, notebook,
                                          port, "work")
    raise Exception("Failed to build docker image")


def _prepare_docker_env(conf: AiscalatorConfig, program, port):
    """
    Assembles the list of commands to execute a docker run call

    When calling "docker run ...", this function also adds a set of
    additional parameters to mount the proper volumes and expose the
    correct environment for the call in the docker image.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the step
    program : List
        the rest of the commands to execute as part of
        the docker run call

    Returns
    -------
    List
        The full Array of Strings representing the commands to execute
        in the docker run call
    """
    logger = logging.getLogger(__name__)
    commands = [
        "docker", "run", "--name", conf.dag_container_name() + "_edit",
        "--rm",
        # TODO improve port publishing
        "-p", str(port) + ":8888",
        "-p", "18080:8080",
    ]
    for env in conf.user_env_file(conf.dag_field("definition.env")):
        if isfile(env):
            commands += ["--env-file", env]
    commands += [
        "--mount", "type=bind,source=/var/run/docker.sock,"
                   "target=/var/run/docker.sock",
    ]
    code_path = conf.dag_file_path('definition.code_path')
    notebook, _ = utils.notebook_file(code_path)
    utils.check_notebook_dir(logger, notebook)
    commands += [
        "--mount", "type=bind,source=" + dirname(notebook) +
        ",target=/usr/local/airflow/work/",
    ]
    if conf.config_path() is not None:
        commands += [
            "--mount",
            "type=bind,source=" + abspath(conf.config_path()) +
            ",target="
            "/usr/local/airflow/" + basename(conf.config_path()),
        ]
    workspace = []
    ws_path = "airflow.setup.workspace_paths"
    if conf.app_config_has(ws_path):
        ws_home = join(conf.app_config_home(),
                       "workspace")
        makedirs(ws_home, exist_ok=True)
        for folder in conf.app_config()[ws_path]:
            src, dst = _split_workspace_string(conf, folder)
            # bind the same path from host in the container (after creating
            # a symbolic link at dst path)
            workspace += [src + ":" + src]
            commands += [
                "--mount", "type=bind,source=" + src +
                ",target=" + src
            ]
        commands += [
            "--mount", "type=bind,source=" + ws_home +
            ",target=/usr/local/airflow/workspace/"
        ]
    commands += program + workspace
    return commands


def _split_workspace_string(conf: AiscalatorConfig, workspace):
    """
    Interprets the workspace string and split into src and dst
    paths:
    - The src is a path on the host machine.
    - The dst is a path on the container.
    In case, the workspace string doesn't specify both paths
    separated by a ':', this function will automatically mount it
    in the $app_config_home_directory/work/ folder creating a
    symbolic link with the same basename as the workspace.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the step
    workspace : str
        the workspace string to interpret
    Returns
    -------
    (str, str)
        A tuple with both src and dst paths
    """
    logger = logging.getLogger(__name__)
    root_dir = conf.app_config_home()
    if workspace.strip():
        if ':' in workspace:
            src = abspath(workspace.split(':')[0])
            if not src.startswith('/'):
                src = abspath(join(root_dir, src))
            dst = workspace.split(':')[1]
            if not dst.startswith('/'):
                dst = abspath(join(root_dir, dst))
        else:
            src = abspath(workspace)
            if not src.startswith('/'):
                src = abspath(join(root_dir, src))
            # in the workspace special folder, we'll create the same named
            # folder (basename) that is actually a link back to the one
            # we want to include in our workspace.
            dst = join("workspace", basename(workspace.strip('/')))
            link = join(root_dir, dst)
            if realpath(src) != realpath(link):
                if exists(link) and islink(link):
                    logger.warning("Removing an existing symbolic"
                                   " link %s -> %s",
                                   link, realpath(link))
                    remove(link)
                if not exists(link):
                    logger.info("Creating a symbolic link %s -> %s", link, src)
                    symlink(src, link)
            dst = "/usr/local/airflow/" + dst
        return src, dst
    return None, None


def airflow_push(conf: AiscalatorConfig):
    """
    Starts an airflow environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    # TODO to implement
    logging.error("Not implemented yet %s", conf.app_config_home())