superset/databases/api.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-many-lines
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, cast
from zipfile import is_zipfile, ZipFile
from deprecation import deprecated
from flask import make_response, render_template, request, Response, send_file
from flask_appbuilder.api import expose, protect, rison, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface
from marshmallow import ValidationError
from sqlalchemy.exc import NoSuchTableError, OperationalError, SQLAlchemyError
from superset import app, event_logger
from superset.commands.database.create import CreateDatabaseCommand
from superset.commands.database.delete import DeleteDatabaseCommand
from superset.commands.database.exceptions import (
DatabaseConnectionFailedError,
DatabaseCreateFailedError,
DatabaseDeleteDatasetsExistFailedError,
DatabaseDeleteFailedError,
DatabaseInvalidError,
DatabaseNotFoundError,
DatabaseUpdateFailedError,
InvalidParametersError,
)
from superset.commands.database.export import ExportDatabasesCommand
from superset.commands.database.importers.dispatcher import ImportDatabasesCommand
from superset.commands.database.ssh_tunnel.delete import DeleteSSHTunnelCommand
from superset.commands.database.ssh_tunnel.exceptions import (
SSHTunnelDatabasePortError,
SSHTunnelDeleteFailedError,
SSHTunnelingNotEnabledError,
)
from superset.commands.database.tables import TablesDatabaseCommand
from superset.commands.database.test_connection import TestConnectionDatabaseCommand
from superset.commands.database.update import UpdateDatabaseCommand
from superset.commands.database.uploaders.base import UploadCommand
from superset.commands.database.uploaders.columnar_reader import ColumnarReader
from superset.commands.database.uploaders.csv_reader import CSVReader
from superset.commands.database.uploaders.excel_reader import ExcelReader
from superset.commands.database.validate import ValidateDatabaseParametersCommand
from superset.commands.database.validate_sql import ValidateSQLCommand
from superset.commands.importers.exceptions import (
IncorrectFormatError,
NoValidFilesFoundError,
)
from superset.commands.importers.v1.utils import get_contents_from_bundle
from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
from superset.daos.database import DatabaseDAO, DatabaseUserOAuth2TokensDAO
from superset.databases.decorators import check_table_access
from superset.databases.filters import DatabaseFilter, DatabaseUploadEnabledFilter
from superset.databases.schemas import (
CatalogsResponseSchema,
ColumnarMetadataUploadFilePostSchema,
ColumnarUploadPostSchema,
CSVMetadataUploadFilePostSchema,
CSVUploadPostSchema,
database_catalogs_query_schema,
database_schemas_query_schema,
database_tables_query_schema,
DatabaseConnectionSchema,
DatabaseFunctionNamesResponse,
DatabasePostSchema,
DatabasePutSchema,
DatabaseRelatedObjectsResponse,
DatabaseSchemaAccessForFileUploadResponse,
DatabaseTablesResponse,
DatabaseTestConnectionSchema,
DatabaseValidateParametersSchema,
ExcelMetadataUploadFilePostSchema,
ExcelUploadPostSchema,
get_export_ids_schema,
OAuth2ProviderResponseSchema,
openapi_spec_methods_override,
QualifiedTableSchema,
SchemasResponseSchema,
SelectStarResponseSchema,
TableExtraMetadataResponseSchema,
TableMetadataResponseSchema,
UploadFileMetadata,
ValidateSQLRequest,
ValidateSQLResponse,
)
from superset.databases.utils import get_table_metadata
from superset.db_engine_specs import get_available_engine_specs
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
DatabaseNotFoundException,
InvalidPayloadSchemaError,
OAuth2Error,
OAuth2RedirectError,
SupersetErrorsException,
SupersetException,
SupersetSecurityException,
TableNotFoundException,
)
from superset.extensions import security_manager
from superset.models.core import Database
from superset.sql_parse import Table
from superset.superset_typing import FlaskResponse
from superset.utils import json
from superset.utils.core import error_msg_from_exception, parse_js_uri_path_item
from superset.utils.decorators import transaction
from superset.utils.oauth2 import decode_oauth2_state
from superset.utils.ssh_tunnel import mask_password_info
from superset.views.base_api import (
BaseSupersetModelRestApi,
RelatedFieldFilter,
requires_form_data,
requires_json,
statsd_metrics,
)
from superset.views.error_handling import handle_api_exception, json_error_response
from superset.views.filters import BaseFilterRelatedUsers, FilterRelatedOwners
logger = logging.getLogger(__name__)
# pylint: disable=too-many-public-methods
class DatabaseRestApi(BaseSupersetModelRestApi):
# Binds this REST API to the ``Database`` model through the SQLAlchemy interface.
datamodel = SQLAInterface(Database)
# Routes enabled on this API: the REST CRUD set, export/import/related, and the
# additional route names listed below.
include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {
    RouteMethod.EXPORT,
    RouteMethod.IMPORT,
    RouteMethod.RELATED,
    "tables",
    "table_metadata",
    "table_metadata_deprecated",
    "table_extra_metadata",
    "table_extra_metadata_deprecated",
    "select_star",
    "catalogs",
    "schemas",
    "test_connection",
    "related_objects",
    "function_names",
    "available",
    "validate_parameters",
    "validate_sql",
    "delete_ssh_tunnel",
    "schemas_access_for_file_upload",
    "get_connection",
    "csv_upload",
    "csv_metadata",
    "excel_upload",
    "excel_metadata",
    "columnar_upload",
    "columnar_metadata",
    "oauth2",
}
resource_name = "database"
class_permission_name = "Database"
method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP
allow_browser_login = True
# Base filter applied on the ``id`` column using ``DatabaseFilter``.
base_filters = [["id", DatabaseFilter, lambda: []]]
# Columns returned when showing a single database.
show_columns = [
    "id",
    "uuid",
    "database_name",
    "cache_timeout",
    "expose_in_sqllab",
    "allow_run_async",
    "allow_file_upload",
    "configuration_method",
    "allow_ctas",
    "allow_cvas",
    "allow_dml",
    "backend",
    "driver",
    "force_ctas_schema",
    "impersonate_user",
    "is_managed_externally",
    "engine_information",
]
# Columns returned when listing databases.
list_columns = [
    "allow_file_upload",
    "allow_ctas",
    "allow_cvas",
    "allow_dml",
    "allow_run_async",
    "allows_cost_estimate",
    "allows_subquery",
    "allows_virtual_table_explore",
    "backend",
    "changed_on",
    "changed_on_delta_humanized",
    "changed_by.first_name",
    "changed_by.last_name",
    "created_by.first_name",
    "created_by.last_name",
    "database_name",
    "explore_database_id",
    "expose_in_sqllab",
    "extra",
    "force_ctas_schema",
    "id",
    "uuid",
    "disable_data_preview",
    "disable_drill_to_detail",
    "allow_multi_catalog",
    "engine_information",
]
# Columns accepted when creating a database.
add_columns = [
    "database_name",
    "sqlalchemy_uri",
    "cache_timeout",
    "expose_in_sqllab",
    "allow_run_async",
    "allow_file_upload",
    "allow_ctas",
    "allow_cvas",
    "allow_dml",
    "configuration_method",
    "force_ctas_schema",
    "impersonate_user",
    "extra",
    "encrypted_extra",
    "server_cert",
]
# NOTE: this is the very same list object as ``add_columns`` (aliased, not copied).
edit_columns = add_columns
search_columns = [
    "allow_file_upload",
    "allow_dml",
    "allow_run_async",
    "created_by",
    "changed_by",
    "database_name",
    "expose_in_sqllab",
    "uuid",
]
# Custom search filter registered for the ``allow_file_upload`` column.
search_filters = {"allow_file_upload": [DatabaseUploadEnabledFilter]}
allowed_rel_fields = {"changed_by", "created_by"}
# ``list_columns`` plus three extra columns; note that "extra" is already part of
# ``list_columns``, so it appears twice in the resulting list.
list_select_columns = list_columns + ["extra", "sqlalchemy_uri", "password"]
order_columns = [
    "allow_file_upload",
    "allow_dml",
    "allow_run_async",
    "changed_on",
    "changed_on_delta_humanized",
    "created_by.first_name",
    "database_name",
    "expose_in_sqllab",
]
# Removes the local limit for the page size
max_page_size = -1
# Marshmallow schemas used to validate POST and PUT payloads respectively.
add_model_schema = DatabasePostSchema()
edit_model_schema = DatabasePutSchema()
# Query-parameter schemas published under these names in the API spec.
apispec_parameter_schemas = {
    "database_catalogs_query_schema": database_catalogs_query_schema,
    "database_schemas_query_schema": database_schemas_query_schema,
    "database_tables_query_schema": database_tables_query_schema,
    "get_export_ids_schema": get_export_ids_schema,
}
openapi_spec_tag = "Database"
# Schemas registered as OpenAPI components; endpoint docstrings reference them
# through ``#/components/schemas/<SchemaName>``.
openapi_spec_component_schemas = (
    CatalogsResponseSchema,
    ColumnarUploadPostSchema,
    CSVUploadPostSchema,
    DatabaseConnectionSchema,
    DatabaseFunctionNamesResponse,
    DatabaseSchemaAccessForFileUploadResponse,
    DatabaseRelatedObjectsResponse,
    DatabaseTablesResponse,
    DatabaseTestConnectionSchema,
    DatabaseValidateParametersSchema,
    ExcelUploadPostSchema,
    TableExtraMetadataResponseSchema,
    TableMetadataResponseSchema,
    SelectStarResponseSchema,
    SchemasResponseSchema,
    CSVMetadataUploadFilePostSchema,
    ExcelMetadataUploadFilePostSchema,
    ColumnarMetadataUploadFilePostSchema,
    UploadFileMetadata,
    ValidateSQLRequest,
    ValidateSQLResponse,
)
openapi_spec_methods = openapi_spec_methods_override
""" Overrides GET methods OpenApi descriptions """
# Filters used when resolving the ``changed_by`` related field.
related_field_filters = {
    "changed_by": RelatedFieldFilter("first_name", FilterRelatedOwners),
}
base_related_field_filters = {
    "changed_by": [["id", BaseFilterRelatedUsers, lambda: []]],
}
@expose("/<int:pk>/connection", methods=("GET",))
@protect()
@safe
def get_connection(self, pk: int) -> Response:
    """Get database connection info.
    ---
    get:
      summary: Get a database connection info
      parameters:
      - in: path
        schema:
          type: integer
        description: The database id
        name: pk
      responses:
        200:
          description: Database with connection info
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/DatabaseConnectionSchema"
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    database = DatabaseDAO.find_by_id(pk)
    # Without this guard a missing database would be handed to the schema as
    # ``None`` instead of producing a proper "not found" answer, unlike the
    # sibling endpoints (catalogs, related_objects) which respond with 404.
    if not database:
        return self.response_404()
    database_connection_schema = DatabaseConnectionSchema()
    response = {
        "id": pk,
        "result": database_connection_schema.dump(database, many=False),
    }
    try:
        # Attach the SSH tunnel information only when one is configured
        if ssh_tunnel := DatabaseDAO.get_ssh_tunnel(pk):
            response["result"]["ssh_tunnel"] = ssh_tunnel.data
        return self.response(200, **response)
    except SupersetException as ex:
        return self.response(ex.status, message=ex.message)
@expose("/<int:pk>", methods=("GET",))
@protect()
@safe
def get(self, pk: int, **kwargs: Any) -> Response:
    """Get a database.
    ---
    get:
      summary: Get a database
      parameters:
      - in: path
        schema:
          type: integer
        description: The database id
        name: pk
      responses:
        200:
          description: Database
          content:
            application/json:
              schema:
                type: object
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    data = self.get_headless(pk, **kwargs)
    # An error response (e.g. 404) carries no "result" key; indexing into it
    # below would raise KeyError and surface as a 500, so hand it back as-is.
    if data.status_code != 200:
        return data
    try:
        if ssh_tunnel := DatabaseDAO.get_ssh_tunnel(pk):
            # Enrich the regular payload with the SSH tunnel details
            payload = data.json
            payload["result"]["ssh_tunnel"] = ssh_tunnel.data
            return payload
        return data
    except SupersetException as ex:
        return self.response(ex.status, message=ex.message)
