atlassian/themis — themis/config.py

View on GitHub

Summary

Maintainability: D (estimated ~1 day of remediation)
Test Coverage
import os
import json
import re
import threading
from themis.util import common
from themis.util.common import run, now
from themis.model.aws_model import *
from themis.constants import *
from themis.monitoring import database


# config file location
CONFIG_FILE_LOCATION = os.path.join(os.getcwd(), 'themis.config.json')
# resources (clusters/streams) file location
RESOURCES_FILE_LOCATION = os.path.join(os.getcwd(), 'themis.resources.json')

# configuration sections
SECTION_GLOBAL = 'general'
SECTION_EMR = 'emr'
SECTION_KINESIS = 'kinesis'

# environment variable names
ENV_THEMIS_DB_URL = 'THEMIS_DB_URL'

# set this to override config for testing
TEST_CONFIG = None

# logger
LOG = common.get_logger(__name__)

# maps config keys to their descriptions (populated from CONFIG_ITEMS below)
ALL_DESCRIPTIONS = {}

# configuration change listeners, called by notify_listeners() on writes
CONFIG_LISTENERS = set()

# seconds to cache the config for
CONFIG_CACHE_DURATION = 10
# epoch timestamp of the last config load; maintained by get_config()
last_config_load_time = 0
# last loaded SystemConfiguration (includes DB overrides); see get_config()
CACHED_CONFIG = None
# reentrant lock for config loading
CONFIG_LOCK = threading.RLock()


class ConfigObject(JsonObject):
    """Base class for configuration objects with nested get/set access.

    Values are stored directly in the instance ``__dict__``, so plain
    attribute access and ``get``/``set`` operate on the same data.
    """

    def get(self, *keys, **kwargs):
        """Look up a (possibly nested) configuration value.

        :param keys: one or more keys; with multiple keys the lookup
            recurses into nested config objects, e.g. ``get('emr', id, key)``
        :param kwargs: supports ``default``, returned if any key is missing
        """
        default = kwargs.get('default')
        if len(keys) > 1:
            first_key = keys[0]
            remaining_keys = keys[1:]
            item = self.__dict__.get(first_key)
            if item is None:
                return default
            # recurse into the nested config object with the remaining keys
            return item.get(*remaining_keys, **kwargs)
        key = keys[0]
        result = self.__dict__.get(key)
        if result is None:
            return default
        return result

    def set(self, key, value):
        """Store a configuration value and return it."""
        self.__dict__[key] = value
        return value

    @classmethod
    def from_json(cls, j):
        """Construct an instance from a parsed JSON dict (None/empty ok)."""
        result = cls()
        if not j:
            return result
        # use items() instead of iteritems() for Python 2/3 compatibility
        for k, v in j.items():
            result.set(k, v)
        return result


class SystemConfiguration(ConfigObject):
    """Main configuration class representing the content of file themis.config.json"""

    def __init__(self):
        self.general = GeneralConfiguration()
        self.emr = EmrConfiguration()
        self.kinesis = KinesisConfiguration()

    @classmethod
    def from_json(cls, j):
        """Construct a SystemConfiguration from a parsed JSON dict.

        Robustness fix: tolerate a falsy ``j`` (None or empty dict) by
        returning the default configuration, consistent with
        ConfigObject.from_json, instead of raising AttributeError.
        """
        result = SystemConfiguration()
        if not j:
            return result
        result.general = GeneralConfiguration.from_json(j.get(SECTION_GLOBAL))
        result.emr = EmrConfiguration.from_json(j.get(SECTION_EMR))
        result.kinesis = KinesisConfiguration.from_json(j.get(SECTION_KINESIS))
        return result

    def set(self, key, value):
        """Set a section; raw dicts are converted to their typed config class."""
        if isinstance(value, dict):
            if key == SECTION_GLOBAL:
                value = GeneralConfiguration.from_dict(value)
            elif key == SECTION_EMR:
                value = EmrConfiguration.from_dict(value)
            elif key == SECTION_KINESIS:
                value = KinesisConfiguration.from_dict(value)
        return super(SystemConfiguration, self).set(key, value)


