dbt_coves/tasks/extract/airbyte.py
import glob
import json
import os
import pathlib
from copy import copy
from rich.console import Console
from dbt_coves.core.exceptions import MissingArgumentException
from dbt_coves.utils.api_caller import AirbyteApiCaller
from dbt_coves.utils.tracking import trackable
from .base import BaseExtractTask
# from dbt_coves.utils import airbyte_api
# Shared rich console used for every message printed by this module.
console = Console()

# Top-level keys removed from every Airbyte object (see
# ExtractAirbyteTask._remove_unnecessary_fields) before it is written to disk.
NON_EXTRACT_KEYS = [
    "icon",
    "breakingChange",
]
class AirbyteExtractorException(Exception):
    """Raised when an Airbyte object cannot be resolved, inspected or written to disk."""
class ExtractAirbyteTask(BaseExtractTask):
    """
    Task that extracts airbyte sources, connections and destinations and stores them as json files.

    Every field flagged ``airbyte_secret`` in a connector's definition
    specification is masked before the object is written.
    """

    @classmethod
    def register_parser(cls, sub_parsers, base_subparser):
        """Register the ``airbyte`` extract sub-command and its CLI options.

        Returns:
            The created ``airbyte`` sub-parser.
        """
        subparser = sub_parsers.add_parser(
            "airbyte",
            parents=[base_subparser],
            help="Extracts airbyte sources, connections and destinations\n"
            "and stores them as json files",
        )
        subparser.add_argument(
            "--path",
            type=str,
            help="Path where configuration json files will be created,\n"
            "i.e. '/var/data/airbyte_extract/'",
        )
        subparser.add_argument(
            "--host",
            type=str,
            help="Airbyte's API hostname, i.e. 'http://airbyte-server'",
        )
        subparser.add_argument(
            "--port",
            type=str,
            help="Airbyte's API port, i.e. '8001'",
        )
        subparser.set_defaults(cls=cls, which="airbyte")
        return subparser

    @trackable
    def run(self):
        """Extract every Airbyte source, destination and connection to JSON files.

        Reads ``path``, ``host`` and ``port`` from the integrated dbt-coves
        config and writes one file per object under ``<path>/sources``,
        ``<path>/destinations`` and ``<path>/connections``.

        Returns:
            0 on completion.

        Raises:
            MissingArgumentException: if ``path``, ``host`` or ``port`` is empty.
            AirbyteExtractorException: if an object cannot be resolved or written.
        """
        self.extraction_results = {
            "sources": set(),
            "destinations": set(),
            "connections": set(),
        }
        extract_destination = self.get_config_value("path")
        airbyte_host = self.get_config_value("host")
        airbyte_port = self.get_config_value("port")
        if not extract_destination or not airbyte_host or not airbyte_port:
            raise MissingArgumentException(["path", "host", "port"], self.coves_config)

        extract_destination = pathlib.Path(extract_destination)
        connections_path = extract_destination / "connections"
        sources_path = extract_destination / "sources"
        destinations_path = extract_destination / "destinations"
        self.connections_extract_destination = os.path.abspath(connections_path)
        self.destinations_extract_destination = os.path.abspath(destinations_path)
        self.sources_extract_destination = os.path.abspath(sources_path)

        self.airbyte_api = AirbyteApiCaller(airbyte_host, airbyte_port)

        console.print(
            "Extracting Airbyte's [b]Source[/b], [b]Destination[/b] and [b]Connection[/b] "
            f"configurations to {os.path.abspath(extract_destination)}\n"
        )
        for target_path in (sources_path, destinations_path, connections_path):
            target_path.mkdir(exist_ok=True, parents=True)

        for airbyte_source in self.airbyte_api.airbyte_sources_list:
            source_json = self._get_airbyte_source_from_id(airbyte_source["sourceId"])
            self._save_json_source(source_json)

        for airbyte_destination in self.airbyte_api.airbyte_destinations_list:
            destination_json = self._get_airbyte_destination_from_id(
                airbyte_destination["destinationId"]
            )
            self._save_json_destination(destination_json)

        for airbyte_conn in self.airbyte_api.airbyte_connections_list:
            self._save_json_connection(airbyte_conn)

        # Report success whenever anything at all was written, not only sources.
        if any(self.extraction_results.values()):
            console.print(
                f"Extraction to path {extract_destination} was successful!\n"
                f"[u]Sources[/u]: {self.extraction_results['sources']}\n"
                f"[u]Destinations[/u]: {self.extraction_results['destinations']}\n"
                f"[u]Connections[/u]: {self.extraction_results['connections']}\n"
            )
        else:
            console.print("No Airbyte Connections were extracted")
        return 0

    def dbt_packages_exist(self, dbt_project_path):
        """Return the glob matches for ``<dbt_project_path>/dbt_packages`` (empty list if absent)."""
        return glob.glob(f"{str(dbt_project_path)}/dbt_packages")

    @staticmethod
    def _find_airbyte_object(airbyte_objects, id_field, object_id, object_kind):
        """Return the first object in ``airbyte_objects`` whose ``id_field`` equals ``object_id``.

        Raises:
            AirbyteExtractorException: if no object matches.
        """
        for airbyte_object in airbyte_objects:
            if airbyte_object[id_field] == object_id:
                return airbyte_object
        raise AirbyteExtractorException(
            f"Airbyte extract error: there is no Airbyte {object_kind} "
            f"for id [red]{object_id}[/red]"
        )

    def _get_definition_specification(self, specification_obj, definition_id_field, definition_id):
        """POST to the API's ``GET_OBJECTS`` endpoint for ``specification_obj`` and return the result."""
        req_body = {
            definition_id_field: definition_id,
            "workspaceId": self.airbyte_api.airbyte_workspace_id,
        }
        return self.airbyte_api.api_call(
            self.airbyte_api.api_endpoints["GET_OBJECTS"].format(obj=specification_obj),
            req_body,
        )

    def _get_airbyte_destination_definition_from_id(self, definition_id):
        """Fetch the destination definition specification for ``definition_id``."""
        return self._get_definition_specification(
            "destination_definition_specifications", "destinationDefinitionId", definition_id
        )

    def _get_airbyte_source_definition_from_id(self, definition_id):
        """Fetch the source definition specification for ``definition_id``."""
        return self._get_definition_specification(
            "source_definition_specifications", "sourceDefinitionId", definition_id
        )

    def _mask_secrets_and_add_version(
        self, airbyte_object, definition, definition_id_field, definitions_list
    ):
        """Mask secret config fields of ``airbyte_object`` in place and set its ``connectorVersion``."""
        airbyte_secret_fields = self._get_airbyte_secret_fields_for_definition(definition)
        airbyte_object["connectionConfiguration"] = self._hide_configuration_secret_fields(
            airbyte_object["connectionConfiguration"], airbyte_secret_fields
        )
        airbyte_object["connectorVersion"] = self._get_connector_version(
            definition_id_field,
            definitions_list,
            definition[definition_id_field],
        )
        return airbyte_object

    def _get_airbyte_destination_from_id(self, destinationId):
        """
        Get the complete Destination object from its ID.

        The matching entry of ``airbyte_destinations_list`` is updated in place:
        its secret configuration fields are masked and ``connectorVersion`` is set.

        Raises:
            AirbyteExtractorException: if the ID or its connector definition is unknown.
        """
        destination = self._find_airbyte_object(
            self.airbyte_api.airbyte_destinations_list,
            "destinationId",
            destinationId,
            "Destination",
        )
        destination_definition = self._get_airbyte_destination_definition_from_id(
            destination["destinationDefinitionId"]
        )
        return self._mask_secrets_and_add_version(
            destination,
            destination_definition,
            "destinationDefinitionId",
            self.airbyte_api.destination_definitions,
        )

    def _get_airbyte_source_from_id(self, source_id):
        """
        Get the complete Source object from its ID.

        The matching entry of ``airbyte_sources_list`` is updated in place:
        its secret configuration fields are masked and ``connectorVersion`` is set.

        Raises:
            AirbyteExtractorException: if the ID or its connector definition is unknown.
        """
        source = self._find_airbyte_object(
            self.airbyte_api.airbyte_sources_list, "sourceId", source_id, "Source"
        )
        source_definition = self._get_airbyte_source_definition_from_id(
            source["sourceDefinitionId"]
        )
        return self._mask_secrets_and_add_version(
            source,
            source_definition,
            "sourceDefinitionId",
            self.airbyte_api.source_definitions,
        )

    def _get_connector_version(self, lookup_field, definitions_list, definition_id):
        """Return the ``dockerImageTag`` of the definition whose ``lookup_field`` is ``definition_id``.

        Raises:
            AirbyteExtractorException: if no definition matches.
        """
        for definition in definitions_list:
            if definition[lookup_field] == definition_id:
                return definition["dockerImageTag"]
        raise AirbyteExtractorException(f"No connector definition found for ID {definition_id}")

    def _hide_configuration_secret_fields(self, connection_configuration, airbyte_secret_fields):
        """Replace every value whose key is in ``airbyte_secret_fields`` with ``"**********"``.

        Recurses into nested dicts and into dicts held in lists. Mutates and
        returns ``connection_configuration``.
        """
        for key, value in connection_configuration.items():
            if isinstance(value, dict):
                self._hide_configuration_secret_fields(value, airbyte_secret_fields)
            elif key in airbyte_secret_fields:
                connection_configuration[key] = "**********"
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        self._hide_configuration_secret_fields(item, airbyte_secret_fields)
        return connection_configuration

    def _get_airbyte_secret_fields_for_definition(
        self, definition, dict_name=None, secret_fields=None
    ):
        """Collect the names of all properties flagged ``airbyte_secret`` in ``definition``.

        A property counts as secret when the dict stored under its name has a
        truthy ``airbyte_secret`` entry. Nested dicts and dicts inside lists
        (e.g. JSON-schema ``oneOf`` alternatives) are searched as well.

        Args:
            definition: a (nested) definition-specification dict.
            dict_name: the key under which ``definition`` was found (recursion only).
            secret_fields: accumulator list (recursion only).

        Returns:
            List of secret property names, without duplicates.
        """
        if secret_fields is None:
            # Fresh accumulator per top-level call; a shared ``[]`` default would
            # leak one connector's secret names into every later lookup.
            secret_fields = []
        for key, value in definition.items():
            if isinstance(value, dict):
                self._get_airbyte_secret_fields_for_definition(value, key, secret_fields)
            elif key == "airbyte_secret":
                if value and dict_name not in secret_fields:
                    secret_fields.append(dict_name)
            elif isinstance(value, list):
                # Secret properties frequently live inside ``oneOf`` lists.
                for item in value:
                    if isinstance(item, dict):
                        self._get_airbyte_secret_fields_for_definition(item, key, secret_fields)
        return secret_fields

    def _remove_unnecessary_fields(self, json_object):
        """Delete every ``NON_EXTRACT_KEYS`` key from ``json_object`` in place and return it."""
        for key in NON_EXTRACT_KEYS:
            json_object.pop(key, None)
        return json_object

    def _save_json(self, path, json_object):
        """Write ``json_object`` (minus ``NON_EXTRACT_KEYS``) as indented JSON to ``path``.

        Raises:
            AirbyteExtractorException: if the file cannot be written.
        """
        json_object = self._remove_unnecessary_fields(json_object)
        try:
            with open(path, "w") as json_file:
                json.dump(json_object, json_file, indent=4)
                json_file.write("\n")
        except OSError as e:
            raise AirbyteExtractorException(f"Couldn't write {path}: {e}") from e

    def _save_json_connection(self, connection: dict):
        """Write a connection to ``<connections>/<source name>-<destination name>.json``.

        The connection's ID fields are replaced by the source/destination names.
        The caller's dict is not modified (a shallow copy is edited).

        Raises:
            AirbyteExtractorException: if its source/destination ID is unknown.
        """
        connection = copy(connection)
        connection.pop("connectionId", None)
        # Only the names are needed here; run() has already resolved and
        # saved every source/destination, so no definition API calls are made.
        connection_source_name = self._find_airbyte_object(
            self.airbyte_api.airbyte_sources_list,
            "sourceId",
            connection["sourceId"],
            "Source",
        )["name"]
        connection_destination_name = self._find_airbyte_object(
            self.airbyte_api.airbyte_destinations_list,
            "destinationId",
            connection["destinationId"],
            "Destination",
        )["name"]
        # Once we used the source and destination IDs, they are no longer
        # required and don't need to be saved; they are replaced with names.
        for key in ("sourceId", "destinationId", "sourceCatalogId"):
            connection.pop(key, None)
        connection["sourceName"] = connection_source_name
        connection["destinationName"] = connection_destination_name

        filename = f"{connection_source_name}-{connection_destination_name}.json"
        path = os.path.join(self.connections_extract_destination, filename)
        self._save_json(path, connection)
        self.extraction_results["connections"].add(filename)

    def _save_json_destination(self, destination):
        """Write a destination, without its ID fields, to ``<destinations>/<lowercased name>.json``."""
        destination = copy(destination)
        for key in ("destinationDefinitionId", "workspaceId", "destinationId"):
            destination.pop(key, None)
        filename = f"{destination['name']}.json".lower()
        path = os.path.join(self.destinations_extract_destination, filename)
        self._save_json(path, destination)
        self.extraction_results["destinations"].add(filename)

    def _save_json_source(self, source):
        """Write a source, without its ID fields, to ``<sources>/<lowercased name>.json``."""
        source = copy(source)
        for key in ("sourceDefinitionId", "workspaceId", "sourceId"):
            source.pop(key, None)
        filename = f"{source['name']}.json".lower()
        path = os.path.join(self.sources_extract_destination, filename)
        self._save_json(path, source)
        self.extraction_results["sources"].add(filename)

    def get_config_value(self, key):
        """Return ``key`` from the ``extract.airbyte`` section of the integrated config."""
        return self.coves_config.integrated["extract"]["airbyte"][key]