mre/kafka-influxdb

View on GitHub
kafka_influxdb/__main__.py

Summary

Maintainability
A
0 mins
Test Coverage
import sys
import logging
from kafka_influxdb.worker import Worker
from kafka_influxdb.writer import influxdb_writer
from kafka_influxdb.encoder import load_encoder
from kafka_influxdb.reader import load_reader
from kafka_influxdb.config import loader

# Module-level package metadata (title, author, license and copyright notice).
__title__ = 'kafka_influxdb'
__author__ = 'Matthias Endler'
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2015, Matthias Endler under Apache License, v2.0'


def main():
    """
    Load the configuration and start the consumer

    If the loaded configuration has a truthy ``version`` attribute,
    ``show_version`` is called before the consumer would be started.
    """
    settings = loader.load_config()
    version_requested = settings.version
    if version_requested:
        show_version()
    start_consumer(settings)


def show_version():
    """
    Print "<package> <version>" and terminate with exit status 0
    """
    # Imported lazily: the version is only needed on this code path.
    from .version import __version__
    banner = "{} {}".format(__package__, __version__)
    print(banner)
    sys.exit(0)


def start_consumer(config):
    """
    Build the reader, writer and encoder, then run the worker
    :param config: configuration object; its ``kafka_*``, ``influxdb_*``
                   and ``encoder`` attributes are read here, and the object
                   itself is handed on to the writer factory and the worker
    """
    logging.debug("Initializing Kafka Consumer")
    # Positional arguments for load_reader, in the order it is called with.
    reader_args = (
        config.kafka_reader,
        config.kafka_host,
        config.kafka_port,
        config.kafka_group,
        config.kafka_topic,
        config.kafka_offset,
    )
    kafka_reader = load_reader(*reader_args)

    influx_host, influx_port = config.influxdb_host, config.influxdb_port
    logging.debug("Initializing connection to InfluxDB at %s:%s",
                  influx_host, influx_port)
    influx_writer = create_writer(config)

    logging.debug("Initializing message encoder: %s", config.encoder)
    message_encoder = load_encoder(config.encoder)

    Worker(kafka_reader, message_encoder, influx_writer, config).consume()


def create_writer(config):
    """
    Build an InfluxDBWriter from the ``influxdb_*`` configuration attributes
    :param config: configuration object providing the ``influxdb_*`` values
    :return: the constructed ``InfluxDBWriter`` instance
    """
    writer_cls = influxdb_writer.InfluxDBWriter
    # The writer is constructed positionally, so the order below matters.
    writer_args = (
        config.influxdb_host,
        config.influxdb_port,
        config.influxdb_user,
        config.influxdb_password,
        config.influxdb_dbname,
        config.influxdb_use_ssl,
        config.influxdb_verify_ssl,
        config.influxdb_timeout,
        config.influxdb_use_udp,
        config.influxdb_retention_policy,
        config.influxdb_time_precision,
    )
    return writer_cls(*writer_args)


# Run the consumer only when this module is executed directly,
# not when it is imported.
if __name__ == '__main__':
    main()