mre/kafka-influxdb

View on GitHub
kafka_influxdb/encoder/heapster_json_encoder.py

Summary

Maintainability
A
2 hrs
Test Coverage
try:
    import ujson as json
except ImportError:
    import json

import logging

try:
    # Test for mypy support (requires Python 3)
    from typing import List, Text
except:
    pass

from datetime import datetime
from kafka_influxdb.encoder.escape_functions import influxdb_tag_escaper


class Encoder(object):
    """
    Encode Heapster JSON metrics as InfluxDB line-protocol strings.

    @see https://github.com/kubernetes/heapster/blob/master/metrics/sinks/kafka/driver.go

    Each input message is a single JSON object with the keys
    ``MetricsName``, ``MetricsValue``, ``MetricsTimestamp`` and
    ``MetricsTags``.

    Sample measurements:

    {
     "MetricsName":"memory/major_page_faults",
     "MetricsValue":{"value":56},
     "MetricsTimestamp":"2017-01-19T17:26:00Z",
     "MetricsTags":{"cluster":"c","container_name":"docker/9be430d3a1a28601292aebd76e15512d5471c630a7fa164d6a2a2fd9cbc19e3d","host_id":"10.58.9.95","hostname":"10.58.9.95","nodename":"10.58.9.95","type":"sys_container"}
    }

    or

    {"MetricsName":"memory/usage",
    "MetricsValue":{"value":1036288},
    "MetricsTimestamp":"2017-01-19T17:26:00Z",
    "MetricsTags":{"cluster":"c","container_base_image":"gcr.io/google_containers/kube-dnsmasq-amd64:1.4","container_name":"dnsmasq","host_id":"10.58.9.96","hostname":"10.58.9.96",
            "labels":"k8s-app:kube-dns,pod-template-hash:4101612645","namespace_id":"a8bf368b-489b-11e6-91f3-005056923a7e","namespace_name":"kube-system","nodename":"10.58.9.96",
            "pod_id":"f7570b50-d637-11e6-bd3e-005056923a7e","pod_name":"kube-dns-4101612645-8pn6x","pod_namespace":"kube-system","type":"pod_container"}}

    The ``format_*`` and ``compose_data`` methods are separate hooks so that
    the measurement name and tags can be customized by overriding them.
    """

    def __init__(self):
        # Callable used to escape both tag keys and tag values.
        self.escape_tag = influxdb_tag_escaper()

    def encode(self, msg):
        # type: (bytes) -> List[Text]
        """
        Convert one raw message into a list of encoded measurements.

        :param msg: encoded JSON document (must provide ``decode()``)
        :return: a list with one encoded line, or an empty list when the
                 message cannot be decoded, parsed or formatted
        """
        measurements = []

        try:
            # UnicodeDecodeError is a ValueError subclass, so undecodable
            # payloads are handled here together with invalid JSON.
            entry = self.parse_line(msg.decode())
        except ValueError as e:
            logging.debug("Error in encoder: %s", e)
            return measurements

        try:
            measurement = entry["MetricsName"]
            tags = self.format_tags(entry)
            value = self.format_value(entry)
            timestamp = self.format_time(entry)
            measurements.append(self.compose_data(
                measurement, tags, value, timestamp))
        except Exception as e:
            # Best effort by design: a malformed entry (missing keys,
            # unexpected types, bad timestamp) is skipped, not fatal.
            logging.debug("Error in input data: %s. Skipping.", e)

        return measurements

    def parse_line(self, line):
        """
        Parse one JSON document.

        :param line: decoded JSON text
        :return: the parsed object
        :raises ValueError: if ``line`` is not valid JSON
        """
        # The precise_float option is deliberately not used: for influxdb
        # version > 0.9 the timestamp is an integer.
        return json.loads(line)

    def compose_data(self, measurement, tags, value, time):
        """
        Join the formatted parts into a single output line.

        :param measurement: measurement name
        :param tags: pre-formatted ``k=v,k=v`` tag string, may be empty
        :param value: pre-formatted field set (e.g. ``value=56``)
        :param time: timestamp to append
        :return: ``measurement,tags value time``, or
                 ``measurement value time`` when there are no tags
        """
        if tags:
            return "{0!s},{1!s} {2!s} {3!s}".format(
                measurement, tags, value, time)
        # Without tags the separating comma must be omitted; a dangling
        # "measurement," is not valid line protocol.
        return "{0!s} {1!s} {2!s}".format(measurement, value, time)

    def format_tags(self, entry):
        """
        Build the comma-separated, escaped tag set from ``MetricsTags``.

        Tags with an empty key, or with an empty or ``None`` value, are
        left out. A falsy ``MetricsTags`` (``None``, ``''``, ``{}``) yields
        an empty string.

        :param entry: parsed metric
        :return: ``k1=v1,k2=v2`` string, possibly empty
        :raises KeyError: if ``MetricsTags`` is missing from ``entry``
        """
        tags = entry["MetricsTags"]
        if not tags:
            return ''

        formatted = []
        for key, val in tags.items():
            # Skip tags that would produce an empty or None key/value.
            if key == '' or val is None or val == '':
                continue
            formatted.append("{0!s}={1!s}".format(
                self.escape_tag(key), self.escape_tag(val)))

        return ','.join(formatted)

    def format_time(self, entry):
        """
        Convert ``MetricsTimestamp`` to whole seconds since 1970-01-01.

        The trailing ``Z`` is matched literally and the value is treated
        as UTC (no timezone conversion is applied).

        :param entry: parsed metric
        :return: integer number of seconds since the epoch
        :raises ValueError: if the timestamp does not match
                            ``%Y-%m-%dT%H:%M:%SZ``
        """
        d = datetime.strptime(entry['MetricsTimestamp'], '%Y-%m-%dT%H:%M:%SZ')
        return int((d - datetime(1970, 1, 1)).total_seconds())

    def format_value(self, entry):
        """
        Format the metric value as the field set ``value=<value>``.

        :param entry: parsed metric; reads ``entry['MetricsValue']['value']``
        :return: field-set string
        """
        return "value={0!s}".format(entry['MetricsValue']["value"])