@expose("/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.post",
    log_to_statsd=False,
)
@requires_json
def post(self) -> FlaskResponse:
    """Create a new database from the JSON request body.
    ---
    post:
      summary: Create a new database
      requestBody:
        description: Database schema
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/{{self.__class__.__name__}}.post'
      responses:
        201:
          description: Database added
          content:
            application/json:
              schema:
                type: object
                properties:
                  id:
                    type: number
                  result:
                    $ref: '#/components/schemas/{{self.__class__.__name__}}.post'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        properties = self.add_model_schema.load(request.json)
    except ValidationError as error:
        # Raised by the schema, including its custom validations
        return self.response_400(message=error.messages)

    try:
        created = CreateDatabaseCommand(properties).run()
        # Echo model-side values back; the URI taken from the model is the
        # censored version
        properties.update(
            uuid=created.uuid,
            sqlalchemy_uri=created.sqlalchemy_uri,
            expose_in_sqllab=created.expose_in_sqllab,
        )
        # Parameters and driver are only part of the payload when available
        if created.parameters:
            properties["parameters"] = created.parameters
        if created.driver:
            properties["driver"] = created.driver
        # When an SSH tunnel was submitted, return it with passwords masked
        if properties.get("ssh_tunnel"):
            properties["ssh_tunnel"] = mask_password_info(created.ssh_tunnel)
        return self.response(201, id=created.id, result=properties)
    except OAuth2RedirectError:
        # Must propagate untouched; it is deliberately not turned into a response
        raise
    except DatabaseInvalidError as ex:
        return self.response_422(message=ex.normalized_messages())
    except DatabaseConnectionFailedError as ex:
        return self.response_422(message=str(ex))
    except SupersetErrorsException as ex:
        return json_error_response(ex.errors, status=ex.status)
    except DatabaseCreateFailedError as ex:
        logger.error(
            "Error creating model %s: %s",
            self.__class__.__name__,
            str(ex),
            exc_info=True,
        )
        return self.response_422(message=str(ex))
    except (SSHTunnelingNotEnabledError, SSHTunnelDatabasePortError) as ex:
        return self.response_400(message=str(ex))
    except SupersetException as ex:
        return self.response(ex.status, message=ex.message)
@expose("/<int:pk>", methods=("PUT",))
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.put",
    log_to_statsd=False,
)
@requires_json
def put(self, pk: int) -> Response:
    """Update an existing database from the JSON request body.
    ---
    put:
      summary: Change a database
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
      requestBody:
        description: Database schema
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/{{self.__class__.__name__}}.put'
      responses:
        200:
          description: Database changed
          content:
            application/json:
              schema:
                type: object
                properties:
                  id:
                    type: number
                  result:
                    $ref: '#/components/schemas/{{self.__class__.__name__}}.put'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        403:
          $ref: '#/components/responses/403'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        properties = self.edit_model_schema.load(request.json)
    except ValidationError as error:
        # Raised by the schema, including its custom validations
        return self.response_400(message=error.messages)

    try:
        updated = UpdateDatabaseCommand(pk, properties).run()
        # The URI taken from the model is the censored version
        properties["sqlalchemy_uri"] = updated.sqlalchemy_uri
        if updated.parameters:
            properties["parameters"] = updated.parameters
        # When an SSH tunnel was submitted, return it with passwords masked
        if properties.get("ssh_tunnel"):
            properties["ssh_tunnel"] = mask_password_info(updated.ssh_tunnel)
        return self.response(200, id=updated.id, result=properties)
    except DatabaseNotFoundError:
        return self.response_404()
    except DatabaseInvalidError as ex:
        return self.response_422(message=ex.normalized_messages())
    except DatabaseConnectionFailedError as ex:
        return self.response_422(message=str(ex))
    except DatabaseUpdateFailedError as ex:
        logger.error(
            "Error updating model %s: %s",
            self.__class__.__name__,
            str(ex),
            exc_info=True,
        )
        return self.response_422(message=str(ex))
    except (SSHTunnelingNotEnabledError, SSHTunnelDatabasePortError) as ex:
        return self.response_400(message=str(ex))
