atlassian/themis

View on GitHub
themis/monitoring/kinesis_monitoring.py

Summary

Maintainability
D
2 days
Test Coverage
import json
import re
import themis.model.resources_model
import themis.model.kinesis_model
import themis.monitoring.resources
import themis.config
from themis.model.aws_model import *
from themis.config import *
from themis.util.common import *
from themis.util import timeseries, math_util, aws_common

# logger
LOG = get_logger(__name__)

# constants
MONITORING_METRICS_ALL = 'ALL'
STREAM_LEVEL_METRICS = [
    'IncomingBytes', 'IncomingRecords', 'GetRecords.IteratorAgeMilliseconds',
    'ReadProvisionedThroughputExceeded', 'WriteProvisionedThroughputExceeded',
    'GetRecords.IteratorAgeMilliseconds']
SHARD_LEVEL_METRICS = ['IncomingBytes', 'IncomingRecords', 'IteratorAgeMilliseconds',
    'OutgoingBytes', 'OutgoingRecords', 'ReadProvisionedThroughputExceeded',
    'WriteProvisionedThroughputExceeded']

# get CloudWatch data from the last 20 minutes by default
CW_METRICS_TIMEWINDOW = 60 * 20
CW_METRICS_PERIOD = 60

# Maps a metric regex to the corresponding CloudWatch statistic to use. The first match is used.
CW_STATISTICS_BY_METRIC = (
    (r'.*IteratorAge.*', 'Maximum'),
    (r'.*', 'Sum')
)

# track which streams have been changed (scaled) in the previous iteration
STREAMS_CHANGED = {}


def update_config(old_config, new_config, section, resource=None):
    if section != SECTION_KINESIS:
        return
    if resource:
        old_value = old_config.enable_enhanced_monitoring
        new_value = new_config.enable_enhanced_monitoring
        if new_value != old_value:
            if new_value:
                # enable monitoring
                enable_shard_monitoring(resource)
            else:
                # disable monitoring
                disable_shard_monitoring(resource)


themis.config.CONFIG_LISTENERS.add(update_config)


def reload_resource(resource):
    stream_id = resource if isinstance(resource, basestring) else resource.id
    stream = retrieve_stream_details(stream_id)
    themis.monitoring.resources.save_resource(SECTION_KINESIS, stream)
    return stream


def update_resources(resource_config):
    for res in resource_config:
        enabled = themis.config.get_value('enable_enhanced_monitoring',
            section=SECTION_KINESIS, resource=res.id)
        if enabled == 'true':
            res.enhanced_monitoring = [MONITORING_METRICS_ALL]
    return resource_config


def get_statistic_for_metric(metric):
    for stat in CW_STATISTICS_BY_METRIC:
        if re.match(stat[0], metric):
            return stat[1]
    return None


def get_cloudwatch_metrics(metric, namespace, dimensions, role=None,
        time_window=CW_METRICS_TIMEWINDOW, period=CW_METRICS_PERIOD, fill_zeros=True):
    start_time, end_time = get_start_and_end(diff_secs=time_window, format=None)
    metric_names = metric if isinstance(metric, basestring) else ','.join(metric)
    cloudwatch_client = aws_common.connect_cloudwatch(role=role)
    statistic = get_statistic_for_metric(metric)
    datapoints = cloudwatch_client.get_metric_statistics(Namespace=namespace, MetricName=metric_names,
        StartTime=start_time, EndTime=end_time, Period=period, Dimensions=dimensions,
        Statistics=[statistic])
    datapoints = datapoints['Datapoints']

    # sort datapoints by timestamp
    datapoints.sort(key=lambda item: item['Timestamp'])
    # fill empty slots with zeros
    if fill_zeros:
        # make sure we don't fill in zeros at the very end of the
        # series (data may be missing due to Cloudwatch delay)
        time_window_to_fill = time_window - 2 * period
        timeseries.fillup_with_zeros(datapoints, start_time=start_time,
            time_window=time_window_to_fill, period=period, statistic=statistic)

    return datapoints


def get_iam_role_for_stream(stream):
    if not isinstance(stream, basestring):
        stream = stream.id
    return themis.config.get_value('role_to_assume', section=SECTION_KINESIS, resource=stream)


def enable_shard_monitoring(stream, metrics=['ALL']):
    if not isinstance(stream, basestring):
        stream = stream.id
    role = get_iam_role_for_stream(stream)
    kinesis_client = aws_common.connect_kinesis(role=role)
    return kinesis_client.enable_enhanced_monitoring(StreamName=stream, ShardLevelMetrics=metrics)


def disable_shard_monitoring(stream, metrics=['ALL']):
    if not isinstance(stream, basestring):
        stream = stream.id
    role = get_iam_role_for_stream(stream)
    kinesis_client = aws_common.connect_kinesis(role=role)
    return kinesis_client.disable_enhanced_monitoring(StreamName=stream, ShardLevelMetrics=metrics)


def get_kinesis_cloudwatch_metrics(stream, metric, shard=None):
    dimensions = [{'Name': 'StreamName', 'Value': stream.id}]
    if shard:
        shard = shard if isinstance(shard, basestring) else shard['id'] if isinstance(shard, dict) else shard.id
        dimensions.append({'Name': 'ShardId', 'Value': shard})
    role = get_iam_role_for_stream(stream)
    return get_cloudwatch_metrics(metric=metric, namespace='AWS/Kinesis', dimensions=dimensions, role=role)


