Sciebo-RDS/py-research-data-services-common

View on GitHub
RDS/Service.py

Summary

Maintainability
D
2 days
Test Coverage
B
80%
from .Token import Token, OAuth2Token
from .User import User
from .Informations import LoginMode, FileTransferMode, FileTransferArchive
from urllib.parse import urlparse, urlunparse
import requests
import json
from datetime import datetime, timedelta
from typing import Union
import logging
import mimetypes
import base64
import os.path
from io import IOBase
from urllib import parse
from requests.exceptions import Timeout

logger = logging.getLogger()


def initService(obj: Union[str, dict]):
    """
    Returns a Service or oauthService object for json String or dict.
    """
    if isinstance(obj, (LoginService, OAuth2Service)):
        return obj

    if not isinstance(obj, (str, dict)):
        raise ValueError("Given object not from type str or dict.")

    from RDS.Util import try_function_on_dict

    load = try_function_on_dict(
        [
            OAuth2Service.from_json,
            OAuth2Service.from_dict,
            LoginService.from_json,
            LoginService.from_dict,
            BaseService.from_json,
            BaseService.from_dict
        ]
    )
    return load(obj)


class BaseService:
    """
    Represents a service, which can be used in RDS.
    """

    _servicename = None
    _implements = None
    _fileTransferMode = None
    _fileTransferArchive = None
    _description = None
    _icon = None
    _infoUrl = None
    _helpUrl = None
    _displayName = None
    _metadataProfile = None
    _projectLinkTemplate = None

    def __init__(
        self,
        servicename: str,
        implements: list = None,
        fileTransferMode: FileTransferMode = FileTransferMode.active,
        fileTransferArchive: FileTransferArchive = FileTransferArchive.none,
        description: dict = None,
        icon: str = "",
        infoUrl: str = "",
        helpUrl: str = "",
        displayName: str = None,
        metadataProfile: str = None,
        projectLinkTemplate: str = ""
    ):
        """Initialize Service without any authentication.

        Args:
            servicename (str): The name of the service, which will be registered. Must be unique.
            implements (list, optional): Specified the implemented port endpoints. Defaults to empty list.
            fileTransferMode (int, optional): Set the mode for transfering files. Defaults to 0=active. Alternative is 1=passive.
            fileTransferArchive (str, optional): Set the archive, which is needed for transfering files. Defaults to "". Other value is "zip"
            description (dict, optional): Set a short description for this service with corresponding language. Defaults to {"en":""}.
            icon: (str, optional): Takes a filepath, so the mimetype and base64 can be calculated for later usage. Defaults to "".
            infoUrl: (str, optional): Set the infoUrl for this service, so the user can be redirected to it to find more information about the service. Defaults to "".
            helpUrl: (str, optional): Set the helpUrl for this service, so the user can be redirected to a helpdesk page about this service. Defaults to "".
            displayName: (str, optional): Set the displayName for this service, which can be different as the servicename. Servicename will be used for identifiers. Defaults to "".
            metadataProfile: (str, optional): Set the metadata profile for the this service
            projectLinkTemplate: (str, optional): Set template for project url on this service. User `${projectId}` (javascript format string notation!) to mark where the project ID goes.
        """
        self.check_string(servicename, "servicename")

        self._servicename = servicename.lower()

        if description is None:
            self._description = {"en": ""}
        else:
            self._description = description

        if infoUrl is not None:
            if parse.unquote_plus(infoUrl) == infoUrl:
                self._infoUrl = parse.quote_plus(infoUrl)
            else:
                self._infoUrl = infoUrl
        else:
            self._infoUrl = ""

        if helpUrl is not None:
            if parse.unquote_plus(helpUrl) == helpUrl:
                self._helpUrl = parse.quote_plus(helpUrl)
            else:
                self._helpUrl = helpUrl
        else:
            self._helpUrl = ""

        if displayName is not None:
            self._displayName = displayName
        else:
            self._displayName = ""

        if icon is not None and icon != "":
            if isinstance(icon, (str)) and str(icon).startswith("data:"):
                self._icon = icon
            elif os.path.isfile(icon):
                mime = mimetypes.guess_type(icon)[0]

                with open(icon, "rb") as f:
                    b64 = base64.b64encode(f.read()).decode("utf-8")
                    self._icon = f"data:{mime};base64,{b64}"
            else:
                raise FileNotFoundError

        if metadataProfile is not None and metadataProfile != "":
            try:
                b64 = base64.b64decode(metadataProfile).decode("utf-8")
                self._metadataProfile = metadataProfile
            except:
                if os.path.isfile(metadataProfile):
                    with open(metadataProfile, "rb") as f:
                        metadataProfile = f.read()
                        json.loads(metadataProfile)
                        self._metadataProfile = base64.b64encode(metadataProfile).decode("utf-8")
                else:
                    raise FileNotFoundError
                

        self._projectLinkTemplate = projectLinkTemplate

        self._implements = implements
        if implements is None:
            self._implements = []

        valid_implements = ["fileStorage", "metadata"]

        if len(self._implements) == 0 or len(self._implements) > 2:
            raise ValueError(
                "implements is empty or over 2 elements. Value: {}, Only valid: {}".format(
                    len(self._implements), valid_implements)
            )

        for impl in self._implements:
            if impl not in valid_implements:
                raise ValueError("implements holds an invalid value: {}. Only valid: {}".format(
                    impl, valid_implements))

        self._fileTransferMode = fileTransferMode
        self._fileTransferArchive = fileTransferArchive

    @property
    def servicename(self):
        return self._servicename

    @property
    def fileTransferMode(self):
        return self._fileTransferMode

    @property
    def fileTransferArchive(self):
        return self._fileTransferArchive

    @property
    def description(self):
        return self._description

    @property
    def icon(self):
        return self._icon

    @property
    def infoUrl(self):
        return self._infoUrl

    @property
    def helpUrl(self):
        return self._helpUrl

    @property
    def displayName(self):
        return self._displayName

    @property
    def implements(self):
        return self._implements

    @property
    def metadataProfile(self):
        return self._metadataProfile
    
    @property
    def projectLinkTemplate(self):
        return self._projectLinkTemplate

    def check_string(self, obj: str, string: str):
        if not obj:
            raise ValueError(f"{string} cannot be an empty string.")

    def is_valid(self, token: Token, user: User):
        pass

    def __eq__(self, obj):
        try:
            return self.servicename == obj.servicename
        except:
            return False

    def __str__(self):
        return json.dumps(self)

    def __repr__(self):
        return json.dumps(self.to_dict())

    def to_json(self):
        """
        Returns this object as a json string.
        """

        data = {"type": self.__class__.__name__, "data": self.to_dict()}
        return data

    def to_dict(self):
        """
        Returns this object as a dict.
        """

        data = {
            "servicename": self._servicename,
            "implements": self._implements,
            "fileTransferMode": self.fileTransferMode.value,
            "fileTransferArchive": self.fileTransferArchive.value,
            "description": self._description,
            "icon": self._icon,
            "infoUrl": self._infoUrl,
            "helpUrl": self._helpUrl,
            "displayName": self._displayName,
            "metadataProfile": self._metadataProfile,
            "projectLinkTemplate": self._projectLinkTemplate
        }

        return data

    @classmethod
    def from_json(cls, serviceStr: str):
        """
        Returns an service object from a json string.
        """

        data = serviceStr
        while (
            type(data) is not dict
        ):  # FIX for bug: JSON.loads sometimes returns a string
            data = json.loads(data)

        if "type" in data and str(data["type"]).endswith("Service") and "data" in data:
            data = data["data"]

            return BaseService(
                servicename=data["servicename"],
                implements=data.get("implements"),
                fileTransferMode=FileTransferMode(
                    data.get("fileTransferMode", 0)),
                fileTransferArchive=FileTransferArchive(
                    data.get("fileTransferArchive", 0)),
                description=data.get("description"),
                icon=data.get("icon"),
                infoUrl=data.get("infoUrl"),
                helpUrl=data.get("helpUrl"),
                displayName=data.get("displayName"),
                metadataProfile=data.get("metadataProfile"),
                projectLinkTemplate=data.get("projectLinkTemplate")
            )

        raise ValueError("not a valid service json string.")

    @classmethod
    def from_dict(cls, serviceDict: dict):
        """
        Returns an service object from a dict string.
        """

        try:
            return BaseService(
                servicename=serviceDict["servicename"],
                implements=serviceDict.get("implements", ["metadata"]),
                fileTransferMode=FileTransferMode(
                    serviceDict.get("fileTransferMode", 0)),
                fileTransferArchive=FileTransferArchive(
                    serviceDict.get("fileTransferArchive", 0)),
                description=serviceDict.get("description"),
                icon=serviceDict.get("icon"),
                infoUrl=serviceDict.get("infoUrl"),
                helpUrl=serviceDict.get("helpUrl"),
                displayName=serviceDict.get("displayName"),
                metadataProfile=serviceDict.get("metadataProfile"),
                projectLinkTemplate=serviceDict.get("projectLinkTemplate")
            )
        except Exception as e:
            logger.error(e, exc_info=True)
            raise ValueError("not a valid service dict for class {}".format(
                cls.__class__))