@expose("/<int:pk>", methods=("DELETE",))
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.delete",
    log_to_statsd=False,
)
def delete(self, pk: int) -> Response:
    """Delete the database identified by ``pk``.
    ---
    delete:
      summary: Delete a database
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
      responses:
        200:
          description: Database deleted
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        401:
          $ref: '#/components/responses/401'
        403:
          $ref: '#/components/responses/403'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        DeleteDatabaseCommand(pk).run()
    except DatabaseNotFoundError:
        return self.response_404()
    except DatabaseDeleteDatasetsExistFailedError as ex:
        # Reported without logging, unlike the generic delete failure below
        return self.response_422(message=str(ex))
    except DatabaseDeleteFailedError as ex:
        logger.error(
            "Error deleting model %s: %s",
            self.__class__.__name__,
            str(ex),
            exc_info=True,
        )
        return self.response_422(message=str(ex))
    else:
        return self.response(200, message="OK")
@expose("/<int:pk>/catalogs/")
@protect()
@rison(database_catalogs_query_schema)
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.catalogs",
    log_to_statsd=False,
)
def catalogs(self, pk: int, **kwargs: Any) -> FlaskResponse:
    """List the catalogs of a database that the current user may access.
    ---
    get:
      summary: Get all catalogs from a database
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: query
        name: q
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/database_catalogs_query_schema'
      responses:
        200:
          description: A List of all catalogs from the database
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/CatalogsResponseSchema"
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    if not (database := DatabaseDAO.find_by_id(pk)):
        return self.response_404()

    try:
        all_catalogs = database.get_all_catalog_names(
            cache=database.catalog_cache_enabled,
            cache_timeout=database.catalog_cache_timeout or None,
            force=kwargs["rison"].get("force", False),
        )
        # Narrow the full list down through the security manager
        accessible = security_manager.get_catalogs_accessible_by_user(
            database,
            all_catalogs,
        )
        return self.response(200, result=list(accessible))
    except OperationalError:
        return self.response(
            500,
            message="There was an error connecting to the database",
        )
    except OAuth2RedirectError:
        # Must propagate untouched; handled before the generic branch below
        raise
    except SupersetException as ex:
        return self.response(ex.status, message=ex.message)
@expose("/<int:pk>/schemas/")
@protect()
@rison(database_schemas_query_schema)
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.schemas",
    log_to_statsd=False,
)
def schemas(self, pk: int, **kwargs: Any) -> FlaskResponse:
    """List the schemas of a database that the current user may access.
    ---
    get:
      summary: Get all schemas from a database
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: query
        name: q
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/database_schemas_query_schema'
      responses:
        200:
          description: A List of all schemas from the database
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/SchemasResponseSchema"
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    database = self.datamodel.get(pk, self._base_filters)
    if not database:
        return self.response_404()

    try:
        # The optional catalog is used both for the lookup and the access check
        catalog = kwargs["rison"].get("catalog")
        all_schemas = database.get_all_schema_names(
            catalog=catalog,
            cache=database.schema_cache_enabled,
            cache_timeout=database.schema_cache_timeout or None,
            force=kwargs["rison"].get("force", False),
        )
        accessible = security_manager.get_schemas_accessible_by_user(
            database,
            catalog,
            all_schemas,
        )
        return self.response(200, result=list(accessible))
    except OperationalError:
        return self.response(
            500, message="There was an error connecting to the database"
        )
    except OAuth2RedirectError:
        # Must propagate untouched; handled before the generic branch below
        raise
    except SupersetException as ex:
        return self.response(ex.status, message=ex.message)
@expose("/<int:pk>/tables/")
@protect()
@rison(database_tables_query_schema)
@statsd_metrics
@handle_api_exception
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.tables",
    log_to_statsd=False,
)
def tables(self, pk: int, **kwargs: Any) -> FlaskResponse:
    """List the tables of a database for the requested catalog and schema.
    ---
    get:
      summary: Get a list of tables for given database
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: query
        name: q
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/database_tables_query_schema'
      responses:
        200:
          description: Tables list
          content:
            application/json:
              schema:
                type: object
                properties:
                  count:
                    type: integer
                  result:
                    description: >-
                      A List of tables for given database
                    type: array
                    items:
                      $ref: '#/components/schemas/DatabaseTablesResponse'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    query = kwargs["rison"]
    # Command arguments, in order: database id, catalog, schema, force flag
    payload = TablesDatabaseCommand(
        pk,
        query.get("catalog_name"),
        query.get("schema_name", ""),
        query.get("force", False),
    ).run()
    return self.response(200, **payload)
