src/onapsdk/cds/blueprint.py
# SPDX-License-Identifier: Apache-2.0
"""CDS Blueprint module."""
import json
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from io import BytesIO
from typing import Any, Dict, Generator, Iterator, List, Optional
from urllib.parse import urlencode
from uuid import uuid4
from zipfile import ZipFile

import oyaml as yaml

from onapsdk.exceptions import FileError, ParameterError, ValidationError
from onapsdk.utils.jinja import jinja_env

from .cds_element import CdsElement
from .data_dictionary import DataDictionary, DataDictionarySet
@dataclass
class CbaMetadata:
    """Container for the values describing a CBA package.

    Every attribute is a plain value supplied by the caller; the class performs
    no validation or conversion on its own.
    """

    # Version of the TOSCA meta file format.
    tosca_meta_file_version: str
    # Version of the CSAR package.
    csar_version: str
    # Author of the package.
    created_by: str
    # Path of the entry definitions file.
    entry_definitions: str
    # Blueprint (template) name.
    template_name: str
    # Blueprint (template) version.
    template_version: str
    # Tags assigned to the blueprint.
    template_tags: str
@dataclass
class Mapping:
    """Blueprint's template mapping.

    Stores mapping data:
     - name,
     - type,
     - name of dictionary from which value should be get,
     - dictionary sources of value.

    Identity of a mapping is its name: equality and hashing are both based
    only on the ``name`` attribute.
    """

    name: str
    mapping_type: str
    dictionary_name: str
    dictionary_sources: List[str] = field(default_factory=list)

    def __hash__(self) -> int:  # noqa: D401
        """Mapping object hash.

        Based on mapping name, so it stays consistent with `__eq__`.

        Returns:
            int: Mapping hash

        """
        return hash(self.name)

    def __eq__(self, mapping: object) -> bool:
        """Compare two mapping objects.

        Mappings are equal if have the same name.

        Args:
            mapping (Mapping): Mapping object to compare with.

        Returns:
            bool: True if objects have the same name, False otherwise.
                `NotImplemented` is returned for objects without a ``name``
                attribute, so Python falls back to its default comparison
                instead of raising `AttributeError`.

        """
        try:
            return self.name == mapping.name
        except AttributeError:
            # Not a mapping-like object (e.g. ``mapping == 1``): let Python decide.
            return NotImplemented

    def merge(self, mapping: "Mapping") -> None:
        """Merge mapping objects.

        Merge objects dictionary sources. Duplicates are dropped and the order
        of first appearance is kept (own sources first), so the result is
        deterministic.

        Args:
            mapping (Mapping): Mapping object to merge.

        """
        # dict.fromkeys de-duplicates while preserving insertion order.
        self.dictionary_sources = list(
            dict.fromkeys([*self.dictionary_sources, *mapping.dictionary_sources])
        )

    def generate_data_dictionary(self) -> dict:
        """Generate data dictionary for mapping.

        The "data_dictionary_base.json.j2" Jinja2 template is rendered with this
        mapping passed as ``mapping`` and the result is parsed as JSON.

        Returns:
            dict: Data dictionary

        """
        return json.loads(
            jinja_env().get_template("data_dictionary_base.json.j2").render(mapping=self)
        )
class MappingSet:
    """Name-keyed collection of mapping objects.

    Mappings are kept in a dictionary under their name, so the collection never
    holds two mappings with the same name.
    """

    def __init__(self) -> None:
        """Create an empty collection backed by a dictionary."""
        self.mappings = {}

    def __len__(self) -> int:  # noqa: D401
        """Number of stored mappings.

        Returns:
            int: Number of stored mapping objects.

        """
        return len(self.mappings)

    def __iter__(self) -> Iterator[Mapping]:
        """Iterate over a snapshot of the stored mappings.

        Returns:
            Iterator[Mapping]: Stored mappings iterator.

        """
        snapshot = list(self.mappings.values())
        return iter(snapshot)

    def __getitem__(self, index: int) -> Mapping:
        """Return the mapping at the given position.

        Args:
            index (int): Index number.

        Returns:
            Mapping: Mapping stored on given index.

        """
        stored = list(self.mappings.values())
        return stored[index]

    def add(self, mapping: Mapping) -> None:
        """Put a mapping into the collection.

        When a mapping with the same name is already stored, the new one is
        merged into it instead of replacing it.

        Args:
            mapping (Mapping): Mapping to add to collection.

        """
        key = mapping.name
        if key in self.mappings:
            self.mappings[key].merge(mapping)
            return
        self.mappings[key] = mapping

    def extend(self, iterable: Iterator[Mapping]) -> None:
        """Add every mapping produced by the given iterator.

        Args:
            iterable (Iterator[Mapping]): Mappings iterator.

        """
        for item in iterable:
            self.add(item)
