getindata/data-pipelines-cli

View on GitHub
data_pipelines_cli/cli_commands/generate/utils.py

Summary

Maintainability
A
0 mins
Test Coverage
A
96%
import json
import pathlib
import sys
from typing import Any, Dict, Optional

import yaml

from ...cli_utils import echo_warning
from ...dbt_utils import run_dbt_command
from ...errors import DataPipelinesError


def get_macro_run_output(
    env: str, macro_name: str, macro_args: Dict[str, Any], profiles_path: pathlib.Path
) -> str:
    """
    Run a dbt macro through ``dbt run-operation`` and return the message it logged.

    The arguments are serialized as single-line, flow-style YAML and passed
    to dbt via ``--args``. dbt is invoked with JSON log formatting and its
    standard output is captured; the captured output is then scanned line by
    line for the first JSON object whose ``"code"`` equals ``"M011"``.

    :param env: Name of the environment forwarded to ``run_dbt_command``.
    :param macro_name: Name of the dbt macro to run.
    :param macro_args: Arguments of the macro; dumped to YAML for ``--args``.
    :param profiles_path: Path forwarded to ``run_dbt_command``.
    :return: Value of the ``"msg"`` field of the first matching log line.
    :raises DataPipelinesError: When no line with code ``"M011"`` is found.
    """
    # ``width=sys.maxsize`` stops PyYAML from wrapping long values, so the
    # whole mapping stays on a single line suitable for a CLI argument.
    print_args = yaml.dump(macro_args, default_flow_style=True, width=sys.maxsize).rstrip()
    dbt_command_result_bytes = run_dbt_command(
        ("run-operation", macro_name, "--args", print_args),
        env,
        profiles_path,
        log_format_json=True,
        capture_output=True,
    )
    decoded_output = dbt_command_result_bytes.stdout.decode(encoding=sys.stdout.encoding or "utf-8")
    for raw_line in decoded_output.splitlines():
        try:
            record = json.loads(raw_line)
        except json.JSONDecodeError:
            # Blank lines or plain-text noise mixed into stdout must not
            # abort the search for the macro output.
            continue
        # NOTE(review): "M011" is presumably the dbt event code of messages
        # logged from within a macro -- confirm against the dbt version in use.
        if isinstance(record, dict) and record.get("code") == "M011":
            return record["msg"]
    raise DataPipelinesError(f"No macro output found in the dbt output:\n{decoded_output}")


def generate_models_or_sources_from_single_table(
    env: str, macro_name: str, macro_args: Dict[str, Any], profiles_path: pathlib.Path
) -> Dict[str, Any]:
    """
    Run a dbt macro and parse the text it produced as YAML.

    :param env: Name of the environment forwarded to ``get_macro_run_output``.
    :param macro_name: Name of the dbt macro to run.
    :param macro_args: Arguments passed to the macro.
    :param profiles_path: Path forwarded to ``get_macro_run_output``.
    :return: Result of ``yaml.safe_load`` applied to the macro output.
    """
    macro_output = get_macro_run_output(
        env=env,
        macro_name=macro_name,
        macro_args=macro_args,
        profiles_path=profiles_path,
    )
    parsed_output = yaml.safe_load(macro_output)
    return parsed_output


def get_output_file_or_warn_if_exists(
    directory: pathlib.Path, overwrite: bool, file_extension: str, filename: Optional[str] = None
) -> Optional[pathlib.Path]:
    """
    Build the path of an output file inside ``directory`` and guard existing files.

    The file is named ``<filename>.<file_extension>``; when ``filename`` is
    ``None`` or empty, the name of ``directory`` itself is used instead.

    :param directory: Directory the output file is placed in.
    :param overwrite: Whether an already existing file may be overwritten.
    :param file_extension: Extension of the output file, without the dot.
    :param filename: Optional base name of the output file.
    :return: Path of the output file, or ``None`` when the file exists and
        ``overwrite`` is false. A warning is echoed whenever the file exists.
    """
    base_name = filename if filename else directory.name
    target = directory / f"{base_name}.{file_extension}"

    # Nothing to warn about when the file is not there yet.
    if not target.exists():
        return target

    if overwrite:
        echo_warning(
            f"{target!s} in directory {directory!s} exists, it gets overwritten."
        )
        return target

    echo_warning(
        f"{target!s} in directory {directory!s} exists, it "
        "will not be overwritten. If you want to overwrite it, pass "
        "'--overwrite' flag."
    )
    return None