getindata/data-pipelines-cli

View on GitHub
data_pipelines_cli/cli_commands/deploy.py

Summary

Maintainability
A
0 mins
Test Coverage
A
95%
import io
import json
from typing import Any, Dict, Optional, cast

import click
import yaml

from ..airbyte_utils import AirbyteFactory
from ..bi_utils import BiAction, bi
from ..cli_configs import find_datahub_config_file
from ..cli_constants import BUILD_DIR
from ..cli_utils import echo_error, echo_info, subprocess_run
from ..config_generation import read_dictionary_from_config_directory
from ..data_structures import DockerArgs
from ..docker_response_reader import DockerResponseReader
from ..errors import (
    AirflowDagsPathKeyError,
    DataPipelinesError,
    DependencyNotInstalledError,
    DockerErrorResponseError,
    DockerNotInstalledError,
)
from ..filesystem_utils import LocalRemoteSync


class DeployCommand:
    """A class used to push and deploy the project to the remote machine."""

    docker_args: Optional[DockerArgs]
    """Arguments required by the Docker to make a push to the repository.
    If set to `None`, :meth:`deploy` will not make a push"""
    datahub_ingest: bool
    """Whether to ingest DataHub metadata"""
    blob_address_path: str
    """URI of the cloud storage to send build artifacts to"""
    provider_kwargs_dict: Dict[str, Any]
    """Dictionary of arguments required by a specific cloud storage provider,
    e.g. path to a token, username, password, etc."""
    env: str
    bi_git_key_path: str
    auth_token: Optional[str]
    """Authorization OIDC ID token for a service account to communication with Airbyte instance"""
    disable_bucket_sync: bool
    """Whether to disable bucket sync with artefacts"""

    def __init__(
        self,
        env: str,
        docker_push: bool,
        dags_path: Optional[str],
        provider_kwargs_dict: Optional[Dict[str, Any]],
        datahub_ingest: bool,
        bi_git_key_path: str,
        auth_token: Optional[str],
        disable_bucket_sync: bool,
    ) -> None:
        self.docker_args = DockerArgs(env, None, {}) if docker_push else None
        self.datahub_ingest = datahub_ingest
        self.provider_kwargs_dict = provider_kwargs_dict or {}
        self.env = env
        self.bi_git_key_path = bi_git_key_path
        self.auth_token = auth_token
        self.disable_bucket_sync = disable_bucket_sync

        try:
            self.blob_address_path = (
                dags_path
                or read_dictionary_from_config_directory(
                    BUILD_DIR.joinpath("dag"),
                    env,
                    "airflow.yml",
                )["dags_path"]
            )
        except KeyError as key_error:
            raise AirflowDagsPathKeyError from key_error

        self.enable_ingest = read_dictionary_from_config_directory(
            BUILD_DIR.joinpath("dag"), env, "ingestion.yml"
        ).get("enable", False)

    def deploy(self) -> None:
        """Push and deploy the project to the remote machine.

        :raises DependencyNotInstalledError: DataHub or Docker not installed
        :raises DataPipelinesError: Error while pushing Docker image
        """
        if self.docker_args:
            self._docker_push()

        if self.datahub_ingest:
            self._datahub_ingest()

        if self.enable_ingest:
            self._enable_ingest()

        self._bi_push()

        if not self.disable_bucket_sync:
            self._bucket_sync()

    def _bi_push(self) -> None:
        bi(self.env, BiAction.DEPLOY, self.bi_git_key_path)

    def _docker_push(self) -> None:
        """
        :raises DockerNotInstalledError: Docker not installed
        :raises DataPipelinesError: Error while pushing Docker image
        """
        try:
            import docker
        except ModuleNotFoundError:
            raise DockerNotInstalledError()

        echo_info("Pushing Docker image")
        docker_client = docker.from_env()
        docker_args = cast(DockerArgs, self.docker_args)

        try:
            DockerResponseReader(
                docker_client.images.push(
                    repository=docker_args.repository,
                    tag=docker_args.image_tag,
                    stream=True,
                    decode=True,
                )
            ).click_echo_ok_responses()
        except DockerErrorResponseError as err:
            echo_error(err.message)
            raise DataPipelinesError(
                "Error raised when pushing Docker image. Ensure that "
                "Docker image you try to push exists. Maybe try running "
                "'dp compile' first?"
            )

    def _datahub_ingest(self) -> None:
        """:raises DependencyNotInstalledError: DataHub not installed"""
        try:
            import datahub  # noqa: F401
        except ModuleNotFoundError:
            raise DependencyNotInstalledError("datahub")

        echo_info("Ingesting datahub metadata")
        subprocess_run(
            [
                "datahub",
                "ingest",
                "-c",
                str(find_datahub_config_file(self.env)),
            ]
        )

    def _enable_ingest(self) -> None:
        echo_info("Ingesting airbyte config")
        airbyte_config_path = AirbyteFactory.find_config_file(self.env, "airbyte")
        AirbyteFactory(
            airbyte_config_path=airbyte_config_path, auth_token=self.auth_token
        ).create_update_connections()

    def _bucket_sync(self) -> None:
        echo_info("Syncing Bucket")
        LocalRemoteSync(
            BUILD_DIR.joinpath("dag"), self.blob_address_path, self.provider_kwargs_dict
        ).sync(delete=True)


@click.command(
    name="deploy",
    help="Push and deploy the project to the remote machine",
)
@click.option("--env", default="base", show_default=True, type=str, help="Name of the environment")
@click.option("--dags-path", required=False, help="Remote storage URI")
@click.option(
    "--blob-args",
    required=False,
    type=click.File("r"),
    help="Path to JSON or YAML file with arguments that should be passed to "
    "your Bucket/blob provider",
)
@click.option(
    "--docker-push",
    type=bool,
    is_flag=True,
    default=False,
    help="Whether to push image to the Docker repository",
)
@click.option(
    "--datahub-ingest",
    is_flag=True,
    default=False,
    help="Whether to ingest DataHub metadata",
)
@click.option(
    "--bi-git-key-path",
    type=str,
    required=False,
    help="Path to the key with write access to repo",
)
@click.option(
    "--auth-token",
    type=str,
    required=False,
    help="Authorization OIDC ID token for a service account to communication with cloud services",
)
@click.option(
    "--disable-bucket-sync",
    is_flag=True,
    default=False,
    help="Whether to disable bucket sync with artefacts",
)
def deploy_command(
    env: str,
    dags_path: Optional[str],
    blob_args: Optional[io.TextIOWrapper],
    docker_push: bool,
    datahub_ingest: bool,
    bi_git_key_path: str,
    auth_token: Optional[str],
    disable_bucket_sync: bool,
) -> None:
    if blob_args:
        try:
            provider_kwargs_dict = json.load(blob_args)
        except json.JSONDecodeError:
            blob_args.seek(0)
            provider_kwargs_dict = yaml.safe_load(blob_args)
    else:
        provider_kwargs_dict = None

    DeployCommand(
        env,
        docker_push,
        dags_path,
        provider_kwargs_dict,
        datahub_ingest,
        bi_git_key_path,
        auth_token,
        disable_bucket_sync,
    ).deploy()