# datacoves/dbt-coves
#
# View on GitHub
# dbt_coves/utils/api_caller.py
#
# Summary
#
# Maintainability
# A
# 3 hrs
# Test Coverage
import json
from typing import Any, Dict

import requests
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException
from rich.console import Console

# Module-level rich console shared by the API callers in this file for status output.
console = Console()

# Root URL that every Fivetran endpoint template below is appended to.
FIVETRAN_API_BASE_URL = "https://api.fivetran.com/v1"

# Endpoint URL templates keyed by operation name. The ``{placeholder}`` fields
# are filled in by callers through ``str.format``.
FIVETRAN_API_ENDPOINTS = {
    operation: FIVETRAN_API_BASE_URL + path
    for operation, path in (
        ("GROUP_CREATE", "/groups"),
        ("GROUP_DETAILS", "/groups/{group}"),
        ("DESTINATION_LIST", "/groups"),
        ("DESTINATION_DETAILS", "/destinations/{destination}"),
        ("DESTINATION_CREATE", "/destinations"),
        ("CONNECTOR_DESTINATION_LIST", "/groups/{destination}/connectors"),
        ("CONNECTOR_CREATE", "/connectors/"),
        ("CONNECTOR_DETAILS", "/connectors/{connector}"),
        ("CONNECTOR_SCHEMAS", "/connectors/{connector}/schemas"),
        ("CONNECTOR_TABLES_CONFIG", "/connectors/{connector}/schemas/{schema}/tables/{table}"),
        ("SOURCE_METADATA", "/metadata/connectors/{service}"),
    )
}


def api_call(
    method,
    endpoint: str,
    body: Dict[str, str] = None,
    headers=None,
    auth=None,
    timeout=None,
):
    """Send an HTTP request and return the decoded JSON response.

    Args:
        method: HTTP verb passed to ``requests.request`` (e.g. ``"GET"``).
        endpoint: Full URL to call.
        body: Optional payload, sent JSON-encoded.
        headers: Optional request headers.
        auth: Optional ``requests`` auth object.
        timeout: Optional timeout forwarded to ``requests.request``; ``None``
            (the default) keeps the previous behaviour of waiting indefinitely.

    Returns:
        The parsed JSON body, or an empty dict when the server answers 404 or
        sends an empty body.

    Raises:
        FivetranApiCallerException: If the response carries any other error
            status. The message is the ``message`` field of the JSON error
            body when there is one, otherwise the raw body or the HTTP error.
    """
    response = requests.request(
        method, url=endpoint, json=body, headers=headers, auth=auth, timeout=timeout
    )
    # A missing resource is reported to callers as "no data", not as an error.
    if response.status_code == 404:
        return {}

    try:
        response.raise_for_status()
    except RequestException as err:
        # The error body is not guaranteed to be JSON or to contain "message";
        # parsing it blindly would hide the real HTTP error behind a decode error.
        try:
            error_message = json.loads(response.text)["message"]
        except (ValueError, KeyError, TypeError):
            error_message = response.text or str(err)
        raise FivetranApiCallerException(error_message) from err

    # Successful responses may legitimately have no body.
    return json.loads(response.text) if response.text else {}


class AirbyteApiCallerException(Exception):
    """Error raised when a call to the Airbyte API fails."""


class FivetranApiCallerException(Exception):
    """Error raised when a Fivetran API request returns an error status."""