class Workflow(CdsElement):
    """Blueprint's workflow.

    Stores workflow steps, inputs, outputs.
    Executes workflow using CDS HTTP API.
    """

    @dataclass
    class WorkflowStep:
        """Workflow step class.

        Stores step name, description, target and optional activities.
        """

        name: str
        description: str
        target: str
        activities: List[Dict[str, str]] = field(default_factory=list)

    @dataclass
    class WorkflowInput:
        """Workflow input class.

        Stores input name, information if it's required, type, and optional description.
        """

        name: str
        required: bool
        type: str
        description: str = ""

    @dataclass
    class WorkflowOutput:
        """Workflow output class.

        Stores output name, type and value.
        """

        name: str
        type: str
        value: Dict[str, Any]

    def __init__(self,
                 cba_workflow_name: str,
                 cba_workflow_data: dict,
                 blueprint: "Blueprint") -> None:
        """Workflow initialization.

        Args:
            cba_workflow_name (str): Workflow name.
            cba_workflow_data (dict): Workflow data.
            blueprint (Blueprint): Blueprint object which contains workflow.

        """
        super().__init__()
        self.name: str = cba_workflow_name
        self.workflow_data: dict = cba_workflow_data
        self.blueprint: "Blueprint" = blueprint
        # Lazily built caches; None means "not parsed from workflow_data yet".
        self._steps: Optional[List["Workflow.WorkflowStep"]] = None
        self._inputs: Optional[List["Workflow.WorkflowInput"]] = None
        self._outputs: Optional[List["Workflow.WorkflowOutput"]] = None

    def __repr__(self) -> str:
        """Representation of object.

        Returns:
            str: Object's string representation

        """
        return (f"Workflow(name='{self.name}', "
                f"blueprint_name='{self.blueprint.metadata.template_name}')")

    @property
    def steps(self) -> List["Workflow.WorkflowStep"]:
        """Workflow's steps property.

        Built on first access from the "steps" entry of the workflow data and
        cached afterwards.

        Returns:
            List[Workflow.WorkflowStep]: List of workflow's steps.

        """
        if self._steps is None:
            self._steps = [
                self.WorkflowStep(
                    name=step_name,
                    description=step_data.get("description"),
                    target=step_data.get("target"),
                    activities=step_data.get("activities", []),
                )
                for step_name, step_data in self.workflow_data.get("steps", {}).items()
            ]
        return self._steps

    @property
    def inputs(self) -> List["Workflow.WorkflowInput"]:
        """Workflow's inputs property.

        Built on first access from the "inputs" entry of the workflow data and
        cached afterwards.

        Returns:
            List[Workflow.WorkflowInput]: List of workflows's inputs.

        """
        if self._inputs is None:
            self._inputs = [
                self.WorkflowInput(
                    name=input_name,
                    required=input_data.get("required"),
                    type=input_data.get("type"),
                    # Keep the dataclass default ("") when no description is given.
                    description=input_data.get("description") or "",
                )
                for input_name, input_data in self.workflow_data.get("inputs", {}).items()
            ]
        return self._inputs

    @property
    def outputs(self) -> List["Workflow.WorkflowOutput"]:
        """Workflow's outputs property.

        Built on first access from the "outputs" entry of the workflow data and
        cached afterwards.

        Returns:
            List[Workflow.WorkflowOutput]: List of workflows's outputs.

        """
        if self._outputs is None:
            self._outputs = [
                self.WorkflowOutput(
                    name=output_name,
                    type=output_data.get("type"),
                    value=output_data.get("value"),
                )
                for output_name, output_data in self.workflow_data.get("outputs", {}).items()
            ]
        return self._outputs

    @property
    def url(self) -> str:
        """Workflow execution url.

        Returns:
            str: Url to call workflow execution.

        """
        return f"{self._url}/api/v1/execution-service/process"

    def execute(self, inputs: dict) -> dict:
        """Execute workflow.

        Call CDS HTTP API to execute workflow. The request is a POST to `url`
        with a JSON body holding a common header (fresh request ids and a UTC
        timestamp), the action identifiers (blueprint name and version, workflow
        name, SYNC mode) and the inputs under the "<workflow name>-request" key.

        Args:
            inputs (dict): Inputs dictionary.

        Returns:
            dict: Response's payload (value of the "payload" key of the response).

        """
        # There should be some flag to check if CDS UI API is used or blueprintprocessor.
        # For CDS UI API there is no endpoint to execute workflow, so it has to be turned off.
        execution_service_input: dict = {
            "commonHeader": {
                "originatorId": "onapsdk",
                "requestId": str(uuid4()),
                "subRequestId": str(uuid4()),
                # Timezone-aware replacement for the deprecated datetime.utcnow();
                # the formatted value is the same.
                "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            },
            "actionIdentifiers": {
                "blueprintName": self.blueprint.metadata.template_name,
                "blueprintVersion": self.blueprint.metadata.template_version,
                "actionName": self.name,
                "mode": "SYNC",  # Has to be SYNC for REST call
            },
            "payload": {f"{self.name}-request": inputs},
        }
        response: dict = self.send_message_json(
            "POST",
            f"Execute {self.blueprint.metadata.template_name} blueprint {self.name} workflow",
            self.url,
            auth=self.auth,
            data=json.dumps(execution_service_input)
        )
        return response["payload"]
class ResolvedTemplate(CdsElement):
    """Resolved template class.

    Retrieves and stores rendered template results for a blueprint.
    """

    def __init__(self, blueprint: "Blueprint",  # pylint: disable=too-many-arguments
                 artifact_name: Optional[str] = None,
                 resolution_key: Optional[str] = None,
                 resource_id: Optional[str] = None,
                 resource_type: Optional[str] = None,
                 occurrence: Optional[str] = None,
                 response_format: str = "application/json") -> None:
        """Init resolved template class instance.

        Args:
            blueprint (Blueprint): Blueprint object.
            artifact_name (Optional[str], optional): Artifact name for which to retrieve
                a resolved resource. Defaults to None.
            resolution_key (Optional[str], optional): Resolution Key associated with
                the resolution. Defaults to None.
            resource_id (Optional[str], optional): Resource Id associated with
                the resolution. Defaults to None.
            resource_type (Optional[str], optional): Resource Type associated
                with the resolution. Defaults to None.
            occurrence (Optional[str], optional): Occurrence of the template resolution (1-n).
                Defaults to None.
            response_format (str): Expected format of the template being retrieved.

        """
        super().__init__()
        self.blueprint: "Blueprint" = blueprint
        self.artifact_name: Optional[str] = artifact_name
        self.resolution_key: Optional[str] = resolution_key
        self.resource_id: Optional[str] = resource_id
        self.resource_type: Optional[str] = resource_type
        self.occurrence: Optional[str] = occurrence
        self.response_format: str = response_format

    @property
    def url(self) -> str:
        """Base url of the template API.

        Returns:
            str: Url

        """
        return f"{self._url}/api/v1/template"

    @property
    def resolved_template_url(self) -> str:
        """Url to retrieve resolved template.

        Parameters whose value is None are left out of the query string.

        Returns:
            str: Retrieve resolved template url

        """
        metadata = self.blueprint.metadata
        candidates = {
            "bpName": metadata.template_name,
            "bpVersion": metadata.template_version,
            "artifactName": self.artifact_name,
            "resolutionKey": self.resolution_key,
            "resourceType": self.resource_type,
            "resourceId": self.resource_id,
            "occurrence": self.occurrence,
            "format": self.response_format
        }
        query_string: str = urlencode(
            {key: value for key, value in candidates.items() if value is not None}
        )
        return f"{self.url}?{query_string}"

    def get_resolved_template(self) -> Dict[str, str]:
        """Get resolved template.

        Sends a GET request to `resolved_template_url`.

        Returns:
            Dict[str, str]: Resolved template

        """
        metadata = self.blueprint.metadata
        action = (f"Get resolved template {self.artifact_name} for "
                  f"{metadata.template_name} version "
                  f"{metadata.template_version}")
        return self.send_message_json(
            "GET",
            action,
            self.resolved_template_url,
            auth=self.auth
        )

    def store_resolved_template(self, resolved_template: str) -> None:
        """Store resolved template.

        The resolution key variant is preferred; resource type and id are used
        only when no resolution key is set.

        Args:
            resolved_template (str): Template to store

        Raises:
            ParameterError: To store template it's needed to pass artifact name and:
                 - resolution key, or
                 - resource type and resource id.
                If not all needed parameters are given that exception will be raised.

        """
        missing_params_msg = ("To store template artifact name with resolution key or both "
                              "resource type and id is needed")
        if not self.artifact_name:
            raise ParameterError(missing_params_msg)
        if self.resolution_key:
            return self.store_resolved_template_with_resolution_key(resolved_template)
        if self.resource_type and self.resource_id:
            return self.store_resolved_template_with_resource_type_and_id(resolved_template)
        raise ParameterError(missing_params_msg)

    def store_resolved_template_with_resolution_key(self, resolved_template: str) -> None:
        """Store template using resolution key.

        Sends a POST request with the template as request data.

        Args:
            resolved_template (str): Template to store

        """
        metadata = self.blueprint.metadata
        action = (f"Store resolved template {self.artifact_name} for "
                  f"{metadata.template_name} version "
                  f"{metadata.template_version}")
        target_url = (f"{self.url}/{metadata.template_name}/"
                      f"{metadata.template_version}/{self.artifact_name}/"
                      f"{self.resolution_key}")
        return self.send_message(
            "POST",
            action,
            target_url,
            auth=self.auth,
            data=resolved_template
        )

    def store_resolved_template_with_resource_type_and_id(self, resolved_template: str) -> None:
        """Store template using resource type and resource ID.

        Sends a POST request with the template as request data.

        Args:
            resolved_template (str): Template to store

        """
        metadata = self.blueprint.metadata
        action = (f"Store resolved template {self.artifact_name} for "
                  f"{metadata.template_name} version "
                  f"{metadata.template_version}")
        target_url = (f"{self.url}/{metadata.template_name}/"
                      f"{metadata.template_version}/{self.artifact_name}/"
                      f"{self.resource_type}/{self.resource_id}")
        return self.send_message(
            "POST",
            action,
            target_url,
            auth=self.auth,
            data=resolved_template
        )
class Blueprint(CdsElement):
    """CDS blueprint representation."""

    # Pattern of ZIP member names which are read as mapping files.
    TEMPLATES_RE = r"Templates\/.*json$"
    # ZIP member name of the file the metadata is read from.
    TOSCA_META = "TOSCA-Metadata/TOSCA.meta"

    def __init__(self, cba_file_bytes: bytes) -> None:
        """Blueprint initialization.

        Save blueprint zip file bytes. The content is later wrapped in `BytesIO`
        to be read as a ZIP archive, so raw bytes (not an opened file object)
        have to be given:
            with open("path/to/CBA.zip", "rb") as cba:
                blueprint = Blueprint(cba.read())
        or use the `load_from_file` class method:
            blueprint = Blueprint.load_from_file("path/to/CBA.zip")

        Args:
            cba_file_bytes (bytes): CBA ZIP file bytes

        """
        super().__init__()
        self.cba_file_bytes: bytes = cba_file_bytes
        # Lazily parsed caches; None means "not read from the archive yet".
        self._cba_metadata: Optional[CbaMetadata] = None
        self._cba_mappings: Optional[MappingSet] = None
        self._cba_workflows: Optional[List[Workflow]] = None

    @property
    def url(self) -> str:
        """URL address to use for CDS API call.

        Returns:
            str: URL to CDS blueprintprocessor.

        """
        return f"{self._url}/api/v1/blueprint-model"

    @property
    def metadata(self) -> CbaMetadata:
        """Blueprint metadata.

        Data from TOSCA.meta file. Parsed on first access and cached.

        Returns:
            CbaMetadata: Blueprint metadata object.

        """
        if self._cba_metadata is None:
            with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file:
                self._cba_metadata = self.get_cba_metadata(cba_zip_file.read(self.TOSCA_META))
        return self._cba_metadata

    @property
    def mappings(self) -> MappingSet:
        """Blueprint mappings collection.

        Parsed on first access and cached.

        Returns:
            MappingSet: Mappings collection.

        """
        # Compare with None: an empty MappingSet is falsy (it defines __len__) and
        # a truthiness check would re-read the archive on every access.
        if self._cba_mappings is None:
            with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file:
                self._cba_mappings = self.get_mappings(cba_zip_file)
        return self._cba_mappings

    @property
    def workflows(self) -> List["Workflow"]:
        """Blueprint's workflows property.

        Parsed on first access from the entry definitions file and cached.

        Returns:
            List[Workflow]: Blueprint's workflow list.

        """
        # Compare with None so a blueprint without workflows is parsed only once.
        if self._cba_workflows is None:
            with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file:
                self._cba_workflows = list(
                    self.get_workflows(cba_zip_file.read(self.metadata.entry_definitions))
                )
        return self._cba_workflows

    @classmethod
    def load_from_file(cls, cba_file_path: str) -> "Blueprint":
        """Load blueprint from file.

        Args:
            cba_file_path (str): Path of the CBA ZIP file.

        Raises:
            FileError: File to load blueprint from doesn't exist.

        Returns:
            Blueprint: Blueprint object (instance of the class the method is called on)

        """
        try:
            with open(cba_file_path, "rb") as cba_file:
                return cls(cba_file.read())
        except FileNotFoundError as exc:
            msg = "The requested file with a blueprint doesn't exist."
            raise FileError(msg) from exc

    def enrich(self) -> "Blueprint":
        """Call CDS API to get enriched blueprint file.

        Returns:
            Blueprint: Enriched blueprint object created from the response content

        """
        response: "requests.Response" = self.send_message(
            "POST",
            "Enrich CDS blueprint",
            f"{self.url}/enrich",
            files={"file": self.cba_file_bytes},
            headers={},  # Leave headers empty to fill it correctly by `requests` library
            auth=self.auth
        )
        return Blueprint(response.content)

    def publish(self) -> None:
        """Publish blueprint."""
        self.send_message(
            "POST",
            "Publish CDS blueprint",
            f"{self.url}/publish",
            files={"file": self.cba_file_bytes},
            headers={},  # Leave headers empty to fill it correctly by `requests` library
            auth=self.auth
        )

    def deploy(self) -> None:
        """Deploy blueprint."""
        self.send_message(
            "POST",
            "Deploy CDS blueprint",
            f"{self.url}",
            files={"file": self.cba_file_bytes},
            headers={},  # Leave headers empty to fill it correctly by `requests` library
            auth=self.auth
        )

    def save(self, dest_file_path: str) -> None:
        """Save blueprint to file.

        Args:
            dest_file_path (str): Path of file where blueprint is going to be saved

        """
        with open(dest_file_path, "wb") as cba_file:
            cba_file.write(self.cba_file_bytes)

    def get_data_dictionaries(self) -> DataDictionarySet:
        """Get the generated data dictionaries required by blueprint.

        One data dictionary is generated for every blueprint mapping.
        If mapping requires other source than input it should be updated before upload to CDS.

        Returns:
            DataDictionarySet: Set of generated DataDictionary objects.

        """
        dd_set: DataDictionarySet = DataDictionarySet()
        for mapping in self.mappings:
            dd_set.add(DataDictionary(mapping.generate_data_dictionary()))
        return dd_set

    @staticmethod
    def get_cba_metadata(cba_tosca_meta_bytes: bytes) -> CbaMetadata:
        """Parse CBA TOSCA.meta file and get values from it.

        The content is loaded as YAML; keys missing in the file result in None values.

        Args:
            cba_tosca_meta_bytes (bytes): TOSCA.meta file bytes.

        Raises:
            ValidationError: TOSCA Meta file has invalid format.

        Returns:
            CbaMetadata: Dataclass with CBA metadata

        """
        meta_dict: dict = yaml.safe_load(cba_tosca_meta_bytes)
        if not isinstance(meta_dict, dict):
            raise ValidationError("Invalid TOSCA Meta file")
        return CbaMetadata(
            tosca_meta_file_version=meta_dict.get("TOSCA-Meta-File-Version"),
            csar_version=meta_dict.get("CSAR-Version"),
            created_by=meta_dict.get("Created-By"),
            entry_definitions=meta_dict.get("Entry-Definitions"),
            template_name=meta_dict.get("Template-Name"),
            template_version=meta_dict.get("Template-Version"),
            template_tags=meta_dict.get("Template-Tags"),
        )

    @staticmethod
    def get_mappings_from_mapping_file(cba_mapping_file_bytes: bytes
                                       ) -> Generator[Mapping, None, None]:
        """Read mapping file and create Mapping object for it.

        The file is parsed as JSON and iterated; every item has to provide the
        "name", "property" (with "type"), "dictionary-name" and
        "dictionary-source" keys.

        Args:
            cba_mapping_file_bytes (bytes): CBA mapping file bytes.

        Yields:
            Generator[Mapping, None, None]: Mapping object.

        """
        mapping_file_json = json.loads(cba_mapping_file_bytes)
        for mapping in mapping_file_json:
            yield Mapping(
                name=mapping["name"],
                mapping_type=mapping["property"]["type"],
                dictionary_name=mapping["dictionary-name"],
                dictionary_sources=[mapping["dictionary-source"]],
            )

    def get_mappings(self, cba_zip_file: ZipFile) -> MappingSet:
        """Read mappings from CBA file.

        Search mappings in CBA file and create Mapping object for each of them.
        Every archive member whose name matches `TEMPLATES_RE` is read as a mapping file.

        Args:
            cba_zip_file (ZipFile): CBA file to get mappings from.

        Returns:
            MappingSet: Mappings set object.

        """
        mapping_set = MappingSet()
        for info in cba_zip_file.infolist():
            if re.match(self.TEMPLATES_RE, info.filename):
                mapping_set.extend(
                    self.get_mappings_from_mapping_file(cba_zip_file.read(info.filename))
                )
        return mapping_set

    def get_workflows(self,
                      cba_entry_definitions_file_bytes: bytes) -> Generator[Workflow, None, None]:
        """Get workflows from entry_definitions file.

        Parse entry_definitions file (JSON) and create Workflow objects for workflows
        stored under its "topology_template" / "workflows" keys.

        Args:
            cba_entry_definitions_file_bytes (bytes): entry_definition file.

        Yields:
            Generator[Workflow, None, None]: Workflow object.

        """
        entry_definitions_json: dict = json.loads(cba_entry_definitions_file_bytes)
        workflows: dict = entry_definitions_json.get("topology_template", {}).get("workflows", {})
        for workflow_name, workflow_data in workflows.items():
            yield Workflow(workflow_name, workflow_data, self)

    def get_workflow_by_name(self, workflow_name: str) -> Workflow:
        """Get workflow by name.

        The first workflow with the given name is returned.

        Args:
            workflow_name (str): Name of the workflow

        Raises:
            ParameterError: There is no workflow with given name.

        Returns:
            Workflow: Workflow with given name

        """
        for workflow in self.workflows:
            if workflow.name == workflow_name:
                return workflow
        raise ParameterError("Workflow with given name does not exist")

    def get_resolved_template(self,  # pylint: disable=too-many-arguments
                              artifact_name: str,
                              resolution_key: Optional[str] = None,
                              resource_type: Optional[str] = None,
                              resource_id: Optional[str] = None,
                              occurrence: Optional[str] = None) -> Dict[str, str]:
        """Get resolved template for Blueprint.

        Args:
            artifact_name (str): Resolved template's artifact name
            resolution_key (Optional[str], optional): Resolved template's resolution key.
                Defaults to None.
            resource_type (Optional[str], optional): Resolved template's resource type.
                Defaults to None.
            resource_id (Optional[str], optional): Resolved template's resource ID.
                Defaults to None.
            occurrence: (Optional[str], optional): Resolved template's occurrence value.
                Defaults to None.

        Returns:
            Dict[str, str]: Resolved template

        """
        return ResolvedTemplate(blueprint=self,
                                artifact_name=artifact_name,
                                resolution_key=resolution_key,
                                resource_type=resource_type,
                                resource_id=resource_id,
                                occurrence=occurrence).get_resolved_template()

    def store_resolved_template(self,  # pylint: disable=too-many-arguments
                                artifact_name: str,
                                data: str,
                                resolution_key: Optional[str] = None,
                                resource_type: Optional[str] = None,
                                resource_id: Optional[str] = None) -> None:
        """Store resolved template for Blueprint.

        Args:
            artifact_name (str): Resolved template's artifact name
            data (str): Resolved template
            resolution_key (Optional[str], optional): Resolved template's resolution key.
                Defaults to None.
            resource_type (Optional[str], optional): Resolved template's resource type.
                Defaults to None.
            resource_id (Optional[str], optional): Resolved template's resource ID.
                Defaults to None.

        """
        ResolvedTemplate(blueprint=self,
                         artifact_name=artifact_name,
                         resolution_key=resolution_key,
                         resource_type=resource_type,
                         resource_id=resource_id).store_resolved_template(data)