Orange-OpenSource/python-onapsdk

View on GitHub
src/onapsdk/cds/blueprint.py

Summary

Maintainability
D
1 day
Test Coverage
A
99%
# SPDX-License-Identifier: Apache-2.0
"""CDS Blueprint module."""
import json
import re
from dataclasses import dataclass, field
from datetime import datetime
from io import BytesIO
from typing import Any, Dict, Generator, Iterator, List, Optional
from urllib.parse import urlencode
from uuid import uuid4
from zipfile import ZipFile

import oyaml as yaml

from onapsdk.utils.jinja import jinja_env
from onapsdk.exceptions import FileError, ParameterError, ValidationError

from .cds_element import CdsElement
from .data_dictionary import DataDictionary, DataDictionarySet


@dataclass
class CbaMetadata:
    """Class to hold CBA metadata values."""

    tosca_meta_file_version: str
    csar_version: str
    created_by: str
    entry_definitions: str
    template_name: str
    template_version: str
    template_tags: str


@dataclass
class Mapping:
    """Blueprint's template mapping.

    Stores mapping data:
      - name,
      - type,
      - name of dictionary from which value should be get,
      - dictionary source of value.
    """

    name: str
    mapping_type: str
    dictionary_name: str
    dictionary_sources: List[str] = field(default_factory=list)

    def __hash__(self) -> int:  # noqa: D401
        """Mapping object hash.

        Based on mapping name.

        Returns:
            int: Mapping hash

        """
        return hash(self.name)

    def __eq__(self, mapping: "Mapping") -> bool:
        """Compare two mapping objects.

        Mappings are equal if have the same name.

        Args:
            mapping (Mapping): Mapping object to compare with.

        Returns:
            bool: True if objects have the same name, False otherwise.

        """
        return self.name == mapping.name

    def merge(self, mapping: "Mapping") -> None:
        """Merge mapping objects.

        Merge objects dictionary sources.

        Args:
            mapping (Mapping): Mapping object to merge.

        """
        self.dictionary_sources = list(
            set(self.dictionary_sources) | set(mapping.dictionary_sources)
        )

    def generate_data_dictionary(self) -> dict:
        """Generate data dictionary for mapping.

        Data dictionary with required data sources, type and name for mapping will be created from
            Jinja2 template.

        Returns:
            dict: Data dictionary

        """
        return json.loads(
            jinja_env().get_template("data_dictionary_base.json.j2").render(mapping=self)
        )


class MappingSet:
    """Set of mapping objects.

    Mapping objects will be stored in dictionary where mapping name is a key.
    No two mappings with the same name can be stored in this collection.
    """

    def __init__(self) -> None:
        """Initialize mappings collection.

        Create dictionary to store mappings.
        """
        self.mappings = {}

    def __len__(self) -> int:  # noqa: D401
        """Mapping set length.

        Returns:
            int: Number of stored mapping objects.

        """
        return len(self.mappings)

    def __iter__(self) -> Iterator[Mapping]:
        """Iterate through mapping stored in set.

        Returns:
            Iterator[Mapping]: Stored mappings iterator.

        """
        return iter(list(self.mappings.values()))

    def __getitem__(self, index: int) -> Mapping:
        """Get item stored on given index.

        Args:
            index (int): Index number.

        Returns:
            Mapping: Mapping stored on given index.

        """
        return list(self.mappings.values())[index]

    def add(self, mapping: Mapping) -> None:
        """Add mapping to set.

        If there is already mapping object with the same name in collection
            they will be merged.

        Args:
            mapping (Mapping): Mapping to add to collection.

        """
        if mapping.name not in self.mappings:
            self.mappings.update({mapping.name: mapping})
        else:
            self.mappings[mapping.name].merge(mapping)

    def extend(self, iterable: Iterator[Mapping]) -> None:
        """Extend set with an iterator of mappings.

        Args:
            iterable (Iterator[Mapping]): Mappings iterator.

        """
        for mapping in iterable:
            self.add(mapping)


