airbnb/caravel

View on GitHub
superset/extensions/metastore_cache.py

Summary

Maintainability
A
0 mins
Test Coverage
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime, timedelta
from typing import Any, Optional
from uuid import UUID, uuid3

from flask import current_app, Flask, has_app_context
from flask_caching import BaseCache
from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.key_value.exceptions import KeyValueCreateFailedError
from superset.key_value.types import (
    KeyValueCodec,
    KeyValueResource,
    PickleKeyValueCodec,
)
from superset.key_value.utils import get_uuid_namespace
from superset.utils.decorators import transaction

# Key-value resource passed to every KeyValueDAO call made by the cache below,
# so all cache entries are stored under the METASTORE_CACHE resource.
RESOURCE = KeyValueResource.METASTORE_CACHE

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class SupersetMetastoreCache(BaseCache):
    """Flask-Caching backend that stores its entries through ``KeyValueDAO``.

    Each cache key is mapped to a deterministic UUID (``uuid3``) inside the
    instance's namespace and stored under the ``METASTORE_CACHE`` resource.
    Values are encoded and decoded by the configured codec.
    """

    def __init__(
        self,
        namespace: UUID,
        codec: KeyValueCodec,
        default_timeout: int = 300,
    ) -> None:
        """Create a cache bound to a UUID namespace and a value codec.

        :param namespace: UUID namespace used to derive entry keys.
        :param codec: codec handed to ``KeyValueDAO`` for (de)serializing values.
        :param default_timeout: timeout in seconds forwarded to ``BaseCache``;
            used when a write does not specify its own timeout.
        """
        super().__init__(default_timeout)
        self.namespace = namespace
        self.codec = codec

    @classmethod
    def factory(
        cls, app: Flask, config: dict[str, Any], args: list[Any], kwargs: dict[str, Any]
    ) -> BaseCache:
        """Build an instance from a Flask-Caching style configuration.

        :param app: the Flask application (not used by this factory).
        :param config: cache configuration; ``CACHE_KEY_PREFIX`` seeds the UUID
            namespace and ``CODEC`` optionally overrides the value codec.
        :param args: positional arguments forwarded to the constructor.
        :param kwargs: keyword arguments forwarded to the constructor; the
            ``namespace`` and ``codec`` entries are set (in place) here.
        :returns: the configured cache instance.
        """
        seed = config.get("CACHE_KEY_PREFIX", "")
        kwargs["namespace"] = get_uuid_namespace(seed)
        codec = config.get("CODEC") or PickleKeyValueCodec()
        # SECURITY: pickle is the fallback codec. Outside of debug mode, warn
        # the operator instead of silently accepting it.
        if (
            has_app_context()
            and not current_app.debug
            and isinstance(codec, PickleKeyValueCodec)
        ):
            logger.warning(
                "Using PickleKeyValueCodec with SupersetMetastoreCache may be unsafe, "
                "use at your own risk."
            )
        kwargs["codec"] = codec
        return cls(*args, **kwargs)

    def get_key(self, key: str) -> UUID:
        """Return the deterministic UUID for ``key`` within this namespace."""
        return uuid3(self.namespace, key)

    def _get_expiry(self, timeout: Optional[int]) -> Optional[datetime]:
        """Translate a timeout in seconds into an absolute expiry timestamp.

        :param timeout: timeout in seconds; normalized by
            ``BaseCache._normalize_timeout`` before use.
        :returns: ``now + timeout`` when the normalized timeout is positive,
            otherwise ``None``.
        """
        timeout = self._normalize_timeout(timeout)
        if timeout is not None and timeout > 0:
            # NOTE(review): naive local time; presumably matches how the DAO
            # compares ``expires_on`` - confirm before switching to aware UTC.
            return datetime.now() + timedelta(seconds=timeout)
        return None

    def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
        """Create or overwrite the entry for ``key`` and commit it.

        :param key: cache key.
        :param value: value to store; encoded with the instance codec.
        :param timeout: optional timeout in seconds (see ``_get_expiry``).
        :returns: ``True`` once the write has been committed.
        :raises SQLAlchemyError: if the write or commit fails; the session is
            rolled back before the error is re-raised.
        """
        # pylint: disable=import-outside-toplevel
        from superset.daos.key_value import KeyValueDAO

        try:
            KeyValueDAO.upsert_entry(
                resource=RESOURCE,
                key=self.get_key(key),
                value=value,
                codec=self.codec,
                expires_on=self._get_expiry(timeout),
            )
            db.session.commit()  # pylint: disable=consider-using-transaction
        except SQLAlchemyError:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; clean up (as ``add`` does) but keep the error visible.
            db.session.rollback()  # pylint: disable=consider-using-transaction
            raise
        return True

    def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
        """Create a new entry for ``key``, reporting failure instead of raising.

        Expired entries for this resource are deleted before the new entry is
        created.

        :param key: cache key.
        :param value: value to store; encoded with the instance codec.
        :param timeout: optional timeout in seconds (see ``_get_expiry``).
        :returns: ``True`` if the entry was created and committed, ``False`` if
            a ``SQLAlchemyError`` or ``KeyValueCreateFailedError`` occurred (the
            session is rolled back in that case).
        """
        # pylint: disable=import-outside-toplevel
        from superset.daos.key_value import KeyValueDAO

        try:
            KeyValueDAO.delete_expired_entries(RESOURCE)
            KeyValueDAO.create_entry(
                resource=RESOURCE,
                value=value,
                codec=self.codec,
                key=self.get_key(key),
                expires_on=self._get_expiry(timeout),
            )
            db.session.commit()  # pylint: disable=consider-using-transaction
            return True
        except (SQLAlchemyError, KeyValueCreateFailedError):
            db.session.rollback()  # pylint: disable=consider-using-transaction
            return False

    def get(self, key: str) -> Any:
        """Return the value ``KeyValueDAO.get_value`` yields for ``key``.

        :param key: cache key.
        :returns: the decoded value as returned by the DAO.
        """
        # pylint: disable=import-outside-toplevel
        from superset.daos.key_value import KeyValueDAO

        return KeyValueDAO.get_value(RESOURCE, self.get_key(key), self.codec)

    def has(self, key: str) -> bool:
        """Return whether a value is stored for ``key``.

        Presence is decided by ``get`` returning something other than ``None``,
        so stored falsy values (``0``, ``""``, ``[]``, ``False``) count as
        present. Assumes a miss is reported as ``None``, per the ``BaseCache.get``
        contract.
        """
        return self.get(key) is not None

    @transaction()
    def delete(self, key: str) -> Any:
        """Delete the entry for ``key`` inside a ``transaction()`` scope.

        :param key: cache key.
        :returns: whatever ``KeyValueDAO.delete_entry`` returns.
        """
        # pylint: disable=import-outside-toplevel
        from superset.daos.key_value import KeyValueDAO

        return KeyValueDAO.delete_entry(RESOURCE, self.get_key(key))