class LoginService(BaseService):
    _userId = None
    _password = None

    def __init__(
        self,
        userId: bool = True,
        password: bool = True,
        *args, **kwargs
    ):
        """Initialize Service with username:password authentication.

        Args:
            userId (bool, optional): Set True, if username is needed to work. Defaults to True.
            password (bool, optional): Set True, if password is needed to work. Defaults to True.
        """
        super().__init__(*args, **kwargs)

        self._userId = userId
        self._password = password

    @property
    def userId(self):
        return self._userId

    @property
    def password(self):
        return self._password

    def to_json(self):
        """
        Returns this object as a json string.
        """

        data = super().to_json()

        data["type"] = self.__class__.__name__
        data["data"].update(self.to_dict())

        return data

    def to_dict(self):
        """
        Returns this object as a dict.
        """
        data = super().to_dict()
        data["credentials"] = {
            "userId": self.userId, "password": self.password
        }

        return data

    @classmethod
    def from_json(cls, serviceStr: str):
        """
        Returns an oauthservice object from a json string.
        """

        data = serviceStr
        while (
            type(data) is not dict
        ):  # FIX for bug: JSON.loads sometimes returns a string
            data = json.loads(data)

        service = super().from_json(serviceStr)

        try:
            data = data["data"]
            cred = data.get("credentials", {})

            return cls.from_service(
                service,
                cred.get("userId", True),
                cred.get("password", True)
            )
        except:
            raise ValueError("not a valid oauthservice json string.")

    @classmethod
    def from_dict(cls, serviceDict: dict):
        """
        Returns an oauthservice object from a dict.
        """

        service = super().from_dict(serviceDict)

        try:
            cred = serviceDict.get("credentials", {})
            return cls.from_service(
                service,
                cred.get("userId", True),
                cred.get("password", True)
            )
        except:
            raise ValueError("not a valid loginservice dict.")

    @classmethod
    def from_service(
        cls,
        service: BaseService,
        userId: bool,
        password: bool
    ):
        return cls(
            userId=userId,
            password=password,
            servicename=service.servicename,
            implements=service.implements,
            fileTransferMode=service.fileTransferMode,
            fileTransferArchive=service.fileTransferArchive,
            description=service.description,
            icon=service.icon,
            infoUrl=service.infoUrl,
            helpUrl=service.helpUrl,
            displayName=service.displayName,
            metadataProfile=service.metadataProfile,
            projectLinkTemplate=service.projectLinkTemplate
        )