class GeneralConfiguration(ConfigObject):
    """General ('general') section of the system configuration."""

    # descriptions of the configurable keys (merged into ALL_DESCRIPTIONS)
    CONFIG_ITEMS = {
        'roles_to_assume': ('Comma-separated list of ARNs of IAM roles to assume via STS. ' +
            'Changing this property triggers reloading of the list of streams and clusters.'),
        'ssh_keys': ('Comma-separated list of SSH key files or environment variables to use for ' +
            'connecting to the clusters. If a value is "$var", the key is read from env variable $var'),
        # TODO move to EMR config?
        'autoscaling_clusters': 'Comma-separated list of cluster IDs to auto-scale',
        # TODO move to Kinesis config?
        'autoscaling_kinesis_streams': 'Comma-separated list of Kinesis stream names to auto-scale',
        'scaling_loop_interval': 'Loop interval seconds',
        'db_url': ('Database connection URL. ' +
            'Examples: sqlite:///themis.data.db or mysql://user:pass@host:port/dbname ' +
            'This value can be initialized via the $THEMIS_DB_URL environment variable.'),
        'monitoring_time_window': 'Time period (seconds) of historical monitoring data to consider for scaling'
    }

    def __init__(self):
        self.ssh_keys = '$SSH_KEY_ETL_PROD'
        # DB URL can be overridden via the $THEMIS_DB_URL environment variable
        self.db_url = os.environ.get(ENV_THEMIS_DB_URL) or 'sqlite:///themis.data.db'
        self.roles_to_assume = ''
        self.autoscaling_clusters = ''
        self.autoscaling_kinesis_streams = ''
        self.monitoring_time_window = 60 * 10
        self.scaling_loop_interval = LOOP_SLEEP_TIMEOUT_SECS

    def get_autoscaling_clusters(self):
        """Return the configured cluster IDs as a list.

        Bugfix: an empty/blank config value now yields [] instead of [''],
        since re.split on an empty string returns a one-element list [''].
        """
        return [v for v in re.split(r'\s*,\s*', self.autoscaling_clusters) if v]

    def get_autoscaling_kinesis_streams(self):
        """Return the configured Kinesis stream names as a list.

        Bugfix: an empty/blank config value now yields [] instead of [''].
        """
        return [v for v in re.split(r'\s*,\s*', self.autoscaling_kinesis_streams) if v]


class EmrConfiguration(ConfigObject):
    """EMR ('emr') section: maps cluster IDs to EmrClusterConfiguration objects."""

    def get(self, *keys, **kwargs):
        """Look up a cluster config; a missing single key creates and
        stores a default EmrClusterConfiguration (note: mutates self)."""
        found = super(EmrConfiguration, self).get(*keys, **kwargs)
        if found is not None or len(keys) != 1:
            return found
        # single missing key: create, store, and return a default config
        return self.set(keys[0], EmrClusterConfiguration())

    def set(self, key, value):
        """Store a cluster config, wrapping raw dicts into typed objects."""
        wrapped = EmrClusterConfiguration.from_dict(value) if isinstance(value, dict) else value
        return super(EmrConfiguration, self).set(key, wrapped)


class EmrClusterConfiguration(ConfigObject):
    """Per-cluster EMR configuration (stored per cluster ID in EmrConfiguration)."""

    # descriptions of the configurable keys (merged into ALL_DESCRIPTIONS)
    CONFIG_ITEMS = {
        'role_to_assume': 'ARN of IAM role to assume via STS when accessing this resource',
        'send_shutdown_signal': ('Whether to send Presto SHUTDOWN signal before terminating a node ' +
            '("true" or "false"). This has worked well up to EMR 4.x but does not seem to work in EMR 5+.'),
        'downscale_expr': 'Trigger cluster downscaling by the number of nodes this expression evaluates to',
        'upscale_expr': 'Trigger cluster upscaling by the number of nodes this expression evaluates to',
        'time_based_scaling': """A JSON string that maps date regular expressions to minimum number of nodes. \
            Dates to match against are formatted as "%a %Y-%m-%d %H:%M:%S". \
            Example config: { "(Mon|Tue|Wed|Thu|Fri).01:.*:.*": 1 }""".replace('    ', ''),
        'group_or_preferred_market': """Comma separated list of task instance groups and/or instance markets to \
            increase/decrease depending on order, e.g., "ig-12345,SPOT,ON_DEMAND" means to autoscale task group \
            ig-12345 if available, otherwise any SPOT group, or if necessary ON_DEMAND groups""".replace('    ', ''),
        'baseline_nodes': 'Number of baseline nodes to use for comparing costs and calculating savings',
        'custom_domain_name': 'Custom domain name to apply to all nodes in cluster (override aws-cli result)'
    }

    def __init__(self):
        # empty string = do not assume a role (use default credentials)
        self.role_to_assume = ''
        # scaling expressions are strings; presumably evaluated elsewhere
        # against monitoring data (tasknodes.*, time_based.*) - see themis docs
        # NOTE: .replace('    ', '') strips the source indentation baked into
        # the triple-quoted literals by the continuation lines
        self.downscale_expr = """1 if \
            (tasknodes.running and tasknodes.active and tasknodes.count.nodes > time_based.minimum.nodes(now) \
            and tasknodes.average.cpu < 0.5 and tasknodes.average.mem < 0.9) \
            else 0""".replace('    ', '')
        self.upscale_expr = """(time_based.minimum.nodes(now) - tasknodes.count.nodes) if \
            (time_based.enabled and time_based.minimum.nodes(now) > tasknodes.count.nodes) \
            else (3 if (tasknodes.running and tasknodes.active and tasknodes.count.nodes < 25 \
            and (tasknodes.average.cpu > 0.7 or tasknodes.average.mem > 0.95)) else 0)""".replace('    ', '')
        # empty JSON object = no time-based minimum-node rules
        self.time_based_scaling = '{}'
        # prefer on-demand groups for small clusters, spot otherwise
        self.group_or_preferred_market = ('"%s" if tasknodes.count.nodes < 15 ' +
            'else "%s"') % (MARKET_ON_DEMAND, MARKET_SPOT)
        self.baseline_nodes = '20'
        self.custom_domain_name = ''
        self.send_shutdown_signal = 'true'