@expose("/<int:pk>/table/<path:table_name>/<schema_name>/", methods=("GET",))
@protect()
@check_table_access
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
    f".table_metadata_deprecated",
    log_to_statsd=False,
)
def table_metadata_deprecated(
    self, database: Database, table_name: str, schema_name: str
) -> FlaskResponse:
    """Get metadata for a table addressed through path parameters.
    ---
    get:
      summary: Get database table metadata
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: path
        schema:
          type: string
        name: table_name
        description: Table name
      - in: path
        schema:
          type: string
        name: schema_name
        description: Table schema
      responses:
        200:
          description: Table metadata information
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/TableMetadataResponseSchema"
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    endpoint = self.table_metadata_deprecated.__name__
    self.incr_stats("init", endpoint)
    try:
        table_info = get_table_metadata(database, Table(table_name, schema_name))
    except SQLAlchemyError as ex:
        self.incr_stats("error", endpoint)
        return self.response_422(error_msg_from_exception(ex))
    except SupersetException as ex:
        # No "error" stat is recorded on this path
        return self.response(ex.status, message=ex.message)
    else:
        self.incr_stats("success", endpoint)
        return self.response(200, **table_info)
@expose("/<int:pk>/table_extra/<path:table_name>/<schema_name>/", methods=("GET",))
@protect()
@check_table_access
@safe
@statsd_metrics
@deprecated(deprecated_in="4.0", removed_in="5.0")
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
    f".table_extra_metadata_deprecated",
    log_to_statsd=False,
)
def table_extra_metadata_deprecated(
    self, database: Database, table_name: str, schema_name: str
) -> FlaskResponse:
    """Get table extra metadata.

    A newer API was introduced between 4.0 and 5.0, with support for catalogs for
    SIP-95. This method was kept to prevent breaking API integrations, but will be
    removed in 5.0.
    ---
    get:
      summary: Get table extra metadata
      description: >-
        Response depends on each DB engine spec normally focused on partitions.
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: path
        schema:
          type: string
        name: table_name
        description: Table name
      - in: path
        schema:
          type: string
        name: schema_name
        description: Table schema
      responses:
        200:
          description: Table extra metadata information
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/TableExtraMetadataResponseSchema"
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    self.incr_stats("init", self.table_extra_metadata_deprecated.__name__)

    # Both path items are run through the JS URI parser before use
    schema = parse_js_uri_path_item(schema_name, eval_undefined=True)
    name = cast(str, parse_js_uri_path_item(table_name))

    engine_spec = database.db_engine_spec
    payload = engine_spec.get_extra_table_metadata(database, Table(name, schema))
    return self.response(200, **payload)
@expose("/<int:pk>/table_metadata/", methods=["GET"])
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
    f".table_metadata",
    log_to_statsd=False,
)
def table_metadata(self, pk: int) -> FlaskResponse:
    """
    Get metadata for a given table.

    Optionally, a schema and a catalog can be passed, if different from the default
    ones.
    ---
    get:
      summary: Get table metadata
      description: >-
        Metadata associated with the table (columns, indexes, etc.)
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: query
        schema:
          type: string
        name: name
        required: true
        description: Table name
      - in: query
        schema:
          type: string
        name: schema
        description: >-
          Optional table schema, if not passed default schema will be used
      - in: query
        schema:
          type: string
        name: catalog
        description: >-
          Optional table catalog, if not passed default catalog will be used
      responses:
        200:
          description: Table metadata information
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/TableMetadataResponseSchema"
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    self.incr_stats("init", self.table_metadata.__name__)

    database = DatabaseDAO.find_by_id(pk)
    if database is None:
        raise DatabaseNotFoundException("No such database")

    # The query string is validated by QualifiedTableSchema; the table name is
    # read from its "name" key, which is what the spec above documents.
    try:
        parameters = QualifiedTableSchema().load(request.args)
    except ValidationError as ex:
        raise InvalidPayloadSchemaError(ex) from ex

    table = Table(parameters["name"], parameters["schema"], parameters["catalog"])
    try:
        security_manager.raise_for_access(database=database, table=table)
    except SupersetSecurityException as ex:
        # instead of raising 403, raise 404 to hide table existence
        raise TableNotFoundException("No such table") from ex

    payload = database.db_engine_spec.get_table_metadata(database, table)
    return self.response(200, **payload)
@expose("/<int:pk>/table_metadata/extra/", methods=["GET"])
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
    f".table_extra_metadata",
    log_to_statsd=False,
)
def table_extra_metadata(self, pk: int) -> FlaskResponse:
    """
    Get extra metadata for a given table.

    The table is identified through query parameters; a schema and a catalog may
    be supplied when they differ from the default ones.
    ---
    get:
      summary: Get table extra metadata
      description: >-
        Extra metadata associated with the table (partitions, description, etc.)
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: query
        schema:
          type: string
        name: name
        required: true
        description: Table name
      - in: query
        schema:
          type: string
        name: schema
        description: >-
          Optional table schema, if not passed the schema configured in the database
          will be used
      - in: query
        schema:
          type: string
        name: catalog
        description: >-
          Optional table catalog, if not passed the catalog configured in the
          database will be used
      responses:
        200:
          description: Table extra metadata information
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/TableExtraMetadataResponseSchema"
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    self.incr_stats("init", self.table_extra_metadata.__name__)

    database = DatabaseDAO.find_by_id(pk)
    if not database:
        raise DatabaseNotFoundException("No such database")

    try:
        qualified = QualifiedTableSchema().load(request.args)
    except ValidationError as ex:
        raise InvalidPayloadSchemaError(ex) from ex

    table = Table(qualified["name"], qualified["schema"], qualified["catalog"])
    try:
        security_manager.raise_for_access(database=database, table=table)
    except SupersetSecurityException as ex:
        # A denied table is reported as missing (404 rather than 403) so that
        # its existence is not revealed
        raise TableNotFoundException("No such table") from ex

    engine_spec = database.db_engine_spec
    payload = engine_spec.get_extra_table_metadata(database, table)
    return self.response(200, **payload)
