getindata/data-pipelines-cli

View on GitHub
data_pipelines_cli/cli_commands/compile.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
import json
import pathlib
import shutil
from typing import Dict, Optional

import click
import yaml

from ..bi_utils import BiAction, bi
from ..cli_configs import find_datahub_config_file
from ..cli_constants import BUILD_DIR, IMAGE_TAG_TO_REPLACE
from ..cli_utils import echo_info, echo_warning
from ..config_generation import (
    copy_config_dir_to_build_dir,
    copy_dag_dir_to_build_dir,
    generate_profiles_yml,
)
from ..data_structures import DockerArgs
from ..dbt_utils import read_dbt_vars_from_configs, run_dbt_command
from ..docker_response_reader import DockerResponseReader
from ..errors import DockerErrorResponseError, DockerNotInstalledError
from ..io_utils import replace
from ..jinja import replace_vars_with_values


def _docker_build(docker_args: DockerArgs) -> None:
    """
    :param docker_args: Arguments required by the Docker to make a push to \
        the repository
    :raises DataPipelinesError: Docker not installed
    """
    try:
        import docker
        import docker.errors
    except ModuleNotFoundError:
        raise DockerNotInstalledError()

    echo_info("Building Docker image")
    docker_client = docker.from_env()
    docker_tag = docker_args.docker_build_tag()
    try:
        _, logs_generator = docker_client.images.build(
            path=".", tag=docker_tag, buildargs=docker_args.build_args
        )
        DockerResponseReader(logs_generator).click_echo_ok_responses()
    except docker.errors.BuildError as err:
        build_log = "\n".join([str(log) for log in err.build_log])
        raise DockerErrorResponseError(f"{err.msg}\n{build_log}")


def _dbt_compile(env: str) -> None:
    profiles_path = generate_profiles_yml(env, False)
    echo_info("Running dbt commands:")
    run_dbt_command(("deps",), env, profiles_path)
    run_dbt_command(("compile",), env, profiles_path)
    run_dbt_command(("docs", "generate"), env, profiles_path)
    run_dbt_command(("source", "freshness"), env, profiles_path)


def _copy_dbt_manifest() -> None:
    echo_info("Copying DBT manifest")
    shutil.copyfile(
        pathlib.Path.cwd().joinpath("target", "manifest.json"),
        BUILD_DIR.joinpath("dag", "manifest.json"),
    )


def replace_image_settings(image_tag: str) -> None:
    k8s_config = BUILD_DIR.joinpath("dag", "config", "base", "execution_env.yml")
    echo_info(f"Replacing {IMAGE_TAG_TO_REPLACE} with image tag = {image_tag}")
    replace(k8s_config, IMAGE_TAG_TO_REPLACE, image_tag)


def _replace_datahub_with_jinja_vars(env: str) -> None:
    datahub_config_path: pathlib.Path = find_datahub_config_file(env)

    if not datahub_config_path.is_file():
        echo_warning(
            f"File config/base/datahub.yml does not exist in {BUILD_DIR}. "
            "Content will not be replaced."
        )
        return

    echo_info(f"Replacing Jinja variables in {datahub_config_path}.")
    with open(datahub_config_path, "r") as datahub_config_file:
        updated_config = replace_vars_with_values(
            yaml.safe_load(datahub_config_file), read_dbt_vars_from_configs(env)
        )
    with open(datahub_config_path, "w") as datahub_config_file:
        yaml.dump(updated_config, datahub_config_file)


def compile_project(
    env: str,
    docker_tag: Optional[str] = None,
    docker_build: bool = False,
    docker_build_args: Optional[Dict[str, str]] = None,
) -> None:
    """
    Create local working directories and build artifacts.

    :param env: Name of the environment
    :type env: str
    :param docker_tag: Image tag of a Docker image to create
    :type docker_tag: Optional[str]
    :param docker_build: Whether to build a Docker image
    :type docker_build: bool
    :param bi_build: Whether to generate a BI codes
    :raises DataPipelinesError:
    """
    copy_dag_dir_to_build_dir()
    copy_config_dir_to_build_dir()

    docker_args = DockerArgs(env, docker_tag, docker_build_args or {})

    replace_image_settings(docker_args.image_tag or "Empty")

    _replace_datahub_with_jinja_vars(env)

    _dbt_compile(env)
    _copy_dbt_manifest()

    if docker_build:
        _docker_build(docker_args)

    bi(env, BiAction.COMPILE)


@click.command(
    name="compile",
    help="Create local working directories and build artifacts",
)
@click.option(
    "--env",
    default="local",
    type=str,
    show_default=True,
    required=True,
    help="Name of the environment",
)
@click.option(
    "--docker-build",
    is_flag=True,
    default=False,
    help="Whether to build a Docker image",
)
@click.option(
    "--docker-tag", type=str, required=False, help="Image tag of a Docker image to create"
)
@click.option(
    "--docker-args", type=str, required=False, help="Args required to build project in json format"
)
def compile_project_command(
    env: str,
    docker_build: bool,
    docker_tag: Optional[str],
    docker_args: Optional[str],
) -> None:
    compile_project(env, docker_tag, docker_build, json.loads(docker_args or "{}"))