class OAuth2Service(BaseService):
    """
    Represents an OAuth2 service, which can be used in RDS.
    This service enables the oauth2 workflow.
    """

    _authorize_url = None
    _refresh_url = None
    _client_id = None
    _client_secret = None

    def __init__(
        self,
        authorize_url: str = "",
        refresh_url: str = "",
        client_id: str = "",
        client_secret: str = "",
        *args, **kwargs
    ):
        """Initialize a service for oauth2.

        Args:
            authorize_url (str, optional): The authorize url from oauth2 workflow. Defaults to "".
            refresh_url (str, optional): The refresh url from oauth2 workflow. Defaults to "".
            client_id (str, optional): The client id from oauth2 workflow. Defaults to "".
            client_secret (str, optional): The client secret from oauth2 workflow. Defaults to "".
        """
        super().__init__(*args, **kwargs)

        self.check_string(authorize_url, "authorize_url")
        self.check_string(refresh_url, "refresh_url")
        self.check_string(client_id, "client_id")
        self.check_string(client_secret, "client_secret")

        self._authorize_url = self.parse_url(authorize_url)
        self._refresh_url = self.parse_url(refresh_url)

        self._client_id = client_id
        self._client_secret = client_secret

    def parse_url(self, url: str):
        u = urlparse(url)
        if not u.netloc:
            raise ValueError("URL needs a protocoll")

        # check for trailing slash for url
        if u.path and u.path[-1] == "/":
            u = u._replace(path=u.path[:-1])

        return u

    def refresh(self, token: OAuth2Token):
        """
        Refresh the given oauth2 token for specified user.
        """

        if not isinstance(token, OAuth2Token):
            logger.debug("call refresh on non oauth token.")
            raise ValueError("parameter token is not an oauthtoken.")

        import os

        data = {
            "grant_type": "refresh_token",
            "refresh_token": token.refresh_token,
            "redirect_uri": "{}".format(
                os.getenv("RDS_OAUTH_REDIRECT_URI",
                          "http://localhost:8080/redirect")
            ),
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }

        logger.debug(f"send data {data}")

        try:
            req = requests.post(
                self.refresh_url,
                data=data,
                auth=(self.client_id, self.client_secret),
                verify=(os.environ.get("VERIFY_SSL", "True") == "True"),
                timeout=15
            )
        except Timeout:
            from RDS.ServiceException import OAuth2UnsuccessfulResponseError
            raise OAuth2UnsuccessfulResponseError()

        logger.debug(f"status code: {req.status_code}")

        if req.status_code >= 400:
            data = json.loads(req.text)

            if "error" in data:
                error_type = data["error"]

                if error_type == "invalid_request":
                    from RDS.ServiceException import OAuth2InvalidRequestError

                    raise OAuth2InvalidRequestError()
                elif error_type == "invalid_client":
                    from RDS.ServiceException import OAuth2InvalidClientError

                    raise OAuth2InvalidClientError()
                elif error_type == "invalid_grant":
                    from RDS.ServiceException import OAuth2InvalidGrantError

                    raise OAuth2InvalidGrantError()
                elif error_type == "unauthorized_client":
                    from RDS.ServiceException import OAuth2UnauthorizedClient

                    raise OAuth2UnauthorizedClient()
                elif error_type == "unsupported_grant_type":
                    from RDS.ServiceException import OAuth2UnsupportedGrantType

                    raise OAuth2UnsupportedGrantType()

            from RDS.ServiceException import OAuth2UnsuccessfulResponseError

            raise OAuth2UnsuccessfulResponseError()

        data = json.loads(req.text)

        logger.debug(f"response data {data}")

        exp_date = data["expires_in"]
        if exp_date > 3600:
            exp_date = 3600

        date = datetime.now() + timedelta(seconds=exp_date)
        new_token = OAuth2Token(
            token.user,
            self,
            data["access_token"],
            data.get("refresh_token", token.refresh_token),
            date,
        )

        logger.debug(f"new token {new_token}")
        return new_token

    @property
    def refresh_url(self):
        return urlunparse(self._refresh_url)

    @property
    def authorize_url(self):
        return urlunparse(self._authorize_url)

    @property
    def client_id(self):
        return self._client_id

    @property
    def client_secret(self):
        return self._client_secret

    @classmethod
    def from_service(
        cls,
        service: BaseService,
        authorize_url: str,
        refresh_url: str,
        client_id: str,
        client_secret: str,
    ):
        """
        Converts the given Service to an oauth2service.
        """
        return cls(
            authorize_url=authorize_url,
            refresh_url=refresh_url,
            client_id=client_id,
            client_secret=client_secret,
            servicename=service.servicename,
            implements=service.implements,
            fileTransferMode=service.fileTransferMode,
            fileTransferArchive=service.fileTransferArchive,
            description=service.description,
            icon=service.icon,
            infoUrl=service.infoUrl,
            helpUrl=service.helpUrl,
            displayName=service.displayName,
            metadataProfile=service.metadataProfile,
            projectLinkTemplate=service.projectLinkTemplate
        )

    def __eq__(self, obj):
        return super().__eq__(obj)

    def to_json(self):
        """
        Returns this object as a json string.
        """

        data = super().to_json()

        data["type"] = self.__class__.__name__
        data["data"].update(self.to_dict())

        return data

    def to_dict(self):
        """
        Returns this object as a dict.
        """
        data = super().to_dict()
        data["authorize_url"] = self.authorize_url
        data["refresh_url"] = self.refresh_url
        data["client_id"] = self._client_id
        data["client_secret"] = self._client_secret

        return data

    @classmethod
    def from_json(cls, serviceStr: str):
        """
        Returns an oauthservice object from a json string.
        """

        data = serviceStr
        while (
            type(data) is not dict
        ):  # FIX for bug: JSON.loads sometimes returns a string
            data = json.loads(data)

        service = super().from_json(serviceStr)

        try:
            data = data["data"]
            return cls.from_service(
                service,
                data["authorize_url"],
                data["refresh_url"],
                data["client_id"],
                data.get("client_secret", ""),
            )
        except:
            raise ValueError("not a valid oauthservice json string.")

    @classmethod
    def from_dict(cls, serviceDict: dict):
        """
        Returns an oauthservice object from a dict.
        """

        service = super().from_dict(serviceDict)

        try:
            return cls.from_service(
                service,
                serviceDict["authorize_url"],
                serviceDict["refresh_url"],
                serviceDict["client_id"],
                serviceDict.get("client_secret", ""),
            )
        except:
            raise ValueError("not a valid oauthservice dict.")