@expose("/<int:pk>/select_star/<path:table_name>/", methods=("GET",))
@expose("/<int:pk>/select_star/<path:table_name>/<schema_name>/", methods=("GET",))
@protect()
@check_table_access
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.select_star",
    log_to_statsd=False,
)
def select_star(
    self, database: Database, table_name: str, schema_name: str | None = None
) -> FlaskResponse:
    """Build the ``SELECT *`` statement for a table.
    ---
    get:
      summary: Get database select star for table
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database id
      - in: path
        schema:
          type: string
        name: table_name
        description: Table name
      - in: path
        schema:
          type: string
        name: schema_name
        description: Table schema
      responses:
        200:
          description: SQL statement for a select star for table
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/SelectStarResponseSchema"
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    endpoint = self.select_star.__name__
    self.incr_stats("init", endpoint)
    try:
        # The table is always qualified with the database's default catalog
        table = Table(table_name, schema_name, database.get_default_catalog())
        statement = database.select_star(table, latest_partition=True)
    except NoSuchTableError:
        self.incr_stats("error", endpoint)
        return self.response(404, message="Table not found on the database")
    else:
        self.incr_stats("success", endpoint)
        return self.response(200, result=statement)
@expose("/test_connection/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
    f".test_connection",
    log_to_statsd=False,
)
@requires_json
def test_connection(self) -> FlaskResponse:
    """Test the database connection described by the JSON request body.
    ---
    post:
      summary: Test a database connection
      requestBody:
        description: Database schema
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/DatabaseTestConnectionSchema"
      responses:
        200:
          description: Database Test Connection
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        400:
          $ref: '#/components/responses/400'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        properties = DatabaseTestConnectionSchema().load(request.json)
    except ValidationError as error:
        # Raised by the schema, including its custom validations
        return self.response_400(message=error.messages)

    try:
        TestConnectionDatabaseCommand(properties).run()
    except (SSHTunnelingNotEnabledError, SSHTunnelDatabasePortError) as ex:
        # Only SSH tunnel problems are translated here; any other exception
        # propagates to the caller
        return self.response_400(message=str(ex))
    else:
        return self.response(200, message="OK")
@expose("/<int:pk>/related_objects/", methods=("GET",))
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
    f".related_objects",
    log_to_statsd=False,
)
def related_objects(self, pk: int) -> Response:
    """Get the charts, dashboards and SQL Lab tab states tied to a database.
    ---
    get:
      summary: Get charts and dashboards count associated to a database
      parameters:
      - in: path
        name: pk
        schema:
          type: integer
      responses:
        200:
          description: Query result
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/DatabaseRelatedObjectsResponse"
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    if not DatabaseDAO.find_by_id(pk):
        return self.response_404()

    related = DatabaseDAO.get_related_objects(pk)

    # Each related object is reduced to a small, serializable summary
    charts = []
    for chart in related["charts"]:
        charts.append(
            {
                "id": chart.id,
                "slice_name": chart.slice_name,
                "viz_type": chart.viz_type,
            }
        )

    dashboards = []
    for dashboard in related["dashboards"]:
        dashboards.append(
            {
                "id": dashboard.id,
                "json_metadata": dashboard.json_metadata,
                "slug": dashboard.slug,
                "title": dashboard.dashboard_title,
            }
        )

    sqllab_tab_states = []
    for tab_state in related["sqllab_tab_states"]:
        sqllab_tab_states.append(
            {
                "id": tab_state.id,
                "label": tab_state.label,
                "active": tab_state.active,
            }
        )

    # Every section of the response pairs the summaries with their count
    body = {
        "charts": {"count": len(charts), "result": charts},
        "dashboards": {"count": len(dashboards), "result": dashboards},
        "sqllab_tab_states": {
            "count": len(sqllab_tab_states),
            "result": sqllab_tab_states,
        },
    }
    return self.response(200, **body)
@expose("/<int:pk>/validate_sql/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.validate_sql",
    log_to_statsd=False,
)
def validate_sql(self, pk: int) -> FlaskResponse:
    """Run the SQL validator for database ``pk`` against the posted statement.

    A body rejected by ``ValidateSQLRequest`` yields 400; an unknown
    database (``DatabaseNotFoundError``) yields 404.
    ---
    post:
      summary: Validate arbitrary SQL
      description: >-
        Validates that arbitrary SQL is acceptable for the given database.
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
      requestBody:
        description: Validate SQL request
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ValidateSQLRequest'
      responses:
        200:
          description: Validation result
          content:
            application/json:
              schema:
                type: object
                properties:
                  result:
                    description: >-
                      A List of SQL errors found on the statement
                    type: array
                    items:
                      $ref: '#/components/schemas/ValidateSQLResponse'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    # Step 1: schema-validate the JSON body.
    try:
        payload = ValidateSQLRequest().load(request.json)
    except ValidationError as error:
        return self.response_400(message=error.messages)

    # Step 2: delegate to the command; it signals a missing database by raising.
    try:
        findings = ValidateSQLCommand(pk, payload).run()
    except DatabaseNotFoundError:
        return self.response_404()
    else:
        return self.response(200, result=findings)
@expose("/oauth2/", methods=["GET"])
@transaction()
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.oauth2",
    log_to_statsd=True,
)
def oauth2(self) -> FlaskResponse:
    """Handle the OAuth2 provider redirect and persist the user's tokens.

    The query string is loaded through ``OAuth2ProviderResponseSchema``. An
    ``error`` parameter, or a database without an OAuth2 configuration,
    raises ``OAuth2Error``. An unknown database id in the decoded state
    yields 404. On success any existing token row for the (user, database)
    pair is replaced and a self-closing HTML page is rendered.
    ---
    get:
      summary: >-
        Receive personal access tokens from OAuth2
      description: >-
        Receive and store personal access tokens from OAuth for user-level
        authorization
      parameters:
      - in: query
        name: state
        schema:
          type: string
      - in: query
        name: code
        schema:
          type: string
      - in: query
        name: scope
        schema:
          type: string
      - in: query
        name: error
        schema:
          type: string
      responses:
        200:
          description: A dummy self-closing HTML page
          content:
            text/html:
              schema:
                type: string
        400:
          $ref: '#/components/responses/400'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    parameters = OAuth2ProviderResponseSchema().load(request.args)

    # the provider reports failures through the `error` query parameter
    if "error" in parameters:
        raise OAuth2Error(parameters["error"])

    # note that when decoding the state we will perform JWT validation, preventing a
    # malicious payload that would insert a bogus database token, or delete an
    # existing one.
    state = decode_oauth2_state(parameters["state"])

    # exchange code for access/refresh tokens
    database = DatabaseDAO.find_by_id(state["database_id"])
    if database is None:
        return self.response_404()

    oauth2_config = database.get_oauth2_config()
    if oauth2_config is None:
        raise OAuth2Error("No configuration found for OAuth2")

    token_response = database.db_engine_spec.get_oauth2_token(
        oauth2_config,
        parameters["code"],
    )

    # delete old tokens
    existing = DatabaseUserOAuth2TokensDAO.find_one_or_none(
        user_id=state["user_id"],
        database_id=state["database_id"],
    )
    if existing:
        DatabaseUserOAuth2TokensDAO.delete([existing])

    # store tokens; the expiration is computed from the provider's `expires_in`
    # (seconds) relative to the current local time
    expiration = datetime.now() + timedelta(seconds=token_response["expires_in"])
    DatabaseUserOAuth2TokensDAO.create(
        attributes={
            "user_id": state["user_id"],
            "database_id": state["database_id"],
            "access_token": token_response["access_token"],
            "access_token_expiration": expiration,
            # the refresh token is optional in the provider response
            "refresh_token": token_response.get("refresh_token"),
        },
    )

    # return blank page that closes itself
    return make_response(
        render_template("superset/oauth2.html", tab_id=state["tab_id"]),
        200,
    )
@expose("/export/", methods=("GET",))
@protect()
@safe
@statsd_metrics
@rison(get_export_ids_schema)
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.export",
    log_to_statsd=False,
)
def export(self, **kwargs: Any) -> Response:
    """Stream the requested databases (and their datasets) as a ZIP archive.

    The ids come from the rison-decoded ``q`` parameter. A
    ``DatabaseNotFoundError`` raised while exporting yields 404. When the
    request carries a ``token`` query argument, a cookie of that name is
    set to ``"done"`` on the response.
    ---
    get:
      summary: Download database(s) and associated dataset(s) as a zip file
      parameters:
      - in: query
        name: q
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/get_export_ids_schema'
      responses:
        200:
          description: A zip file with database(s) and dataset(s) as YAML
          content:
            application/zip:
              schema:
                type: string
                format: binary
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    database_ids = kwargs["rison"]

    # Every archive member lives under one timestamped top-level folder.
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    folder = f"database_export_{stamp}"
    archive_name = f"{folder}.zip"

    payload = BytesIO()
    with ZipFile(payload, "w") as archive:
        try:
            exported = ExportDatabasesCommand(database_ids).run()
            for member_name, render in exported:
                # `render` is a callable producing the member's text content
                with archive.open(f"{folder}/{member_name}", "w") as member:
                    member.write(render().encode())
        except DatabaseNotFoundError:
            return self.response_404()

    # Rewind so send_file reads the archive from its first byte.
    payload.seek(0)
    response = send_file(
        payload,
        mimetype="application/zip",
        as_attachment=True,
        download_name=archive_name,
    )

    token = request.args.get("token")
    if token:
        response.set_cookie(token, "done", max_age=600)
    return response