def replace_nan(value, default=0):
    if math.isnan(value):
        return default
    return value


def collect_info(stream, monitoring_interval_secs=600, config=None):
    result = {}
    # check if we need to re-load and save the stream details
    if STREAMS_CHANGED.get(stream.id):
        stream = save_modified_stream(stream)
    # start initializing the list of shards
    shards_list = result['shards_list'] = []
    for shard in stream.shards:
        shard = shard.to_dict()
        shards_list.append(shard)
    total = result['total'] = result['stream'] = {}
    shards = result['shards'] = {}
    shards['count'] = len(shards_list)
    metrics_map = {}

    # check if shard-level monitoring is enabled
    shard_monitoring_enabled = themis.config.get_value('enable_enhanced_monitoring',
        section=SECTION_KINESIS, resource=stream.id)

    for metric in STREAM_LEVEL_METRICS:
        metric_name = metric
        metric = metric.replace('.', '_')
        datapoints = get_kinesis_cloudwatch_metrics(stream=stream, metric=metric_name)
        series = timeseries.get_cloudwatch_timeseries(datapoints)
        shards[metric] = {}
        total[metric] = {}
        total[metric]['average'] = replace_nan(series.mean())
        total[metric]['max'] = replace_nan(series.max())
        total[metric]['min'] = replace_nan(series.min())
        total[metric]['sum'] = replace_nan(series.sum())
        total[metric]['last'] = series.values[-1] if len(series.values) > 0 else 0
    if shard_monitoring_enabled == 'true':
        for metric in SHARD_LEVEL_METRICS:
            # collect detailed shard-level monitoring metrics
            for shard in shards_list:
                datapoints = get_kinesis_cloudwatch_metrics(stream=stream, metric=metric, shard=shard)
                series = timeseries.get_cloudwatch_timeseries(datapoints)
                shard[metric] = {}
                shard[metric]['average'] = replace_nan(series.mean())
                shard[metric]['max'] = replace_nan(series.max())
                shard[metric]['min'] = replace_nan(series.min())
                shard[metric]['sum'] = replace_nan(series.sum())
                shard[metric]['last'] = series.values[-1] if len(series.values) > 0 else 0
                # append metrics to map
                if metric not in metrics_map:
                    # map for shard-level metrics
                    metrics_map[metric] = {}
                    metrics_map[metric]['average'] = []
                    metrics_map[metric]['max'] = []
                    metrics_map[metric]['min'] = []
                    metrics_map[metric]['sum'] = []
                metrics_map[metric]['average'].append(replace_nan(series.mean()))
                metrics_map[metric]['max'].append(replace_nan(series.max()))
                metrics_map[metric]['min'].append(replace_nan(series.min()))
                metrics_map[metric]['sum'].append(replace_nan(series.sum()))
    for m_name, m_lists in metrics_map.iteritems():
        shards[metric]['average'] = math_util.get_stats(m_lists['average'])['avg']
        shards[metric]['min'] = math_util.get_stats(m_lists['min'])['min']
        shards[metric]['max'] = math_util.get_stats(m_lists['max'])['max']
        shards[metric]['sum'] = math_util.get_stats(m_lists['sum'])['sum']
    remove_NaN(result, delete_values=False)
    return result


def retrieve_stream_details(stream_name, role=None):
    LOG.info('Getting details for Kinesis stream %s' % stream_name)
    if role is None:
        role = get_iam_role_for_stream(stream_name)
    kinesis_client = aws_common.connect_kinesis(role=role)
    out = kinesis_client.describe_stream(StreamName=stream_name)
    stream_shards = out['StreamDescription']['Shards']
    stream = themis.model.kinesis_model.KinesisStream(stream_name)
    num_shards = len(stream_shards)
    for shard in stream_shards:
        if 'EndingSequenceNumber' not in shard['SequenceNumberRange']:
            key_range = shard['HashKeyRange']
            shard = themis.model.kinesis_model.KinesisShard(id=shard['ShardId'])
            shard.start_key = key_range['StartingHashKey']
            shard.end_key = key_range['EndingHashKey']
            stream.shards.append(shard)
    return stream


def save_modified_stream(stream):
    stream = retrieve_stream_details(stream.id)
    themis.monitoring.resources.save_resource(SECTION_KINESIS, stream)
    return stream


def init_kinesis_config(run_parallel=False, role=None):
    cfg = themis.model.resources_model.ResourcesConfiguration()

    def init_kinesis_stream_config(stream_name):
        stream_config = retrieve_stream_details(stream_name, role=role)
        cfg.kinesis.append(stream_config)

    # load Kinesis streams
    kinesis_client = aws_common.connect_kinesis(role=role)
    try:
        out = kinesis_client.list_streams()
        if run_parallel:
            common.parallelize(out['StreamNames'], init_kinesis_stream_config)
        else:
            for c in out['StreamNames']:
                init_kinesis_stream_config(c)
    except Exception, e:
        LOG.info('Unable to list Kinesis streams using IAM role "%s"' % role)
    return cfg