florath/rmtoo

View on GitHub
rmtoo/lib/configuration/Cfg.py

Summary

Maintainability
D
2 days
Test Coverage
'''
 rmtoo
   Free and Open Source Requirements Management Tool

 The Configuration Class hold values from the different configuration
 sources, i.e. configuration class, JSON configuration objects and the
 command line.

 (c) 2011-2012,2017 by flonatel GmbH & Co. KG

 For licensing details see COPYING
'''
from __future__ import print_function

from abc import ABCMeta, abstractmethod
import os
import json
from six import iteritems
import yaml

import rmtoo.lib.configuration.DictHelper as DictHelper
from rmtoo.lib.Encoding import Encoding
from rmtoo.lib.configuration.CfgEx import CfgEx
from rmtoo.lib.configuration.CmdLineParams import CmdLineParams
from rmtoo.lib.RMTException import RMTException


class CfgFormatBase:
    """Common base class for format configuration objects"""

    __metaclass__ = ABCMeta

    def __init__(self, cfg, name):
        """Set up a CfgFormatJson object"""
        self.__cfg = cfg
        self.__name = name

    @abstractmethod
    def create_cfg_from_str(self, url):
        """Create configuration from given string"""
        return

    @abstractmethod
    def create_cfg_from_file(self, url):
        """Create configuration from given file"""
        return

    def __create_cfg_from_url(self, url):
        '''Depending on the URL the low level method to
           create the configuration is called.'''
        if url.startswith(self.__name + ":"):
            return self.create_cfg_from_str(url)
        elif url.startswith("file:"):
            return self.create_cfg_from_file(url)
        assert False

    def __evaluate_once(self, config):
        """Evaluates the given configuration and returns it"""
        cfg = {}
        for cfg_idx in config:
            DictHelper.merge(cfg, self.__create_cfg_from_url(cfg_idx))
        return cfg

    def evaluate(self):
        """As long as there are parameters, handle them."""
        try:
            while True:
                config = self.__cfg.get_value(
                    ['configuration', self.__name])
                # This must be removed before the evaluation, because it
                # is possible that during the evaluation additional
                # entries will appear.
                del self.__cfg['configuration'][self.__name]
                DictHelper.merge(self.__cfg, self.__evaluate_once(config))
        except RMTException:
            # Nothing to do: entries not available
            pass


class CfgFormatJson(CfgFormatBase):
    """Handles configuration written in JSON"""

    def create_cfg_from_str(self, url):
        """Create config from JSON string"""
        if url.startswith("json:"):
            url = url[5:]
        jdict = json.loads(url)
        if not isinstance(jdict, dict):
            raise CfgEx("Given JSON string encodes no dictionary.")
        return jdict

    def create_cfg_from_file(self, url):
        """Creates dict from JSON file"""
        if url.startswith("file://"):
            url = url[7:]
        with open(url, "r") as jfd:
            jdict = json.load(jfd)
        if not isinstance(jdict, dict):
            raise CfgEx("Given JSON string encodes no dictionary.")
        return jdict


def custom_str_constructor(loader, node):
    """This takes care that all the configuration is read in
    using unicode.
    """
    return Encoding.to_unicode(loader.construct_scalar(node))


yaml.SafeLoader.add_constructor(
    u'tag:yaml.org,2002:str', custom_str_constructor)


class CfgFormatYaml(CfgFormatBase):
    """Handles configuration written in YAML"""

    def create_cfg_from_str(self, url):
        """Create config from YAML string"""
        if url.startswith("yaml:"):
            url = url[5:]
        ydict = yaml.safe_load(url)
        if not isinstance(ydict, dict):
            raise CfgEx("Given YAML string encodes no dictionary.")
        return ydict

    def create_cfg_from_file(self, url):
        """Creates dict from YAML file"""
        if url.startswith("file://"):
            url = url[7:]
        with open(url, "r") as yfd:
            ydict = yaml.safe_load(yfd)
        if not isinstance(ydict, dict):
            raise CfgEx("Given YAML string encodes no dictionary.")
        return ydict