@expose("/import/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.import_",
    log_to_statsd=False,
)
@requires_form_data
def import_(self) -> Response:
    """Import databases and their datasets from an uploaded ZIP bundle.

    A missing ``formData`` file yields 400. A non-ZIP upload raises
    ``IncorrectFormatError`` and a bundle with no usable contents raises
    ``NoValidFilesFoundError``. Optional secrets are read from JSON-encoded
    form fields and passed to ``ImportDatabasesCommand``.
    ---
    post:
      summary: Import database(s) with associated datasets
      requestBody:
        required: true
        content:
          multipart/form-data:
            schema:
              type: object
              properties:
                formData:
                  description: upload file (ZIP)
                  type: string
                  format: binary
                passwords:
                  description: >-
                    JSON map of passwords for each featured database in the
                    ZIP file. If the ZIP includes a database config in the path
                    `databases/MyDatabase.yaml`, the password should be provided
                    in the following format:
                    `{"databases/MyDatabase.yaml": "my_password"}`.
                  type: string
                overwrite:
                  description: overwrite existing databases?
                  type: boolean
                ssh_tunnel_passwords:
                  description: >-
                    JSON map of passwords for each ssh_tunnel associated to a
                    featured database in the ZIP file. If the ZIP includes a
                    ssh_tunnel config in the path `databases/MyDatabase.yaml`,
                    the password should be provided in the following format:
                    `{"databases/MyDatabase.yaml": "my_password"}`.
                  type: string
                ssh_tunnel_private_keys:
                  description: >-
                    JSON map of private_keys for each ssh_tunnel associated to a
                    featured database in the ZIP file. If the ZIP includes a
                    ssh_tunnel config in the path `databases/MyDatabase.yaml`,
                    the private_key should be provided in the following format:
                    `{"databases/MyDatabase.yaml": "my_private_key"}`.
                  type: string
                ssh_tunnel_private_key_passwords:
                  description: >-
                    JSON map of private_key_passwords for each ssh_tunnel associated
                    to a featured database in the ZIP file. If the ZIP includes a
                    ssh_tunnel config in the path `databases/MyDatabase.yaml`,
                    the private_key should be provided in the following format:
                    `{"databases/MyDatabase.yaml": "my_private_key_password"}`.
                  type: string
      responses:
        200:
          description: Database import result
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """

    def _json_field(key: str) -> Any:
        """Decode the JSON form field ``key``; None when it was not sent."""
        return json.loads(request.form[key]) if key in request.form else None

    upload = request.files.get("formData")
    if not upload:
        return self.response_400()
    if not is_zipfile(upload):
        raise IncorrectFormatError("Not a ZIP file")

    with ZipFile(upload) as bundle:
        contents = get_contents_from_bundle(bundle)
    if not contents:
        raise NoValidFilesFoundError()

    # Form fields are read in a fixed order: passwords, overwrite, then the
    # three SSH tunnel secrets.
    passwords = _json_field("passwords")
    # only the exact string "true" enables overwriting
    overwrite = request.form.get("overwrite") == "true"
    tunnel_passwords = _json_field("ssh_tunnel_passwords")
    tunnel_private_keys = _json_field("ssh_tunnel_private_keys")
    tunnel_key_passwords = _json_field("ssh_tunnel_private_key_passwords")

    ImportDatabasesCommand(
        contents,
        passwords=passwords,
        overwrite=overwrite,
        ssh_tunnel_passwords=tunnel_passwords,
        ssh_tunnel_private_keys=tunnel_private_keys,
        ssh_tunnel_priv_key_passwords=tunnel_key_passwords,
    ).run()
    return self.response(200, message="OK")
@expose("/csv_metadata/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=(
        lambda self, *args, **kwargs: f"{self.__class__.__name__}" ".csv_metadata"
    ),
    log_to_statsd=False,
)
@requires_form_data
def csv_metadata(self) -> Response:
    """Upload a CSV file and return its metadata.

    The multipart form fields plus the uploaded ``file`` are validated with
    ``CSVMetadataUploadFilePostSchema``; a validation failure yields 400.
    The metadata is produced by ``CSVReader.file_metadata`` and serialized
    with ``UploadFileMetadata``.
    ---
    post:
      summary: Upload a CSV file and return its metadata
      requestBody:
        required: true
        content:
          multipart/form-data:
            schema:
              $ref: '#/components/schemas/CSVMetadataUploadFilePostSchema'
      responses:
        200:
          description: CSV file metadata response
          content:
            application/json:
              schema:
                type: object
                properties:
                  result:
                    $ref: '#/components/schemas/UploadFileMetadata'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        request_form = request.form.to_dict()
        # the schema validates the uploaded file together with the form fields
        request_form["file"] = request.files.get("file")
        parameters = CSVMetadataUploadFilePostSchema().load(request_form)
    except ValidationError as error:
        return self.response_400(message=error.messages)
    # the reader is configured from the validated form options
    metadata = CSVReader(parameters).file_metadata(parameters["file"])
    return self.response(200, result=UploadFileMetadata().dump(metadata))
@expose("/<int:pk>/csv_upload/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.csv_upload",
    log_to_statsd=False,
)
@requires_form_data
def csv_upload(self, pk: int) -> Response:
    """Load an uploaded CSV file into a table of database ``pk``.

    The form is validated with ``CSVUploadPostSchema`` and the load is
    carried out by ``UploadCommand`` with a ``CSVReader``. A
    ``ValidationError`` yields 400; success yields 201.
    ---
    post:
      summary: Upload a CSV file to a database table
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
      requestBody:
        required: true
        content:
          multipart/form-data:
            schema:
              $ref: '#/components/schemas/CSVUploadPostSchema'
      responses:
        201:
          description: CSV upload response
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        # Merge the uploaded file into the form fields so one schema
        # validates everything.
        form_data = {**request.form.to_dict(), "file": request.files.get("file")}
        options = CSVUploadPostSchema().load(form_data)
        command = UploadCommand(
            pk,
            options["table_name"],
            options["file"],
            options.get("schema"),
            CSVReader(options),
        )
        command.run()
    except ValidationError as error:
        return self.response_400(message=error.messages)
    return self.response(201, message="OK")
