Orange-OpenSource/python-onapsdk

View on GitHub
src/onapsdk/clamp/loop_instance.py

Summary

Maintainability
A
2 hrs
Test Coverage
A
99%
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: Apache-2.0
"""Control Loop module."""
import json
from pathlib import Path
from jsonschema import validate, ValidationError

from onapsdk.clamp.clamp_element import Clamp
from onapsdk.utils.jinja import jinja_env
from onapsdk.exceptions import ParameterError

CLAMP_UPDDATE_REFRESH_TIMER = 60

class LoopInstance(Clamp):
    """Control Loop instantiation class."""

    # class variable
    _loop_schema = None
    operational_policies = ""

    def __init__(self, template: str, name: str, details: dict) -> None:
        """
        Initialize loop instance object.

        Args:
            template (str): template from which we build the loop
            name (str) : loop creation name
            details (dict) : dictionnary containing all loop details

        """
        super().__init__()
        self.template = template
        self.name = "LOOP_" + name
        self._details = details

    @property
    def details(self) -> dict:
        """Return and lazy load the details."""
        if not self._details:
            self._update_loop_details()
        return self._details

    @details.setter
    def details(self, details: dict) -> None:
        """Set value for details."""
        self._details = details

    def _update_loop_details(self) -> dict:
        """
        Update loop details.

        Returns:
            the dictionnary of loop details

        """
        url = f"{self.base_url()}/loop/{self.name}"
        loop_details = self.send_message_json('GET',
                                              'Get loop details',
                                              url)
        return loop_details

    def refresh_status(self) -> None:
        """Reshresh loop status."""
        url = f"{self.base_url()}/loop/getstatus/{self.name}"
        loop_details = self.send_message_json('GET',
                                              'Get loop status',
                                              url)

        self.details = loop_details

    @property
    def loop_schema(self) -> dict:
        """
        Return and lazy load the details schema.

        Returns:
            schema to be respected to accede to loop details

        """
        if not self._loop_schema:
            schema_file = Path.cwd() / 'src' / 'onapsdk' / 'clamp' / 'schema_details.json'
            with open(schema_file, "rb") as plan:
                self._loop_schema = json.load(plan)
        return self._loop_schema

    def validate_details(self) -> bool:
        """
        Validate Loop Instance details.

        Returns:
            schema validation status (True, False)

        """
        try:
            validate(self.details, self.loop_schema)
        except ValidationError as error:
            self._logger.error(error)
            self._logger.error("---------")
            self._logger.error(error.absolute_path)
            self._logger.error("---------")
            self._logger.error(error.absolute_schema_path)
            return False
        return True

    def create(self) -> None:
        """Create instance and load loop details."""
        url = f"{self.base_url()}/loop/create/{self.name}?templateName={self.template}"
        instance_details = self.send_message_json('POST',
                                                  'Create Loop Instance',
                                                  url)
        self.details = instance_details

    def add_operational_policy(self, policy_type: str, policy_version: str) -> None:
        """
        Add operational policy to the loop instance.

        Args:
            policy_type (str): the full policy model type
            policy_version (str): policy version

        Raises:
            ParameterError : Corrupt response or a key in a dictionary not found.
                It will also be raised when the response contains more operational
                policies than there are currently.

        """
        url = (f"{self.base_url()}/loop/addOperationaPolicy/{self.name}/"
               f"policyModel/{policy_type}/{policy_version}")
        add_response = self.send_message_json('PUT',
                                              'Create Operational Policy',
                                              url)

        key = "operationalPolicies"

        try:
            if self.details[key] is None:
                self.details[key] = []

            response_policies = add_response[key]
            current_policies = self.details[key]
        except KeyError as exc:
            msg = 'Corrupt response, current loop details. Key not found.'
            raise ParameterError(msg) from exc

        if len(response_policies) > len(current_policies):
            self.details = add_response
        else:
            raise ParameterError("Couldn't add the operational policy.")

    def remove_operational_policy(self, policy_type: str, policy_version: str) -> None:
        """
        Remove operational policy from the loop instance.

        Args:
            policy_type (str): the full policy model type
            policy_version (str): policy version

        """
        url = (f"{self.base_url()}/loop/removeOperationaPolicy/"
               f"{self.name}/policyModel/{policy_type}/{policy_version}")
        self.details = self.send_message_json('PUT',
                                              'Remove Operational Policy',
                                              url)

    def update_microservice_policy(self) -> None:
        """
        Update microservice policy configuration.

        Update microservice policy configuration using payload data.

        """
        url = f"{self.base_url()}/loop/updateMicroservicePolicy/{self.name}"
        template = jinja_env().get_template("clamp_add_tca_config.json.j2")
        microservice_name = self.details["globalPropertiesJson"]["dcaeDeployParameters"]\
                                        ["uniqueBlueprintParameters"]["policy_id"]
        data = template.render(name=microservice_name,
                               LOOP_name=self.name)

        self.send_message('POST',
                          'ADD TCA config',
                          url,
                          data=data)

    def extract_operational_policy_name(self, policy_type: str) -> str:
        """
        Return operational policy name for a closed loop and a given policy.

        Args:
            policy_type (str): the policy acronym.

        Raises:
            ParameterError : Couldn't load the operational policy name.

        Returns:
            Operational policy name in the loop details after adding a policy.

        """
        for policy in filter(lambda x: x["policyModel"]["policyAcronym"] == policy_type,
                             self.details["operationalPolicies"]):
            return policy["name"]

        raise ParameterError("Couldn't load the operational policy name.")

    def add_drools_conf(self) -> dict:
        """Add drools configuration."""
        self.validate_details()
        vfmodule_dicts = self.details["modelService"]["resourceDetails"]["VFModule"]
        entity_ids = {}
        #Get the vf module details
        for vfmodule in vfmodule_dicts.values():
            entity_ids["resourceID"] = vfmodule["vfModuleModelName"]
            entity_ids["modelInvariantId"] = vfmodule["vfModuleModelInvariantUUID"]
            entity_ids["modelVersionId"] = vfmodule["vfModuleModelUUID"]
            entity_ids["modelName"] = vfmodule["vfModuleModelName"]
            entity_ids["modelVersion"] = vfmodule["vfModuleModelVersion"]
            entity_ids["modelCustomizationId"] = vfmodule["vfModuleModelCustomizationUUID"]
        template = jinja_env().get_template("clamp_add_drools_policy.json.j2")
        data = template.render(name=self.extract_operational_policy_name("Drools"),
                               resourceID=entity_ids["resourceID"],
                               modelInvariantId=entity_ids["modelInvariantId"],
                               modelVersionId=entity_ids["modelVersionId"],
                               modelName=entity_ids["modelName"],
                               modelVersion=entity_ids["modelVersion"],
                               modelCustomizationId=entity_ids["modelCustomizationId"],
                               LOOP_name=self.name)
        return data

    def add_minmax_config(self) -> None:
        """Add MinMax operational policy config."""
        #must configure start/end time and min/max instances in json file
        template = jinja_env().get_template("clamp_MinMax_config.json.j2")
        return template.render(name=self.extract_operational_policy_name("MinMax"))

    def add_frequency_limiter(self, limit: int = 1) -> None:
        """Add frequency limiter config."""
        template = jinja_env().get_template("clamp_add_frequency.json.j2")
        return template.render(name=self.extract_operational_policy_name("FrequencyLimiter"),
                               LOOP_name=self.name,
                               limit=limit)

    def add_op_policy_config(self, func, **kwargs) -> None:
        """
        Add operational policy configuration.

        Add operation policy configuration using payload data.

        Args:
            func (function): policy configuration function in (add_drools_conf,
                                                               add_minmax_config,
                                                               add_frequency_limiter)

        """
        data = func(**kwargs)
        if not data:
            raise ParameterError("Payload data from configuration is None.")
        if self.operational_policies:
            self.operational_policies = self.operational_policies[:-1] + ","
            data = data[1:]
        self.operational_policies += data
        url = f"{self.base_url()}/loop/updateOperationalPolicies/{self.name}"
        self.send_message('POST',
                          'ADD operational policy config',
                          url,
                          data=self.operational_policies)

        self._logger.info(("Files for op policy config %s have been uploaded to loop's"
                           "Op policy"), self.name)

    def submit(self):
        """Submit policies to policy engine."""
        state = self.details["components"]["POLICY"]["componentState"]["stateName"]
        return state == "SENT_AND_DEPLOYED"

    def stop(self):
        """Undeploy Policies from policy engine."""
        state = self.details["components"]["POLICY"]["componentState"]["stateName"]
        return state == "SENT"

    def restart(self):
        """Redeploy policies to policy engine."""
        state = self.details["components"]["POLICY"]["componentState"]["stateName"]
        return state == "SENT_AND_DEPLOYED"

    def act_on_loop_policy(self, func) -> bool:
        """
        Act on loop's policy.

        Args:
            func (function): function of action to be done (submit, stop, restart)

        Returns:
            action state : failed or done

        """
        url = f"{self.base_url()}/loop/{func.__name__}/{self.name}"
        self.send_message('PUT',
                          f'{func.__name__} policy',
                          url)
        self.refresh_status()
        self.validate_details()
        return func()

    def deploy_microservice_to_dcae(self) -> bool:
        """
        Execute the deploy operation on the loop instance.

        Returns:
            loop deploy on DCAE status (True, False)

        """
        url = f"{self.base_url()}/loop/deploy/{self.name}"
        self.send_message('PUT',
                          'Deploy microservice to DCAE',
                          url)
        self.validate_details()
        state = self.details["components"]["DCAE"]["componentState"]["stateName"]
        failure = "MICROSERVICE_INSTALLATION_FAILED"
        success = "MICROSERVICE_INSTALLED_SUCCESSFULLY"
        while state not in (success, failure):
            self.refresh_status()
            self.validate_details()
            state = self.details["components"]["DCAE"]["componentState"]["stateName"]
        return state == success

    def undeploy_microservice_from_dcae(self) -> None:
        """Stop the deploy operation."""
        url = f"{self.base_url()}/loop/undeploy/{self.name}"
        self.send_message('PUT',
                          'Undeploy microservice from DCAE',
                          url)

    def delete(self) -> None:
        """Delete the loop instance."""
        self._logger.debug("Delete %s loop instance", self.name)
        url = "{}/loop/delete/{}".format(self.base_url(), self.name)
        self.send_message('PUT',
                          'Delete loop instance',
                          url)