class Cfg(dict):
    '''
    Configuration Class

    Stores all the values from different sources into one data container
    which is a dictionary.

    Each part of a Cfg is again a Cfg - except the last step where it is
    a value.

    Has some special access methods like get_value("key.subkey.subsubkey")
    to simplify configuration access.
    '''

    @staticmethod
    def new_by_json_str(jstr):
        """Creates a new Cfg object with the contents of the given
        string.  The string must be a valid JSON structure.
        This is a static factory method.
        """
        config = Cfg()
        cfg_format = CfgFormatJson(config, 'json')
        DictHelper.merge(config, cfg_format.create_cfg_from_str(jstr))
        return config

    def merge_json_str(self, jstr):
        """Merges a JSON config into an already existing"""
        cfg_format = CfgFormatJson(self, 'json')
        DictHelper.merge(self, cfg_format.create_cfg_from_str(jstr))

    def merge_cmd_line_params(self, args):
        '''Merges the command line arguments into the
           existing configuration.'''
        ldicts = CmdLineParams.create_dicts(args)
        for ldict in ldicts:
            DictHelper.merge(self, ldict)

    def evaluate(self):
        """Evaluates the configuration.

        This does two things: read in the different configurations
        of different formats and merges them into one big configuration
        object.
        """
        cfgs = {'json': CfgFormatJson(self, 'json'),
                'yaml': CfgFormatYaml(self, 'yaml')}
        for _, cfg_obj in iteritems(cfgs):
            cfg_obj.evaluate()

    def get_value(self, key):
        '''Returns the value of the given key.
           If key is not found a RMTException is thrown.'''
        try:
            rval = DictHelper.get_raw(self, key)
            if isinstance(rval, dict):
                return Cfg(rval)
            return rval
        except CfgEx as cex:
            raise RMTException(96, "Mandatory configuration parameter "
                               "[%s] not found. (Root cause: [%s])"
                               % (key, cex))

    @staticmethod
    def __replace_env(estr):
        '''Resolves the environment variable and returns it.'''
        try:
            return os.environ[estr]
        except KeyError:
            # If not there, use original string.
            return estr

    def __replace_key(self, key):
        '''Resolves the key and returns is.'''
        return self.get_value(key)

    def __replace(self, cstr):
        '''Looks if the given string exists as variable.
           If so, returns the resolved string.'''
        if cstr.startswith("ENV:"):
            return self.__replace_env(cstr[4:])
        return self.__replace_key(cstr)

    def __dollar_replace_string(self, cstr):
        '''Replaces all occurrences of ${} with the appropriate value.'''
        while True:
            dstart = cstr.find("${")
            if dstart == -1:
                # No ${} any more...
                return cstr
            dend = cstr.find("}", dstart + 2)
            if dend == -1:
                # No }: no way to replace things
                return cstr
            vname = cstr[dstart + 2:dend]
            rep = self.__replace(vname)
            cstr = cstr[:dstart] + rep + cstr[dend + 1:]

    def __dollar_replace_list(self, value):
        '''Replaces all occurrences of ${} for list.'''
        res = []
        for vstr in value:
            res.append(self.__dollar_replace_string(vstr))
        return res

    def dollar_replace(self, value):
        '''Replaces all occurrences of ${} for different types.'''
        if Encoding.is_unicode(value):
            return self.__dollar_replace_string(value)
        if isinstance(value, list):
            return self.__dollar_replace_list(value)
        # Never reached: unknown type
        print("Cfg never reached [%s] [%s]" % (type(value), value))
        assert False

    def get_rvalue(self, key):
        '''Returns the real value of the given key.
           If found the value is interpreted as string -
           and the ${} replacement takes place.
           If key is not found a RMTException is thrown.'''
        return self.dollar_replace(self.get_value(key))

    def get_rvalue_default(self, key, default_value):
        '''Return the value of the key from the configuration.
           Replacement of ${} is done, if the key is available,
           If the key is not available, the default_value is returned.'''
        try:
            return self.dollar_replace(DictHelper.get_raw(self, key))
        except CfgEx:
            return default_value

    def get_value_wo_throw(self, key):
        '''Returns the value of the given key.
           If key is not found None is returned.'''
        return self.get_value_default(key, None)

    def get_value_default(self, key, default_value):
        '''Return the value of the key from the configuration.
           If the key is not available, the default_value is returned.'''
        try:
            return DictHelper.get_raw(self, key)
        except CfgEx:
            return default_value

    def set_value(self, key, value):
        '''Sets the value. If the key is already there a CfgEx is
           raised.'''
        ckey = DictHelper.cfg_key(key)
        DictHelper.set_value(self, ckey, value)

#    def append_list(self, key, value):
#        '''Appends value to existing list under key.
#           If key does not exists, a new list is created.'''
#        key = InternalCfg.convert_key(key)
#        InternalCfg.append_list(self.config, key, value)

    def get_bool(self, key, default_value):
        '''Returns the value of the key - converted to a boolean.
           If key does not exists, the default value is returned.'''
        try:
            return DictHelper.get_raw(self, key) \
                in ['True', 'true', 'on', '1',
                    'Yes', 'yes', True]
        except CfgEx:
            return default_value

    def get_integer(self, key, default_value):
        '''Returns the value of the key - converted to an integer.
           If key does not exists, the default value is returned.'''
        try:
            return int(DictHelper.get_raw(self, key))
        except CfgEx:
            return default_value

    def is_available(self, key):
        '''Returns true if the given key is available in the configuration
        and the valus is not None.'''
        return self.get_value_wo_throw(key) is not None