@expose("/excel_metadata/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=(
        lambda self, *args, **kwargs: f"{self.__class__.__name__}" ".excel_metadata"
    ),
    log_to_statsd=False,
)
@requires_form_data
def excel_metadata(self) -> Response:
    """Upload an Excel file and return its metadata.

    The multipart form fields plus the uploaded ``file`` are validated with
    ``ExcelMetadataUploadFilePostSchema``; a validation failure yields 400.
    The metadata is produced by ``ExcelReader.file_metadata`` and serialized
    with ``UploadFileMetadata``.
    ---
    post:
      summary: Upload an Excel file and return its metadata
      requestBody:
        required: true
        content:
          multipart/form-data:
            schema:
              $ref: '#/components/schemas/ExcelMetadataUploadFilePostSchema'
      responses:
        200:
          description: Excel file metadata response
          content:
            application/json:
              schema:
                type: object
                properties:
                  result:
                    $ref: '#/components/schemas/UploadFileMetadata'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        request_form = request.form.to_dict()
        # the schema validates the uploaded file together with the form fields
        request_form["file"] = request.files.get("file")
        parameters = ExcelMetadataUploadFilePostSchema().load(request_form)
    except ValidationError as error:
        return self.response_400(message=error.messages)
    # the reader is built without options here
    metadata = ExcelReader().file_metadata(parameters["file"])
    return self.response(200, result=UploadFileMetadata().dump(metadata))
@expose("/<int:pk>/excel_upload/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.excel_upload",
    log_to_statsd=False,
)
@requires_form_data
def excel_upload(self, pk: int) -> Response:
    """Load an uploaded Excel file into a table of database ``pk``.

    The form is validated with ``ExcelUploadPostSchema`` and the load is
    carried out by ``UploadCommand`` with an ``ExcelReader``. A
    ``ValidationError`` yields 400; success yields 201.
    ---
    post:
      summary: Upload an Excel file to a database table
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
      requestBody:
        required: true
        content:
          multipart/form-data:
            schema:
              $ref: '#/components/schemas/ExcelUploadPostSchema'
      responses:
        201:
          description: Excel upload response
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        # Text fields first, then the file object under the "file" key.
        submitted = request.form.to_dict()
        submitted.update({"file": request.files.get("file")})
        validated = ExcelUploadPostSchema().load(submitted)
        UploadCommand(
            pk,
            validated["table_name"],
            validated["file"],
            validated.get("schema"),
            ExcelReader(validated),
        ).run()
    except ValidationError as error:
        return self.response_400(message=error.messages)
    else:
        return self.response(201, message="OK")
@expose("/columnar_metadata/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.columnar_metadata"
    ),
    log_to_statsd=False,
)
@requires_form_data
def columnar_metadata(self) -> Response:
    """Inspect an uploaded columnar file and report its metadata.

    The form is validated with ``ColumnarMetadataUploadFilePostSchema``; a
    validation failure yields 400. The metadata comes from
    ``ColumnarReader.file_metadata`` and is dumped via ``UploadFileMetadata``.
    ---
    post:
      summary: Upload a Columnar file and returns file metadata
      requestBody:
        required: true
        content:
          multipart/form-data:
            schema:
              $ref: '#/components/schemas/ColumnarMetadataUploadFilePostSchema'
      responses:
        200:
          description: Columnar upload response
          content:
            application/json:
              schema:
                type: object
                properties:
                  result:
                    $ref: '#/components/schemas/UploadFileMetadata'
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        # Validate the text fields and the uploaded file in a single pass.
        form_data = {**request.form.to_dict(), "file": request.files.get("file")}
        validated = ColumnarMetadataUploadFilePostSchema().load(form_data)
    except ValidationError as error:
        return self.response_400(message=error.messages)

    file_info = ColumnarReader().file_metadata(validated["file"])
    serialized = UploadFileMetadata().dump(file_info)
    return self.response(200, result=serialized)
@expose("/<int:pk>/columnar_upload/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.columnar_upload"
    ),
    log_to_statsd=False,
)
@requires_form_data
def columnar_upload(self, pk: int) -> Response:
    """Load an uploaded columnar file into a table of database ``pk``.

    The form is validated with ``ColumnarUploadPostSchema`` and the load is
    carried out by ``UploadCommand`` with a ``ColumnarReader``. A
    ``ValidationError`` yields 400; success yields 201.
    ---
    post:
      summary: Upload a Columnar file to a database table
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
      requestBody:
        required: true
        content:
          multipart/form-data:
            schema:
              $ref: '#/components/schemas/ColumnarUploadPostSchema'
      responses:
        201:
          description: Columnar upload response
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        # The uploaded file travels through the schema with the text fields.
        fields = request.form.to_dict()
        fields["file"] = request.files.get("file")
        settings = ColumnarUploadPostSchema().load(fields)
        upload_command = UploadCommand(
            pk,
            settings["table_name"],
            settings["file"],
            settings.get("schema"),
            ColumnarReader(settings),
        )
        upload_command.run()
    except ValidationError as error:
        return self.response_400(message=error.messages)
    return self.response(201, message="OK")