class AirbyteApiCaller:
    """Client for the Airbyte API.

    Instantiating the class queries Airbyte right away: it picks the first
    workspace returned by the server and caches that workspace's connections,
    sources, destinations and source/destination definitions as attributes.
    """

    def __init__(self, api_host, api_port):
        """Build the endpoint table and load the workspace objects.

        Args:
            api_host: Airbyte host, used verbatim as the start of every URL.
            api_port: Airbyte port.

        Raises:
            AirbyteApiCallerException: If any of the initial queries fails or
                Airbyte reports no workspace.
        """
        base_endpoint = f"{api_host}:{api_port}/api/v1/"

        self.api_endpoints = {
            "GET_OBJECTS": base_endpoint + "{obj}/get",
            "LIST_OBJECTS": base_endpoint + "{obj}/list",
            "CREATE_OBJECT": base_endpoint + "{obj}/create",
            "UPDATE_OBJECT": base_endpoint + "{obj}/update",
            "DELETE_OBJECT": base_endpoint + "{obj}/delete",
            "TEST_UPDATE": base_endpoint + "{obj}/check_connection_for_update",
            "TEST_CONNECTION": base_endpoint + "{obj}/check_connection",
        }
        try:
            console.print("Querying [i]Airbyte[/i] connections")
            workspaces = self._list_objects("workspaces")["workspaces"]
            if not workspaces:
                # Without this guard an empty list surfaces as a bare IndexError.
                raise AirbyteApiCallerException("No workspaces found in Airbyte")
            self.airbyte_workspace_id = workspaces[0]["workspaceId"]
            self.standard_request_body = {"workspaceId": self.airbyte_workspace_id}

            self.airbyte_connections_list = self._list_objects(
                "connections", self.standard_request_body
            )["connections"]
            self.airbyte_sources_list = self._list_objects(
                "sources", self.standard_request_body
            )["sources"]
            self.airbyte_destinations_list = self._list_objects(
                "destinations", self.standard_request_body
            )["destinations"]

            self.load_definitions()
        except AirbyteApiCallerException as e:
            # Chain the original error so the underlying failure stays in the traceback.
            raise AirbyteApiCallerException(
                f"Couldn't retrieve Airbyte connections, sources and destinations {e}"
            ) from e

    @staticmethod
    def _extract_error_message(response):
        """Return the ``message`` field of a JSON error body, or the raw body text."""
        try:
            return json.loads(response.text)["message"]
        except (ValueError, KeyError, TypeError):
            # Error bodies are not guaranteed to be JSON objects with "message".
            return response.text

    def _list_objects(self, obj, body=None):
        """Call the LIST_OBJECTS endpoint for ``obj`` and return the decoded response."""
        return self.api_call(self.api_endpoints["LIST_OBJECTS"].format(obj=obj), body)

    def api_call(self, endpoint: str, body: Dict[str, str] = None, timeout=None):
        """
        Generic `api caller` for contacting Airbyte

        Sends a POST request with ``body`` JSON-encoded.

        Returns:
            The parsed JSON body, or ``None`` when a 2xx response has no body.

        Raises:
            AirbyteApiCallerException: On any non-2xx status code.
        """
        response = requests.post(endpoint, json=body, timeout=timeout)
        if 200 <= response.status_code < 300:
            return json.loads(response.text) if response.text else None
        raise AirbyteApiCallerException(
            f"Unexpected status code from airbyte in endpoint {endpoint}:"
            f"{response.status_code}: {self._extract_error_message(response)}"
        )

    def load_definitions(self):
        """Fetch and cache the destination and source definitions of the workspace."""
        self.destination_definitions = self._list_objects(
            "destination_definitions", self.standard_request_body
        )["destinationDefinitions"]

        self.source_definitions = self._list_objects(
            "source_definitions", self.standard_request_body
        )["sourceDefinitions"]