class KinesisConfiguration(ConfigObject):
    """Kinesis ('kinesis') section: maps stream names to KinesisStreamConfiguration."""

    CONFIG_ITEMS = {}

    def get(self, *keys, **kwargs):
        """Look up a stream config; a missing single key creates and
        stores a default KinesisStreamConfiguration (note: mutates self)."""
        found = super(KinesisConfiguration, self).get(*keys, **kwargs)
        if found is not None or len(keys) != 1:
            return found
        # single missing key: create, store, and return a default config
        return self.set(keys[0], KinesisStreamConfiguration())

    def set(self, key, value):
        """Store a stream config, wrapping raw dicts into typed objects."""
        wrapped = KinesisStreamConfiguration.from_dict(value) if isinstance(value, dict) else value
        return super(KinesisConfiguration, self).set(key, wrapped)


class KinesisStreamConfiguration(ConfigObject):
    """Per-stream Kinesis configuration (stored per stream name in KinesisConfiguration)."""

    # descriptions of the configurable keys (merged into ALL_DESCRIPTIONS)
    CONFIG_ITEMS = {
        'role_to_assume': 'ARN of IAM role to assume via STS when accessing this resource',
        'enable_enhanced_monitoring': """Enable enhanced monitoring. Setting the value to "true" \
            (without quotes) enables per-shard monitoring with ShardLevelMetrics=ALL""",
        'stream_upscale_expr': 'Trigger stream upscaling by the number of shards this expression evaluates to',
        'stream_downscale_expr': 'Trigger stream downscaling by the number of shards this expression evaluates to'
    }

    def __init__(self):
        # empty string = do not assume a role (use default credentials)
        self.role_to_assume = ''
        # stored as the string 'false'/'true', not a boolean
        self.enable_enhanced_monitoring = 'false'
        # scaling expressions are strings; presumably evaluated elsewhere
        # against shard/stream monitoring metrics - see themis docs
        self.stream_downscale_expr = '1 if (shards.count > 1 and stream.IncomingBytes.average < 100000) else 0'
        self.stream_upscale_expr = ('1 if (shards.count < 5 and ' +
            '(stream.IncomingBytes.last / shards.count) > 500000) else 0')


# classes whose CONFIG_ITEMS descriptions feed into ALL_DESCRIPTIONS
# (note: EmrConfiguration is not listed - it defines no CONFIG_ITEMS attribute)
ALL_CONFIG_CLASSES = [GeneralConfiguration, EmrClusterConfiguration, KinesisConfiguration, KinesisStreamConfiguration]
# populate ALL_DESCRIPTIONS
for clazz in ALL_CONFIG_CLASSES:
    ALL_DESCRIPTIONS.update(clazz.CONFIG_ITEMS)


def convert_to_list(cfg):
    """Convert a config dict into a list of {KEY, VAL, DESC} entries.

    :param cfg: dict mapping config keys to values
    :return: list of dicts, each with the key, value, and its description
        (looked up in ALL_DESCRIPTIONS, None if unknown)
    """
    # items() instead of iteritems() for Python 2/3 compatibility
    return [{KEY: k, VAL: v, DESC: ALL_DESCRIPTIONS.get(k)} for k, v in cfg.items()]