@expose("/<int:pk>/function_names/", methods=("GET",))
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.function_names"
    ),
    log_to_statsd=False,
)
def function_names(self, pk: int) -> Response:
    """Return the ``function_names`` attribute of database ``pk``.

    Responds with 404 when ``DatabaseDAO.find_by_id`` finds no database.
    ---
    get:
      summary: Get function names supported by a database
      parameters:
      - in: path
        name: pk
        schema:
          type: integer
      responses:
        200:
          description: Query result
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/DatabaseFunctionNamesResponse"
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    target = DatabaseDAO.find_by_id(pk)
    if target:
        return self.response(200, function_names=target.function_names)
    return self.response_404()
@expose("/available/", methods=("GET",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.available",
    log_to_statsd=False,
)
def available(self) -> Response:
    """Get names of databases currently available.

    Engine specs without any installed driver are skipped. Engines listed in
    the ``PREFERRED_DATABASES`` config come first, in the order given there;
    the remaining engines follow sorted by name. The list is returned under
    the ``databases`` key of the response body.
    ---
    get:
      summary: Get names of databases currently available
      responses:
        200:
          description: Database names
          content:
            application/json:
              schema:
                type: array
                items:
                  type: object
                  properties:
                    name:
                      description: Name of the database
                      type: string
                    engine:
                      description: Name of the SQLAlchemy engine
                      type: string
                    available_drivers:
                      description: Installed drivers for the engine
                      type: array
                      items:
                        type: string
                    default_driver:
                      description: Default driver for the engine
                      type: string
                    preferred:
                      description: Is the database preferred?
                      type: boolean
                    sqlalchemy_uri_placeholder:
                      description: Example placeholder for the SQLAlchemy URI
                      type: string
                    parameters:
                      description: JSON schema defining the needed parameters
                      type: object
                    engine_information:
                      description: Dict with public properties from the DB Engine
                      type: object
                      properties:
                        supports_file_upload:
                          description: Whether the engine supports file uploads
                          type: boolean
                        disable_ssh_tunneling:
                          description: Whether the engine supports SSH Tunnels
                          type: boolean
        400:
          $ref: '#/components/responses/400'
        500:
          $ref: '#/components/responses/500'
    """
    preferred_databases: list[str] = app.config.get("PREFERRED_DATABASES", [])
    available_databases = []
    for engine_spec, drivers in get_available_engine_specs().items():
        # engines with no installed driver cannot be used
        if not drivers:
            continue

        payload: dict[str, Any] = {
            "name": engine_spec.engine_name,
            "engine": engine_spec.engine,
            "available_drivers": sorted(drivers),
            "sqlalchemy_uri_placeholder": engine_spec.sqlalchemy_uri_placeholder,
            "preferred": engine_spec.engine_name in preferred_databases,
            "engine_information": engine_spec.get_public_information(),
            "supports_oauth2": engine_spec.supports_oauth2,
        }

        if engine_spec.default_driver:
            payload["default_driver"] = engine_spec.default_driver

        # show configuration parameters for DBs that support it; the URI
        # placeholder is already part of the payload built above
        if (
            hasattr(engine_spec, "parameters_json_schema")
            and engine_spec.default_driver in drivers
        ):
            payload["parameters"] = engine_spec.parameters_json_schema()

        available_databases.append(payload)

    # sort preferred first, keeping the order of the PREFERRED_DATABASES config
    response = sorted(
        (payload for payload in available_databases if payload["preferred"]),
        key=lambda payload: preferred_databases.index(payload["name"]),
    )
    # add others, alphabetically by name
    response.extend(
        sorted(
            (
                payload
                for payload in available_databases
                if not payload["preferred"]
            ),
            key=lambda payload: payload["name"],
        )
    )
    return self.response(200, databases=response)
@expose("/validate_parameters/", methods=("POST",))
@protect()
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.validate_parameters"
    ),
    log_to_statsd=False,
)
@requires_json
def validate_parameters(self) -> FlaskResponse:
    """Check a set of database connection parameters.

    Schema failures are converted into one ``SupersetError`` per offending
    attribute and re-raised as ``InvalidParametersError``. A valid payload
    is handed to ``ValidateDatabaseParametersCommand``.
    ---
    post:
      summary: Validate database connection parameters
      requestBody:
        description: DB-specific parameters
        required: true
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/DatabaseValidateParametersSchema"
      responses:
        200:
          description: Database Test Connection
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        400:
          $ref: '#/components/responses/400'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    try:
        payload = DatabaseValidateParametersSchema().load(request.json)
    except ValidationError as ex:
        # One error per attribute; its messages are joined with newlines.
        errors = []
        for attribute, messages in ex.messages.items():
            errors.append(
                SupersetError(
                    message="\n".join(messages),
                    error_type=SupersetErrorType.INVALID_PAYLOAD_SCHEMA_ERROR,
                    level=ErrorLevel.ERROR,
                    extra={"invalid": [attribute]},
                )
            )
        raise InvalidParametersError(errors) from ex

    ValidateDatabaseParametersCommand(payload).run()
    return self.response(200, message="OK")
@expose("/<int:pk>/ssh_tunnel/", methods=("DELETE",))
@protect()
@statsd_metrics
@deprecated(deprecated_in="4.0")
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.delete_ssh_tunnel"
    ),
    log_to_statsd=False,
)
def delete_ssh_tunnel(self, pk: int) -> Response:
    """Remove the SSH tunnel attached to database ``pk``.

    Responds with 404 when the database is not found or has no tunnel, 422
    on ``SSHTunnelDeleteFailedError`` and 400 on
    ``SSHTunnelingNotEnabledError``. Both failures are logged with the
    traceback.
    ---
    delete:
      summary: Delete a SSH tunnel
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
      responses:
        200:
          description: SSH Tunnel deleted
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        401:
          $ref: '#/components/responses/401'
        403:
          $ref: '#/components/responses/403'
        404:
          $ref: '#/components/responses/404'
        422:
          $ref: '#/components/responses/422'
        500:
          $ref: '#/components/responses/500'
    """
    database = DatabaseDAO.find_by_id(pk)
    if not database:
        return self.response_404()

    try:
        tunnel = database.ssh_tunnels
        if not tunnel:
            return self.response_404()
        DeleteSSHTunnelCommand(tunnel.id).run()
        return self.response(200, message="OK")
    except (SSHTunnelDeleteFailedError, SSHTunnelingNotEnabledError) as ex:
        logger.error(
            "Error deleting SSH Tunnel %s: %s",
            self.__class__.__name__,
            str(ex),
            exc_info=True,
        )
        # A failed delete maps to 422; tunneling being disabled maps to 400.
        if isinstance(ex, SSHTunnelDeleteFailedError):
            return self.response_422(message=str(ex))
        return self.response_400(message=str(ex))
@expose("/<int:pk>/schemas_access_for_file_upload/")
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.schemas_access_for_file_upload"
    ),
    log_to_statsd=False,
)
def schemas_access_for_file_upload(self, pk: int) -> Response:
    """List the schemas of database ``pk`` that can receive file uploads.

    The database's upload schema list is narrowed by
    ``security_manager.get_schemas_accessible_by_user`` before it is
    returned. Responds with 404 when the database is not found.
    ---
    get:
      summary: The list of the database schemas where to upload information
      parameters:
      - in: path
        name: pk
        schema:
          type: integer
      responses:
        200:
          description: The list of the database schemas where to upload information
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/DatabaseSchemaAccessForFileUploadResponse"
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    target = DatabaseDAO.find_by_id(pk)
    if not target:
        return self.response_404()

    upload_schemas = target.get_schema_access_for_file_upload()
    # Neither the list above nor the filtered list below is expected to be
    # empty: otherwise the database should have been filtered out
    # in CsvToDatabaseForm.
    accessible_schemas = security_manager.get_schemas_accessible_by_user(
        target,
        target.get_default_catalog(),
        upload_schemas,
        True,
    )
    return self.response(200, schemas=accessible_schemas)