linkedin/kafka-tools

View on GitHub
kafka/tools/assigner/sizers/jmx.py

Summary

Maintainability
A
0 mins
Test Coverage
from kafka.tools import log
from kafka.tools.exceptions import UnknownBrokerException, ConfigurationException
from kafka.tools.assigner.sizers import SizerModule

import jpype


class SizerJMX(SizerModule):
    name = 'jmx'
    helpstr = 'Get partition sizes by connection to each broker via JMX'

    def _validate_both_properties(self, prop1, prop2):
        if prop1 in self.properties and prop2 in self.properties:
            return True
        elif prop1 in self.properties or prop2 in self.properties:
            raise ConfigurationException("JMX sizer requires both {0} and {1} properties, or neither of them".format(prop1, prop2))
        else:
            return False

    def _set_java_provider(self, java_provider):
        if java_provider is None:
            self._java_provider = jpype
            if 'libjvm' in self.properties:
                self._java_provider.startJVM(self.properties['libjvm'])
            else:
                self._java_provider.startJVM("/export/apps/jdk/JDK-1_8_0_72/jre/lib/amd64/server/libjvm.so")
        else:
            self._java_provider = java_provider

    def __init__(self, args, cluster, java_provider=None):
        super(SizerJMX, self).__init__(args, cluster)
        self._set_java_provider(java_provider)

        # If username or password is provided, you must have both
        self._envhash = self._java_provider.java.util.HashMap()
        if self._validate_both_properties('jmxuser', 'jmxpass'):
            jarray = self._java_provider.JArray(self._java_provider.java.lang.String)([self.properties['jmxuser'], self.properties['jmxpass']])
            self._envhash.put(self._java_provider.javax.management.remote.JMXConnector.CREDENTIALS, jarray)

        # If truststore or truststorepass is provided, you must have both
        if self._validate_both_properties('truststore', 'truststorepass'):
            self._java_provider.java.lang.System.setProperty("javax.net.ssl.trustStore", self.properties['truststore'])
            self._java_provider.java.lang.System.setProperty("javax.net.ssl.trustStorePassword", self.properties['truststorepass'])

    def _fetch_bean(self, connection, bean):
        topic = bean.getKeyProperty("topic")
        partition = int(bean.getKeyProperty("partition"))
        size_bytes = connection.getAttribute(bean, "Value").value
        self.cluster.topics[topic].partitions[partition].set_size(size_bytes)

    def get_partition_sizes(self):
        # Get broker partition sizes
        for broker_id, broker in self.cluster.brokers.items():
            _validate_broker(broker)

            log.info("Getting partition sizes via JMX for {0}".format(broker.hostname))
            jmxurl = self._java_provider.javax.management.remote.JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://{0}:{1}/jmxrmi".format(broker.hostname, broker.jmx_port))
            jmxsoc = self._java_provider.javax.management.remote.JMXConnectorFactory.connect(jmxurl, self._envhash)

            connection = jmxsoc.getMBeanServerConnection()
            beans = connection.queryNames(self._java_provider.javax.management.ObjectName("kafka.log:name=Size,*"), None)
            for bean in beans:
                self._fetch_bean(connection, bean)

            jmxsoc.close()


def _validate_broker(broker):
    if broker.hostname is None:
        raise UnknownBrokerException("Cannot get sizes for broker ID {0} which has no hostname. "
                                     "Remove the broker from the cluster before balance".format(broker.id))
    if broker.jmx_port <= 0:
        raise UnknownBrokerException("Broker ID {0} does not have a JMX port configured".format(broker.id))