class Workflow(CdsElement):
    """Blueprint's workflow.

    Stores workflow steps, inputs, outputs.
    Executes workflow using CDS HTTP API.
    """

    @dataclass
    class WorkflowStep:
        """Workflow step class.

        Stores step name, description, target and optional activities.
        """

        name: str
        description: str
        target: str
        activities: List[Dict[str, str]] = field(default_factory=list)

    @dataclass
    class WorkflowInput:
        """Workflow input class.

        Stores input name, information if it's required, type, and optional description.
        """

        name: str
        required: bool
        type: str
        description: str = ""

    @dataclass
    class WorkflowOutput:
        """Workflow output class.

        Stores output name, type na value.
        """

        name: str
        type: str
        value: Dict[str, Any]

    def __init__(self,
                 cba_workflow_name: str,
                 cba_workflow_data: dict,
                 blueprint: "Blueprint") -> None:
        """Workflow initialization.

        Args:
            cba_workflow_name (str): Workflow name.
            cba_workflow_data (dict): Workflow data.
            blueprint (Blueprint): Blueprint object which contains workflow.

        """
        super().__init__()
        self.name: str = cba_workflow_name
        self.workflow_data: dict = cba_workflow_data
        self.blueprint: "Blueprint" = blueprint
        self._steps: List[self.WorkflowStep] = None
        self._inputs: List[self.WorkflowInput] = None
        self._outputs: List[self.WorkflowOutput] = None

    def __repr__(self) -> str:
        """Representation of object.

        Returns:
            str: Object's string representation

        """
        return (f"Workflow(name='{self.name}', "
                f"blueprint_name='{self.blueprint.metadata.template_name})'")

    @property
    def steps(self) -> List["Workflow.WorkflowStep"]:
        """Workflow's steps property.

        Returns:
            List[Workflow.WorkflowStep]: List of workflow's steps.

        """
        if self._steps is None:
            self._steps = []
            for step_name, step_data in self.workflow_data.get("steps", {}).items():
                self._steps.append(
                    self.WorkflowStep(
                        name=step_name,
                        description=step_data.get("description"),
                        target=step_data.get("target"),
                        activities=step_data.get("activities", []),
                    )
                )
        return self._steps

    @property
    def inputs(self) -> List["Workflow.WorkflowInput"]:
        """Workflow's inputs property.

        Returns:
            List[Workflow.WorkflowInput]: List of workflows's inputs.

        """
        if self._inputs is None:
            self._inputs = []
            for input_name, input_data in self.workflow_data.get("inputs", {}).items():
                self._inputs.append(
                    self.WorkflowInput(
                        name=input_name,
                        required=input_data.get("required"),
                        type=input_data.get("type"),
                        description=input_data.get("description"),
                    )
                )
        return self._inputs

    @property
    def outputs(self) -> List["Workflow.WorkflowOutput"]:
        """Workflow's outputs property.

        Returns:
            List[Workflow.WorkflowOutput]: List of workflows's outputs.

        """
        if self._outputs is None:
            self._outputs = []
            for output_name, output_data in self.workflow_data.get("outputs", {}).items():
                self._outputs.append(
                    self.WorkflowOutput(
                        name=output_name,
                        type=output_data.get("type"),
                        value=output_data.get("value"),
                    )
                )
        return self._outputs

    @property
    def url(self) -> str:
        """Workflow execution url.

        Returns:
            str: Url to call warkflow execution.

        """
        return f"{self._url}/api/v1/execution-service/process"

    def execute(self, inputs: dict) -> dict:
        """Execute workflow.

        Call CDS HTTP API to execute workflow.

        Args:
            inputs (dict): Inputs dictionary.

        Returns:
            dict: Response's payload.

        """
        # There should be some flague to check if CDS UI API is used or blueprintprocessor.
        # For CDS UI API there is no endporint to execute workflow, so it has to be turned off.
        execution_service_input: dict = {
            "commonHeader": {
                "originatorId": "onapsdk",
                "requestId": str(uuid4()),
                "subRequestId": str(uuid4()),
                "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
            "actionIdentifiers": {
                "blueprintName": self.blueprint.metadata.template_name,
                "blueprintVersion": self.blueprint.metadata.template_version,
                "actionName": self.name,
                "mode": "SYNC",  # Has to be SYNC for REST call
            },
            "payload": {f"{self.name}-request": inputs},
        }
        response: "requests.Response" = self.send_message_json(
            "POST",
            f"Execute {self.blueprint.metadata.template_name} blueprint {self.name} workflow",
            self.url,
            auth=self.auth,
            data=json.dumps(execution_service_input)
        )
        return response["payload"]


class ResolvedTemplate(CdsElement):
    """Resolved template class.

    Store and retrieve rendered template results.
    """

    def __init__(self, blueprint: "Blueprint",  # pylint: disable=too-many-arguments
                 artifact_name: Optional[str] = None,
                 resolution_key: Optional[str] = None,
                 resource_id: Optional[str] = None,
                 resource_type: Optional[str] = None,
                 occurrence: Optional[str] = None,
                 response_format: str = "application/json") -> None:
        """Init resolved template class instance.

        Args:
            blueprint (Blueprint): Blueprint object.
            artifact_name (Optional[str], optional): Artifact name for which to retrieve
                a resolved resource. Defaults to None.
            resolution_key (Optional[str], optional): Resolution Key associated with
                the resolution. Defaults to None.
            resource_id (Optional[str], optional): Resource Id associated with
                the resolution. Defaults to None.
            resource_type (Optional[str], optional): Resource Type associated
                with the resolution. Defaults to None.
            occurrence (Optional[str], optional): Occurrence of the template resolution (1-n).
                Defaults to None.
            response_format (str): Expected format of the template being retrieved.

        """
        super().__init__()
        self.blueprint: "Blueprint" = blueprint
        self.artifact_name: Optional[str] = artifact_name
        self.resolution_key: Optional[str] = resolution_key
        self.resource_id: Optional[str] = resource_id
        self.resource_type: Optional[str] = resource_type
        self.occurrence: Optional[str] = occurrence
        self.response_format: str = response_format

    @property
    def url(self) -> str:
        """Url property.

        Returns:
            str: Url

        """
        return f"{self._url}/api/v1/template"

    @property
    def resolved_template_url(self) -> str:
        """Url to retrieve resolved template.

        Filter None parameters.

        Returns:
            str: Retrieve resolved template url

        """
        params_dict: Dict[str, str] = urlencode(dict(filter(lambda item: item[1] is not None, {
            "bpName": self.blueprint.metadata.template_name,
            "bpVersion": self.blueprint.metadata.template_version,
            "artifactName": self.artifact_name,
            "resolutionKey": self.resolution_key,
            "resourceType": self.resource_type,
            "resourceId": self.resource_id,
            "occurrence": self.occurrence,
            "format": self.response_format
        }.items())))
        return f"{self.url}?{params_dict}"

    def get_resolved_template(self) -> Dict[str, str]:
        """Get resolved template.

        Returns:
            Dict[str, str]: Resolved template

        """
        return self.send_message_json(
            "GET",
            f"Get resolved template {self.artifact_name} for "
            f"{self.blueprint.metadata.template_name} version "
            f"{self.blueprint.metadata.template_version}",
            self.resolved_template_url,
            auth=self.auth
        )

    def store_resolved_template(self, resolved_template: str) -> None:
        """Store resolved template.

        Args:
            resolved_template (str): Template to store

        Raises:
            ParameterError: To store template it's needed to pass artifact name and:
             - resolution key, or
             - resource type and resource id.
            If not all needed parameters are given that exception will be raised.

        """
        if self.artifact_name and self.resolution_key:
            return self.store_resolved_template_with_resolution_key(resolved_template)
        if self.artifact_name and self.resource_type and self.resource_id:
            return self.store_resolved_template_with_resource_type_and_id(resolved_template)
        raise ParameterError("To store template artifact name with resolution key or both "
                             "resource type and id is needed")

    def store_resolved_template_with_resolution_key(self, resolved_template: str) -> None:
        """Store template using resolution key.

        Args:
            resolved_template (str): Template to store

        """
        return self.send_message(
            "POST",
            f"Store resolved template {self.artifact_name} for "
            f"{self.blueprint.metadata.template_name} version "
            f"{self.blueprint.metadata.template_version}",
            f"{self.url}/{self.blueprint.metadata.template_name}/"
            f"{self.blueprint.metadata.template_version}/{self.artifact_name}/"
            f"{self.resolution_key}",
            auth=self.auth,
            data=resolved_template
        )

    def store_resolved_template_with_resource_type_and_id(self, resolved_template: str) -> None:
        """Store template using resource type and resource ID.

        Args:
            resolved_template (str): Template to store

        """
        return self.send_message(
            "POST",
            f"Store resolved template {self.artifact_name} for "
            f"{self.blueprint.metadata.template_name} version "
            f"{self.blueprint.metadata.template_version}",
            f"{self.url}/{self.blueprint.metadata.template_name}/"
            f"{self.blueprint.metadata.template_version}/{self.artifact_name}/"
            f"{self.resource_type}/{self.resource_id}",
            auth=self.auth,
            data=resolved_template
        )

class Blueprint(CdsElement):
    """CDS blueprint representation."""

    TEMPLATES_RE = r"Templates\/.*json$"
    TOSCA_META = "TOSCA-Metadata/TOSCA.meta"

    def __init__(self, cba_file_bytes: bytes) -> None:
        """Blueprint initialization.

        Save blueprint zip file bytes.
        You can create that object using opened file or bytes:
            blueprint = Blueprint(open("path/to/CBA.zip", "rb"))
        or
            with open("path/to/CBA.zip", "rb") as cba:
                blueprint = Blueprint(cba.read())
        It is even better to use second example due to CBA file will be correctly closed for sure.

        Args:
            cba_file_bytes (bytes): CBA ZIP file bytes

        """
        super().__init__()
        self.cba_file_bytes: bytes = cba_file_bytes
        self._cba_metadata: CbaMetadata = None
        self._cba_mappings: MappingSet = None
        self._cba_workflows: List[Workflow] = None

    @property
    def url(self) -> str:
        """URL address to use for CDS API call.

        Returns:
            str: URL to CDS blueprintprocessor.

        """
        return f"{self._url}/api/v1/blueprint-model"

    @property
    def metadata(self) -> CbaMetadata:
        """Blueprint metadata.

        Data from TOSCA.meta file.

        Returns:
            CbaMetadata: Blueprint metadata object.

        """
        if not self._cba_metadata:
            with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file:
                self._cba_metadata = self.get_cba_metadata(cba_zip_file.read(self.TOSCA_META))
        return self._cba_metadata

    @property
    def mappings(self) -> MappingSet:
        """Blueprint mappings collection.

        Returns:
            MappingSet: Mappings collection.

        """
        if not self._cba_mappings:
            with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file:
                self._cba_mappings = self.get_mappings(cba_zip_file)
        return self._cba_mappings

    @property
    def workflows(self) -> List["Workflow"]:
        """Blueprint's workflows property.

        Returns:
            List[Workflow]: Blueprint's workflow list.

        """
        if not self._cba_workflows:
            with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file:
                self._cba_workflows = list(
                    self.get_workflows(cba_zip_file.read(self.metadata.entry_definitions))
                )
        return self._cba_workflows

    @classmethod
    def load_from_file(cls, cba_file_path: str) -> "Blueprint":
        """Load blueprint from file.

        Raises:
            FileError: File to load blueprint from doesn't exist.

        Returns:
            Blueprint: Blueprint object

        """
        try:
            with open(cba_file_path, "rb") as cba_file:
                return Blueprint(cba_file.read())
        except FileNotFoundError as exc:
            msg = "The requested file with a blueprint doesn't exist."
            raise FileError(msg) from exc

    def enrich(self) -> "Blueprint":
        """Call CDS API to get enriched blueprint file.

        Returns:
            Blueprint: Enriched blueprint object

        """
        response: "requests.Response" = self.send_message(
            "POST",
            "Enrich CDS blueprint",
            f"{self.url}/enrich",
            files={"file": self.cba_file_bytes},
            headers={},  # Leave headers empty to fill it correctly by `requests` library
            auth=self.auth
        )
        return Blueprint(response.content)

    def publish(self) -> None:
        """Publish blueprint."""
        self.send_message(
            "POST",
            "Publish CDS blueprint",
            f"{self.url}/publish",
            files={"file": self.cba_file_bytes},
            headers={},  # Leave headers empty to fill it correctly by `requests` library
            auth=self.auth
        )

    def deploy(self) -> None:
        """Deploy blueprint."""
        self.send_message(
            "POST",
            "Deploy CDS blueprint",
            f"{self.url}",
            files={"file": self.cba_file_bytes},
            headers={},  # Leave headers empty to fill it correctly by `requests` library
            auth=self.auth
        )

    def save(self, dest_file_path: str) -> None:
        """Save blueprint to file.

        Args:
            dest_file_path (str): Path of file where blueprint is going to be saved

        """
        with open(dest_file_path, "wb") as cba_file:
            cba_file.write(self.cba_file_bytes)

    def get_data_dictionaries(self) -> DataDictionarySet:
        """Get the generated data dictionaries required by blueprint.

        If mapping reqires other source than input it should be updated before upload to CDS.

        Returns:
            Generator[DataDictionary, None, None]: DataDictionary objects.

        """
        dd_set: DataDictionarySet = DataDictionarySet()
        for mapping in self.mappings:
            dd_set.add(DataDictionary(mapping.generate_data_dictionary()))
        return dd_set

    @staticmethod
    def get_cba_metadata(cba_tosca_meta_bytes: bytes) -> CbaMetadata:
        """Parse CBA TOSCA.meta file and get values from it.

        Args:
            cba_tosca_meta_bytes (bytes): TOSCA.meta file bytes.

        Raises:
            ValidationError: TOSCA Meta file has invalid format.

        Returns:
            CbaMetadata: Dataclass with CBA metadata

        """
        meta_dict: dict = yaml.safe_load(cba_tosca_meta_bytes)
        if not isinstance(meta_dict, dict):
            raise ValidationError("Invalid TOSCA Meta file")
        return CbaMetadata(
            tosca_meta_file_version=meta_dict.get("TOSCA-Meta-File-Version"),
            csar_version=meta_dict.get("CSAR-Version"),
            created_by=meta_dict.get("Created-By"),
            entry_definitions=meta_dict.get("Entry-Definitions"),
            template_name=meta_dict.get("Template-Name"),
            template_version=meta_dict.get("Template-Version"),
            template_tags=meta_dict.get("Template-Tags"),
        )

    @staticmethod
    def get_mappings_from_mapping_file(cba_mapping_file_bytes: bytes
                                       ) -> Generator[Mapping, None, None]:
        """Read mapping file and create Mappping object for it.

        Args:
            cba_mapping_file_bytes (bytes): CBA mapping file bytes.

        Yields:
            Generator[Mapping, None, None]: Mapping object.

        """
        mapping_file_json = json.loads(cba_mapping_file_bytes)
        for mapping in mapping_file_json:
            yield Mapping(
                name=mapping["name"],
                mapping_type=mapping["property"]["type"],
                dictionary_name=mapping["dictionary-name"],
                dictionary_sources=[mapping["dictionary-source"]],
            )

    def get_mappings(self, cba_zip_file: ZipFile) -> MappingSet:
        """Read mappings from CBA file.

        Search mappings in CBA file and create Mapping object for each of them.

        Args:
            cba_zip_file (ZipFile): CBA file to get mappings from.

        Returns:
            MappingSet: Mappings set object.

        """
        mapping_set = MappingSet()
        for info in cba_zip_file.infolist():
            if re.match(self.TEMPLATES_RE, info.filename):
                mapping_set.extend(
                    self.get_mappings_from_mapping_file(cba_zip_file.read(info.filename))
                )
        return mapping_set

    def get_workflows(self,
                      cba_entry_definitions_file_bytes: bytes) -> Generator[Workflow, None, None]:
        """Get worfklows from entry_definitions file.

        Parse entry_definitions file and create Workflow objects for workflows stored in.

        Args:
            cba_entry_definitions_file_bytes (bytes): entry_definition file.

        Yields:
            Generator[Workflow, None, None]: Workflow object.

        """
        entry_definitions_json: dict = json.loads(cba_entry_definitions_file_bytes)
        workflows: dict = entry_definitions_json.get("topology_template", {}).get("workflows", {})
        for workflow_name, workflow_data in workflows.items():
            yield Workflow(workflow_name, workflow_data, self)

    def get_workflow_by_name(self, workflow_name: str) -> Workflow:
        """Get workflow by name.

        If there is no workflow with given name `ParameterError` is going to be raised.

        Args:
            workflow_name (str): Name of the workflow

        Returns:
            Workflow: Workflow with given name

        """
        try:
            return next(filter(lambda workflow: workflow.name == workflow_name, self.workflows))
        except StopIteration:
            raise ParameterError("Workflow with given name does not exist")

    def get_resolved_template(self,  # pylint: disable=too-many-arguments
                              artifact_name: str,
                              resolution_key: Optional[str] = None,
                              resource_type: Optional[str] = None,
                              resource_id: Optional[str] = None,
                              occurrence: Optional[str] = None) -> Dict[str, str]:
        """Get resolved template for Blueprint.

        Args:
            artifact_name (str): Resolved template's artifact name
            resolution_key (Optional[str], optional): Resolved template's resolution key.
                Defaults to None.
            resource_type (Optional[str], optional): Resolved template's resource type.
                Defaults to None.
            resource_id (Optional[str], optional): Resolved template's resource ID.
                Defaults to None.
            occurrence: (Optional[str], optional): Resolved template's occurrence value.
                Defaults to None.

        Returns:
            Dict[str, str]: Resolved template

        """
        return ResolvedTemplate(blueprint=self,
                                artifact_name=artifact_name,
                                resolution_key=resolution_key,
                                resource_type=resource_type,
                                resource_id=resource_id,
                                occurrence=occurrence).get_resolved_template()

    def store_resolved_template(self,  # pylint: disable=too-many-arguments
                                artifact_name: str,
                                data: str,
                                resolution_key: Optional[str] = None,
                                resource_type: Optional[str] = None,
                                resource_id: Optional[str] = None) -> None:
        """Store resolved template for Blueprint.

        Args:
            artifact_name (str): Resolved template's artifact name
            data (str): Resolved template
            resolution_key (Optional[str], optional): Resolved template's resolution key.
                Defaults to None.
            resource_type (Optional[str], optional): Resolved template's resource type.
                Defaults to None.
            resource_id (Optional[str], optional): Resolved template's resource ID.
                Defaults to None.
        """
        ResolvedTemplate(blueprint=self,
                         artifact_name=artifact_name,
                         resolution_key=resolution_key,
                         resource_type=resource_type,
                         resource_id=resource_id).store_resolved_template(data)