class FivetranApiCaller:
    """Client for the Fivetran REST API.

    Instantiating the class queries Fivetran right away. ``fivetran_data`` maps
    each group id that has destination details to those details and the group's
    connectors; ``fivetran_groups`` maps every group id to its name (plus the
    destination service when destination details exist).
    """

    def __init__(self, api_key, api_secret):
        """Store the credentials and load the current Fivetran state.

        Args:
            api_key: Fivetran API key, used as the basic-auth user.
            api_secret: Fivetran API secret, used as the basic-auth password.
        """
        self.auth = HTTPBasicAuth(api_key, api_secret)
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json;version=2",
        }
        self.fivetran_data = self._populate_fivetran_data()

    def _fivetran_api_call(self, method: str, endpoint: str, payload=None):
        """
        Common method to reach Fivetran API, extensible for future Methods and Endpoints
        """
        return api_call(
            method,
            endpoint,
            body=payload,
            headers=self.headers,
            auth=self.auth,
        )

    def _get_destination_details(self, destination_id) -> Dict[Any, Any]:
        """
        Get Destination details from Fivetran API

        Returns the ``data`` member of the response, or ``None`` when absent.
        """
        response = self._fivetran_api_call(
            "GET",
            FIVETRAN_API_ENDPOINTS["DESTINATION_DETAILS"].format(destination=destination_id),
        )
        return response.get("data")

    def _get_connector_schemas(self, connector_id):
        """
        Get Connector schemas from Fivetran API
        """
        response = self._fivetran_api_call(
            "GET",
            FIVETRAN_API_ENDPOINTS["CONNECTOR_SCHEMAS"].format(connector=connector_id),
        )
        return response.get("data", {}).get("schemas", {})

    def _get_connector_details(self, connector_id):
        """
        Get Connector details from Fivetran API
        """
        response = self._fivetran_api_call(
            "GET",
            FIVETRAN_API_ENDPOINTS["CONNECTOR_DETAILS"].format(connector=connector_id),
        )
        return response.get("data", {})

    def _get_destination_connectors(self, destination_id: str) -> Dict[Any, Any]:
        """
        Get the connectors of a group, keyed by connector id

        Each value holds the connector's ``details`` and ``schemas``.
        """
        response = self._fivetran_api_call(
            "GET",
            FIVETRAN_API_ENDPOINTS["CONNECTOR_DESTINATION_LIST"].format(destination=destination_id),
        )
        connector_data = {}
        for connector in response.get("data", {}).get("items", []):
            connector_id = connector["id"]
            connector_data[connector_id] = {
                "details": self._get_connector_details(connector_id),
                "schemas": self._get_connector_schemas(connector_id),
            }
        return connector_data

    def create_group(self, group_name, service) -> str:
        """Create a group, record it in ``fivetran_groups`` and return its id."""
        created_group = self._fivetran_api_call(
            "POST", FIVETRAN_API_ENDPOINTS["GROUP_CREATE"], payload={"name": group_name}
        )
        created_group_id = created_group["data"]["id"]
        self.fivetran_groups[created_group_id] = {
            "name": created_group["data"]["name"],
            "service": service,
        }
        return created_group_id

    def _populate_fivetran_data(self) -> Dict[Any, Any]:
        """Query every group and build the destination/connector snapshot.

        Also sets ``self.fivetran_groups`` as a side effect. Groups for which
        no destination details are returned appear in ``fivetran_groups``
        (name only) but not in the returned mapping.
        """
        console.print("Querying [i]Fivetran[/i] connections")
        fivetran_data = {}
        fivetran_group_map = {}
        fivetran_groups = self._fivetran_api_call("GET", FIVETRAN_API_ENDPOINTS["DESTINATION_LIST"])
        for group in fivetran_groups.get("data", {}).get("items", []):
            destination_id = group["id"]
            fivetran_group_map[destination_id] = {"name": group["name"]}
            destination_details = self._get_destination_details(destination_id)
            if destination_details:
                fivetran_group_map[destination_id]["service"] = destination_details.get(
                    "service", ""
                )
                fivetran_data[destination_id] = {
                    "details": destination_details,
                    "connectors": self._get_destination_connectors(destination_id),
                }
        self.fivetran_groups = fivetran_group_map
        return fivetran_data

    def update_destination(self, destination_id, destination_details):
        """Patch a destination and return the ``data`` member of the response."""
        destination = self._fivetran_api_call(
            "PATCH",
            FIVETRAN_API_ENDPOINTS["DESTINATION_DETAILS"].format(destination=destination_id),
            destination_details,
        )
        return destination["data"]

    def create_destination(self, payload):
        """Create a destination and return the ``data`` member of the response."""
        destination = self._fivetran_api_call(
            "POST",
            FIVETRAN_API_ENDPOINTS["DESTINATION_CREATE"],
            payload,
        )
        return destination["data"]

    def update_connector(self, connector_id, connector_details):
        """Patch a connector and return the ``data`` member of the response."""
        connector = self._fivetran_api_call(
            "PATCH",
            FIVETRAN_API_ENDPOINTS["CONNECTOR_DETAILS"].format(connector=connector_id),
            connector_details,
        )
        return connector["data"]

    def create_connector(self, connector_details):
        """Create a connector and return the ``data`` member of the response."""
        connector = self._fivetran_api_call(
            "POST", FIVETRAN_API_ENDPOINTS["CONNECTOR_CREATE"], connector_details
        )
        return connector["data"]

    def update_connector_schema_config(self, connector_id, schemas):
        """Patch a connector's schema config and return the ``data`` member of the response."""
        updated_schemas = self._fivetran_api_call(
            "PATCH",
            FIVETRAN_API_ENDPOINTS["CONNECTOR_SCHEMAS"].format(connector=connector_id),
            schemas,
        )
        # Returned like the sibling update_* methods; previously the value was
        # looked up and then discarded.
        return updated_schemas["data"]

    def update_connector_table_config(
        self,
        connector_id,
        schema_name_in_destination,
        table_name_in_destination,
        table_config,
    ):
        """Patch the config of one table of a connector schema; returns nothing."""
        self._fivetran_api_call(
            "PATCH",
            FIVETRAN_API_ENDPOINTS["CONNECTOR_TABLES_CONFIG"].format(
                connector=connector_id,
                schema=schema_name_in_destination,
                table=table_name_in_destination,
            ),
            table_config,
        )

    def get_group_name(self, group_id):
        """Return the name of a group, or an empty string when it is not available."""
        group_data = self._fivetran_api_call(
            "GET", FIVETRAN_API_ENDPOINTS["GROUP_DETAILS"].format(group=group_id)
        )
        return group_data.get("data", {}).get("name", "")

    def get_service_required_fields(self, service_type):
        """Return the ``required`` config entries from a service's metadata, or an empty list."""
        source_metadata = self._fivetran_api_call(
            "GET",
            FIVETRAN_API_ENDPOINTS["SOURCE_METADATA"].format(service=service_type),
        )
        return source_metadata.get("data", {}).get("config", {}).get("required", [])