def convert_from_list(cfgs):
    """Inverse of convert_to_list: build a {key: value} dict from entries."""
    return dict((entry[KEY], entry[VAL]) for entry in cfgs)


def get_config(force_load=False, config_file_only=False):
    """Load the system configuration, with in-memory caching.

    :param force_load: bypass the cache and reload from file (and DB)
    :param config_file_only: do not merge configs stored in the database;
        the result is not cached in this case
    :return: a SystemConfiguration instance
    """
    global last_config_load_time, CACHED_CONFIG
    if TEST_CONFIG:
        # allows tests to inject a fixed configuration
        return TEST_CONFIG

    # use the lock as a context manager instead of manual acquire/release
    with CONFIG_LOCK:
        time_now = now()
        if (time_now - last_config_load_time) > CONFIG_CACHE_DURATION:
            force_load = True
        # bugfix: the cache holds the DB-merged config, so it must not be
        # used to answer config_file_only requests
        if CACHED_CONFIG and not force_load and not config_file_only:
            return CACHED_CONFIG

        app_config = common.load_json_file(CONFIG_FILE_LOCATION)
        if not app_config:
            # first run: persist a default configuration file
            app_config = SystemConfiguration()
            common.save_file(CONFIG_FILE_LOCATION, app_config.to_json())
        else:
            app_config = SystemConfiguration.from_json(app_config)

        # load additional configs from DB and overlay them on the file config
        if not config_file_only:
            configs = database.configs_fetch_all()
            for config in configs:
                if config['resource']:
                    app_config.get(config['section']).set(config['resource'], config['config'])
                elif config['section'] == SECTION_GLOBAL:
                    app_config.set(config['section'], config['config'])
            CACHED_CONFIG = app_config
            last_config_load_time = now()

    return app_config


def write(config, section=SECTION_GLOBAL, resource=None):
    """Persist a configuration change (file and/or DB) and notify listeners.

    :param config: new config value (typed ConfigObject or raw dict)
    :param section: config section name ('general', 'emr', or 'kinesis')
    :param resource: optional resource ID (cluster/stream) within the section
    :return: the stored (typed) config object
    """
    app_config = get_config(force_load=True)
    if section == SECTION_GLOBAL:
        # the global section is additionally persisted to the config file;
        # set() wraps a raw dict into its typed configuration class
        new_app_config = SystemConfiguration()
        config = new_app_config.set(section, config)
        # save global config as file
        common.save_file(CONFIG_FILE_LOCATION, new_app_config.to_json())

    if resource:
        # resource-level update: store under app_config.<section>.<resource>
        target_config = app_config.get(section)
        old_config = target_config.get(resource)
        config = target_config.set(resource, config)
    else:
        # section-level update: store under app_config.<section>
        old_config = app_config.get(section)
        config = app_config.set(section, config)

    # save config to database
    config_json = config.to_json()
    database.config_save(section=section, resource=resource, config=config_json)
    # notify listeners
    notify_listeners(old_config, config, section=section, resource=resource)
    return config


def notify_listeners(old_config, new_config, section, resource=None):
    """Invoke every registered config listener with the change details."""
    for callback in CONFIG_LISTENERS:
        callback(old_config=old_config, new_config=new_config, section=section, resource=resource)


def get_value(key, config=None, default=None, section=SECTION_GLOBAL, resource=None,
        config_file_only=False):
    """Read a single config value from the given (or freshly loaded) config.

    :param key: config key to read
    :param config: optional pre-loaded config; loaded via get_config() if None
    :param default: value returned when the key is missing
    :param section: config section to read from
    :param resource: optional resource ID within the section
    :param config_file_only: passed through to get_config() when loading
    """
    cfg = config or get_config(config_file_only=config_file_only)
    path = (section, resource, key) if resource else (section, key)
    return cfg.get(*path, default=default)


def set_value(key, new_value, section=SECTION_GLOBAL, resource=None, config=None):
    """Set a single config value.

    If no config object is supplied, the current config is loaded and the
    change is persisted via write(); otherwise only the passed-in config
    object is mutated.

    :return: the (section- or resource-level) config object that was updated
    """
    # persist only when the caller did not hand us a config to mutate
    persist = not config
    if persist:
        config = get_config()
    target = config.get(section)
    if resource:
        target = target.get(resource)
    target.set(key, new_value)
    if persist:
        # write changes through to file/DB and notify listeners
        write(target, section=section, resource=resource)
    return target