# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from multiprocessing.pool import ThreadPool
from random import shuffle
from threading import RLock
try:
    from collections.abc import Sequence  # Python 3.3+
except ImportError:  # Python 2 fallback
    from collections import Sequence
import six
import time

from kafka.tools.configuration import ClientConfiguration
from kafka.tools.exceptions import ConnectionError, GroupError, TopicError, ConfigurationError
from kafka.tools.protocol.requests.describe_groups_v0 import DescribeGroupsV0Request
from kafka.tools.protocol.requests.group_coordinator_v0 import GroupCoordinatorV0Request
from kafka.tools.protocol.requests.list_groups_v0 import ListGroupsV0Request
from kafka.tools.protocol.requests.list_offset_v1 import ListOffsetV1Request
from kafka.tools.protocol.requests.offset_commit_v2 import OffsetCommitV2Request
from kafka.tools.protocol.requests.offset_fetch_v1 import OffsetFetchV1Request
from kafka.tools.protocol.requests.topic_metadata_v2 import TopicMetadataV2Request
from kafka.tools.models.broker import Broker
from kafka.tools.models.cluster import Cluster
from kafka.tools.models.group import Group
from kafka.tools.models.topic import Topic, TopicOffsets
from kafka.tools.utilities import synchronized, raise_if_error


class Client(object):
    # Special timestamps for offset requests
    OFFSET_EARLIEST = -2
    OFFSET_LATEST = -1

    def __init__(self, **kwargs):
        """
        Create a new Kafka client. There are two ways to instantiate the client:
            1) Provide a ClientConfiguration object with configuration=obj. This specifies all the configuration
               options for the client, falling back to the defaults in ClientConfiguration. If the configuration
               keyword argument is present, all other arguments are ignored.

            2) Provide keyword arguments that are used to create a new ClientConfiguration object. This option is
               chosen whenever the configuration keyword argument is not provided.

        The client does not connect (using either the broker_list or the zkconnect option) until the connect()
        method is called.

        Args:
            configuration (ClientConfiguration): a ClientConfiguration object to specify client settings
            kwargs: keyword arguments that are passed to create a new ClientConfiguration object

        Returns:
            Client: the Client object, ready for a connect call
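
        Example:
            A minimal sketch of both instantiation paths, followed by a connect call. The broker
            address and the exact broker_list format shown are illustrative assumptions; consult
            ClientConfiguration for the accepted options and their defaults::

                # Option 1: an explicit ClientConfiguration object
                config = ClientConfiguration(broker_list='broker1.example.com:9092')
                client = Client(configuration=config)

                # Option 2: keyword arguments passed through to ClientConfiguration
                client = Client(broker_list='broker1.example.com:9092')

                client.connect()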
        """
        if 'configuration' in kwargs:
            if not isinstance(kwargs['configuration'], ClientConfiguration):
                raise ConfigurationError("configuration object is not an instance of ClientConfiguration")
            self.configuration = kwargs['configuration']
        else:
            self.configuration = ClientConfiguration(**kwargs)

        self._controller_id = None
        self._last_full_metadata = 0.0
        self._last_group_list = 0.0
        self.cluster = Cluster()
        self._connected = False
        self._lock = RLock()

    @synchronized
    def connect(self):
        """
        Connect to all brokers in the cluster and populate topic and partition information. If the client was created
        with a zkconnect string, first connect to Zookeeper to bootstrap the broker and topic information from there,
        and then connect to all brokers in the cluster. Otherwise, connect to one of the specified bootstrap brokers
        and fetch the broker and topic metadata for the cluster.

        Todo:
            * Currently assumes everything works. Need to check for failure to connect to
              ZK or bootstrap.
        """
        if self.configuration.zkconnect is not None:
            self.cluster = Cluster.create_from_zookeeper(zkconnect=self.configuration.zkconnect, fetch_topics=False)
            self._connected = True
        else:
            # Connect to bootstrap brokers until we succeed or exhaust the list
            try_brokers = list(self.configuration.broker_list)
            self._connected = False
            while (len(try_brokers) > 0) and not self._connected:
                self._connected = self._maybe_bootstrap_cluster(try_brokers.pop())

            if not self._connected:
                raise ConnectionError("Unable to bootstrap cluster information")

        # Connect to all brokers
        self._connect_all_brokers()

    @synchronized
    def close(self):
        """
        Close connections to all brokers. The configuration information is retained, so calling connect() again will
        reconnect to all brokers.
        """
        for broker_id in self.cluster.brokers:
            self.cluster.brokers[broker_id].close()
        self._connected = False

    @synchronized
    def list_topics(self, cache=True):
        """
        Get a list of all topics in the cluster

        Args:
            cache (boolean): If False, ignore cached metadata and always fetch from the cluster

        Returns:
            list (string): a list of all topic names in the cluster

        Raises:
            ConnectionError: If there is a failure to send the request to all brokers in the cluster
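
        Example:
            An illustrative call on a connected client::

                topic_names = client.list_topics()
                fresh_names = client.list_topics(cache=False)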
        """
        self._raise_if_not_connected()
        self._maybe_update_full_metadata(cache)
        return list(self.cluster.topics.keys())

    @synchronized
    def get_topic(self, topic_name, cache=True):
        """
        Get information on a topic in the cluster. If cache is True, use cached topic metadata
        if it has not expired. Otherwise, perform a metadata request to refresh information on
        the topic first.

        Args:
            topic_name (string): The name of the topic to return
            cache (boolean): If False, ignore cached metadata and always fetch from the cluster

        Returns:
            Topic: The Topic object holding the detail for the topic

        Raises:
            ConnectionError: If there is a failure to send the request to all brokers in the cluster
            TopicError: If the topic does not exist, or there is a problem getting metadata for it
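
        Example:
            Illustrative usage; the topic name is hypothetical::

                topic = client.get_topic('example-topic')
                partition_count = len(topic.partitions)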
        """
        self._raise_if_not_connected()
        try:
            topic = self.cluster.topics[topic_name]
            force_update = (not cache) or (not topic.updated_since(time.time() - self.configuration.metadata_refresh))
        except KeyError:
            force_update = True

        if force_update:
            # It doesn't matter what broker we fetch topic metadata from
            metadata = self._send_any_broker(TopicMetadataV2Request({'topics': [topic_name]}))
            raise_if_error(TopicError, metadata['topics'][0]['error'])

            self._update_from_metadata(metadata)

        return self.cluster.topics[topic_name]

    @synchronized
    def list_groups(self, cache=True):
        """
        Get a list of all groups in the cluster. This is done by sending a ListGroups request to
        every broker in the cluster and collating the responses.

        Args:
            cache (boolean): If False, ignore cached groups and always fetch from the cluster

        Returns:
            list (string): a list of valid group names in the cluster
            int: the number of brokers that failed to respond
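
        Example:
            An illustrative call showing the two-element return::

                group_names, error_count = client.list_groups()
                # If error_count > 0, some brokers failed to respond and the
                # group list may be incomplete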
        """
        self._raise_if_not_connected()
        error_counter = self._maybe_update_groups_list(cache)
        return list(self.cluster.groups.keys()), error_counter

    @synchronized
    def get_group(self, group_name, cache=True):
        """
        Get information on a group in the cluster. If cache is True, use cached group information
        if it has not expired. Otherwise, send a request to refresh information on the group first.

        Args:
            group_name (string): The name of the group to return
            cache (boolean): If False, ignore cached information and always fetch from the cluster

        Returns:
            Group: The Group object holding the detail for the group

        Raises:
            ConnectionError: If there is a failure to send the request to all brokers in the cluster
            GroupError: If the group does not exist or there is a problem fetching information for it
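
        Example:
            Illustrative usage; the group name is hypothetical::

                group = client.get_group('example-consumer-group')
                print(group.state)  # e.g. 'Empty' or 'Stable'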
        """
        self._raise_if_not_connected()
        try:
            group = self.cluster.groups[group_name]
            force_update = (not cache) or (not group.updated_since(time.time() - self.configuration.metadata_refresh))
        except KeyError:
            force_update = True

        if force_update:
            # Group detail must come from the coordinator broker
            group_info = self._send_group_aware_request(group_name, DescribeGroupsV0Request({'group_ids': [group_name]}))
            raise_if_error(GroupError, group_info['groups'][0]['error'])

            self._update_groups_from_describe(group_info)

        return self.cluster.groups[group_name]

    @synchronized
    def get_offsets_for_topic(self, topic_name, timestamp=OFFSET_LATEST):
        """
        Get the offsets for all the partitions in the specified topic. The offsets are requested at the specified
        timestamp, which defaults to the latest offsets available (defined as the offset of the next message to be
        produced).

        Args:
            topic_name (string): The name of the topic to fetch offsets for. To fetch offsets for multiple
                topics in a single call, use get_offsets_for_topics instead
            timestamp (int): The timestamp (in millis) to fetch offsets at. If not specified, the default is the special
                timestamp OFFSET_LATEST, which requests the current end of the partitions (tail). You can also specify
                the special offset OFFSET_EARLIEST, which requests the oldest offset for each partition (head)

        Returns:
            TopicOffsets: A TopicOffsets instance that contains offsets for all the partitions in the topic.

        Raises:
            ConnectionError: If there is a failure to send the request to a broker
            TopicError: If the topic does not exist or there is a problem getting information for it
            OffsetError: If there is a failure retrieving offsets for the specified topic or timestamp
            TypeError: If the timestamp is not an integer
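
        Example:
            An illustrative fetch of the tail and head offsets for a hypothetical topic::

                tail = client.get_offsets_for_topic('example-topic')
                head = client.get_offsets_for_topic('example-topic', timestamp=Client.OFFSET_EARLIEST)
                # TopicOffsets.partitions is indexed by partition number
                print(tail.partitions[0])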
        """
        return self.get_offsets_for_topics([topic_name], timestamp)[topic_name]

    @synchronized
    def get_offsets_for_topics(self, topic_list, timestamp=OFFSET_LATEST):
        """
        Get the offsets for all the partitions in the specified topics. The offsets are requested at the specified
        timestamp, which defaults to the latest offsets available (defined as the offset of the next message to be
        produced).

        Args:
            topic_list (list): a list of topic name strings to fetch offsets for.
            timestamp (int): The timestamp (in millis) to fetch offsets at. If not specified, the default is the special
                timestamp OFFSET_LATEST, which requests the current end of the partitions (tail). You can also specify
                the special offset OFFSET_EARLIEST, which requests the oldest offset for each partition (head)

        Returns:
            dict (string -> TopicOffsets): A dictionary mapping topic names to TopicOffsets instances that contain
                offsets for all the partitions in the topic.

        Raises:
            ConnectionError: If there is a failure to send the request to a broker
            TopicError: If a topic does not exist or there is a problem getting information for it
            OffsetError: If there is a failure retrieving offsets for the specified topic or timestamp
            TypeError: If the timestamp is not an integer
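
        Example:
            Illustrative usage; the topic names are hypothetical::

                offsets = client.get_offsets_for_topics(['topic-a', 'topic-b'])
                topic_a_tail = offsets['topic-a'].partitions[0]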
        """
        self._raise_if_not_connected()
        if not isinstance(timestamp, six.integer_types):
            raise TypeError("timestamp must be a valid integer")

        # Get the topic information, making sure all the leadership info is current
        self._maybe_update_metadata_for_topics(topic_list)

        # Get a broker to topic-partition mapping
        broker_to_tp = self._map_topic_partitions_to_brokers(topic_list)

        request_values = {}
        for broker_id in broker_to_tp:
            request_values[broker_id] = {'replica_id': -1, 'topics': []}
            for topic_name in broker_to_tp[broker_id]:
                request_values[broker_id]['topics'].append({'topic': topic_name,
                                                            'partitions': [{'partition': i,
                                                                            'timestamp': timestamp} for i in broker_to_tp[broker_id][topic_name]]})

        return self._send_list_offsets_to_brokers(request_values)

    @synchronized
    def get_offsets_for_group(self, group_name, topic_list=None):
        """
        Get the latest offsets committed by the specified group. If a topic_list is provided, only the offsets
        for those topics are returned. Otherwise, offsets for all topics that the group is subscribed to are
        returned.

        Args:
            group_name (string): The name of the group to fetch offsets for
            topic_list (list): A list of string topic names to fetch offsets for. Defaults to None, which specifies all
                topics that are subscribed to by the group.

        Returns:
            dict (string -> TopicOffsets): A dictionary mapping topic names to TopicOffsets instances that contain
                offsets for all the partitions in the topic. If topic_list is provided, the dictionary contains
                only the requested topics.

        Raises:
            ConnectionError: If there is a failure to send requests to brokers
            TopicError: If the topic does not exist or there is a problem getting information for it
            GroupError: If there is a failure to get information for the specified group
            OffsetError: If there is a failure retrieving offsets for the topic(s)
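
        Example:
            Illustrative usage; the group and topic names are hypothetical::

                all_committed = client.get_offsets_for_group('example-group')
                one_committed = client.get_offsets_for_group('example-group', topic_list=['example-topic'])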
        """
        self._raise_if_not_connected()

        # Get the group we're fetching offsets for (potentially updating the group information)
        group = self.get_group(group_name)
        fetch_topics = self._get_topics_for_group(group, topic_list)

        # Get the topic information, making sure all the leadership info is current
        self._maybe_update_metadata_for_topics(fetch_topics)

        request_values = {'group_id': group_name,
                          'topics': [{'topic': topic, 'partitions': list(range(len(self.cluster.topics[topic].partitions)))} for topic in fetch_topics]}
        response = self._send_group_aware_request(group_name, OffsetFetchV1Request(request_values))

        rv = {}
        for topic in response['responses']:
            topic_name = topic['topic']
            rv[topic_name] = TopicOffsets(self.cluster.topics[topic_name])
            rv[topic_name].set_offsets_from_fetch(topic['partition_responses'])

        return rv

    @synchronized
    def set_offsets_for_group(self, group_name, topic_offsets):
        """
        Given a group name and a list of topics and offsets, write the offsets as the latest for the group. This can
        only be done if the consumer is an old consumer (committing to Kafka), or if it is a new consumer in the
        "Empty" or "Dead" state (i.e. not running)

        Args:
            group_name (string): the name of the consumer group to set offsets for
            topic_offsets (list): a list of TopicOffsets objects which describe the topics and offsets to set

        Returns:
            dict (string -> list): a map of topic names to a list of error code responses for each partition. Error code
                0 indicates no error.

        Raises:
            ConnectionError: If there is a failure to send requests to brokers
            TypeError: if the topic_offsets argument is not properly formatted
            GroupError: If the group is an active new consumer (i.e. not in the "Empty" or "Dead" state), or if the
                group coordinator is unavailable
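
        Example:
            A sketch of rewinding a stopped group to the earliest offsets for a hypothetical topic::

                offsets = client.get_offsets_for_topic('example-topic', timestamp=Client.OFFSET_EARLIEST)
                errors = client.set_offsets_for_group('example-group', [offsets])
                # errors maps each topic name to a list of per-partition error codes (0 = success)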
        """
        self._raise_if_not_connected()
        if isinstance(topic_offsets, six.string_types) or (not isinstance(topic_offsets, Sequence)):
            raise TypeError("topic_offsets argument is not a list")

        # Get the group we're setting offsets for (potentially updating the group information)
        group = self.get_group(group_name)
        if group.state not in (None, 'Empty', 'Dead'):
            raise GroupError("The consumer group must be in the 'Empty' or 'Dead' state to set offsets, not '{0}'".format(group.state))

        # Get the topic information, making sure all the leadership info is current
        fetch_topics = [offsets.topic.name for offsets in topic_offsets]
        self._maybe_update_metadata_for_topics(fetch_topics)

        response = self._send_set_offset_request(group_name, topic_offsets)
        return self._parse_set_offset_response(response)

    # The following interfaces are for future implementation. The names are set, but the interfaces
    # are not yet defined.
    #
    # def create_topic(self):
    #     raise NotImplementedError
    #
    # def delete_topic(self):
    #     raise NotImplementedError

    ##########################################################################
    # PRIVATE HELPER METHODS
    #
    # Everything below this point consists of methods that are not exposed, and are helpers for the
    # actual exposed interfaces.
    ##########################################################################

    def _raise_if_not_connected(self):
        if not self._connected:
            raise ConnectionError("The client is not yet connected")

    def _maybe_bootstrap_cluster(self, broker_port):
        """Attempt to bootstrap the cluster information using the given broker"""
        broker = Broker(broker_port[0], port=broker_port[1], configuration=self.configuration)

        try:
            broker.connect()
        except ConnectionError:
            # Just skip to the next bootstrap broker
            return False

        # Fetch topic metadata for all topics/brokers
        req = TopicMetadataV2Request({'topics': None})
        correlation_id, metadata = broker.send(req)
        broker.close()

        # Add brokers and topics to cluster
        self._controller_id = metadata['controller_id']
        self._update_from_metadata(metadata)
        self._last_full_metadata = time.time()
        return True

    def _connect_all_brokers(self):
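        """Connect to every broker in the cluster model in parallel, using a thread pool"""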
        pool = ThreadPool(processes=self.configuration.broker_threads)
        for broker_id in self.cluster.brokers:
            pool.apply_async(self.cluster.brokers[broker_id].connect)
        pool.close()
        pool.join()

    def _send_to_broker(self, broker_id, request):
        """
        Given a broker ID and a request, send the request to that broker and return the response

        Args:
            broker_id (int): The ID of a broker in the cluster
            request (BaseRequest): A valid request object that inherits from BaseRequest

        Returns:
            BaseResponse: An instance of the response class associated with the request

        Raises:
            ConnectionError: If there is a failure to send the request to the specified broker
        """
        correlation_id, response = self.cluster.brokers[broker_id].send(request)
        return response

    def _send_any_broker(self, request):
        """
        Sends a request to any broker. This can be used for requests that do not require a
        specific broker to answer, such as metadata requests

        Args:
            request (BaseRequest): A valid request object that inherits from BaseRequest

        Returns:
            BaseResponse: An instance of the response class associated with the request

        Raises:
            ConnectionError: If there is a failure to send the request to all brokers in the cluster
        """
        broker_ids = list(self.cluster.brokers.keys())
        shuffle(broker_ids)
        while len(broker_ids) > 0:
            broker_id = broker_ids.pop()
            try:
                if self.cluster.brokers[broker_id].hostname is not None:
                    return self._send_to_broker(broker_id, request)
            except ConnectionError:
                # We're going to ignore failures unless we exhaust brokers
                pass

        # If we exhausted all the brokers, we have a serious problem
        raise ConnectionError("Failed to send request to any broker")

    def _send_some_brokers(self, requests, ignore_errors=True):
        """
        Sends a request to one or more brokers. The responses are returned mapped to the broker that
        they were retrieved from. This method uses a thread pool to parallelize sends.

        Args:
            requests (int -> BaseRequest): A dictionary, where keys are integer broker IDs and the values are valid
                request objects that inherit from BaseRequest.
            ignore_errors (boolean): If True, failed requests are represented in the result with a value of None.
                If False, the first failed request raises a ConnectionError.

        Returns:
            dict (int -> BaseResponse): A map of broker IDs to response instances (inherited from
                BaseResponse). Failed requests are represented with a value of None

        Raises:
            ConnectionError: If ignore_errors is False and a request to any broker fails
        """
        results = {}
        pool = ThreadPool(processes=self.configuration.broker_threads)
        for broker_id in requests:
            results[broker_id] = pool.apply_async(self._send_to_broker, (broker_id, requests[broker_id]))
        pool.close()
        pool.join()

        responses = {}
        for broker_id in results:
            try:
                responses[broker_id] = results[broker_id].get()
            except ConnectionError:
                if ignore_errors:
                    # Individual broker failures are OK, as we'll represent them with a None value
                    responses[broker_id] = None
                else:
                    raise
        return responses

    def _send_all_brokers(self, request):
        """
        Sends a request to all brokers. The responses are returned mapped to the broker that
        they were retrieved from

        Args:
            request (BaseRequest): A valid request object that inherits from BaseRequest

        Returns:
            dict (int -> BaseResponse): A map of broker IDs to response instances (inherited from
                BaseResponse). Failed requests are represented with a value of None
        """
        requests = {}
        for broker_id in self.cluster.brokers:
            if self.cluster.brokers[broker_id].hostname is not None:
                requests[broker_id] = request
        return self._send_some_brokers(requests)

    def _make_broker(self, broker_dict):
        return Broker(broker_dict['host'], id=broker_dict['node_id'], port=broker_dict['port'], configuration=self.configuration)

    def _send_group_aware_request(self, group_name, request):
        """
        Sends a request to the broker currently serving as the group coordinator for the specified
        group name. The request is not introspected for whether or not this is the correct broker,
        and the response is not checked as to whether or not there was an error.

        As a side effect of this call, the group object is created in the local cluster model if it does not
        already exist, and its coordinator attribute is set to the broker that is currently the coordinator
        for that group.

        Args:
            group_name (string): The name of the group to find the coordinator for
            request (BaseRequest): A request instance, inherited from BaseRequest, to send to the
                coordinator broker

        Returns:
            BaseResponse: The response instance, appropriate for the request, that is returned from
                the broker

        Raises:
            ConnectionError: If there is a failure to send the request to the coordinator broker,
                or a failure to retrieve the coordinator information
            GroupError: If an error is returned when fetching coordinator information
        """
        response = self._send_any_broker(GroupCoordinatorV0Request({'group_id': group_name}))
        raise_if_error(GroupError, response['error'])

        if group_name not in self.cluster.groups:
            self.cluster.add_group(Group(group_name))
        try:
            self.cluster.groups[group_name].coordinator = self.cluster.brokers[response['node_id']]
        except KeyError:
            broker = self._make_broker(response)
            self.cluster.add_broker(broker)
            self.cluster.groups[group_name].coordinator = broker

        response = self._send_to_broker(self.cluster.groups[group_name].coordinator.id, request)
        return response

    def _send_list_offsets_to_brokers(self, request_values):
        """
        Given a mapping of broker IDs to values for ListOffsets requests, send the requests to all the brokers and
        collate the responses into a mapping of topic names to TopicOffsets instances

        Args:
            request_values (dict): a mapping of broker ID (int) to value dictionaries for ListOffsets requests

        Returns:
            dict (string -> TopicOffsets): A dictionary mapping topic names to TopicOffsets instances that contain
                offsets for all the partitions requested.

        Raises:
            ConnectionError: If there is a failure to send the request to a broker
            OffsetError: If there is a failure retrieving offsets
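
        Example:
            The expected shape of request_values, with an illustrative broker ID and topic name::

                {1: {'replica_id': -1,
                     'topics': [{'topic': 'example-topic',
                                 'partitions': [{'partition': 0, 'timestamp': -1}]}]}}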
        """
        requests = {}
        for broker_id in request_values:
            requests[broker_id] = ListOffsetV1Request(request_values[broker_id])
        responses = self._send_some_brokers(requests, ignore_errors=False)

        rv = {}
        for broker_id in responses:
            response = responses[broker_id]
            for topic in response['responses']:
                topic_name = topic['topic']
                if topic_name not in rv:
                    rv[topic_name] = TopicOffsets(self.cluster.topics[topic_name])
                rv[topic_name].set_offsets_from_list(topic['partition_responses'])

        return rv

    def _send_set_offset_request(self, group_name, topic_offsets):
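        """Build an OffsetCommitV2 request from the given TopicOffsets list and send it to the group coordinator"""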
        request = {'group_id': group_name,
                   'group_generation_id': -1,
                   'member_id': '',
                   'retention_time': -1,
                   'topics': []}
        for offset in topic_offsets:
            if not (isinstance(offset, TopicOffsets) and isinstance(offset.topic, Topic)):
                raise TypeError("TopicOffsets objects are not properly formed")

            request['topics'].append({'topic': offset.topic.name,
                                      'partitions': [{'partition': p_num,
                                                      'offset': p_offset,
                                                      'metadata': None} for p_num, p_offset in enumerate(offset.partitions)]})

        return self._send_group_aware_request(group_name, OffsetCommitV2Request(request))

    def _parse_set_offset_response(self, response):
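        """Flatten an OffsetCommit response into a map of topic name to a list of per-partition error codes"""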
        rv = {}
        for topic in response['responses']:
            topic_name = topic['topic']
            rv[topic_name] = [-1] * len(topic['partition_responses'])
            for partition in topic['partition_responses']:
                rv[topic_name][partition['partition']] = partition['error']

        return rv

    def _update_brokers_from_metadata(self, metadata):
        """
        Given a Metadata response (either V0 or V1), update the broker information for this
        cluster. We don't delete brokers, because we don't know whether a missing broker is gone
        temporarily (crashed or in maintenance) or permanently.

        Args:
            metadata (MetadataV1Response): A metadata response to create or update brokers for
        """
        for b in metadata['brokers']:
            try:
                broker = self.cluster.brokers[b['node_id']]
                if (broker.hostname != b['host']) or (broker.port != b['port']):
                    # if the hostname or port changes, close the existing connection
                    broker.close()
                    broker.hostname = b['host']
                    broker.port = b['port']
            except KeyError:
                broker = self._make_broker(b)
                self.cluster.add_broker(broker)
            broker.rack = b['rack']

    def _maybe_delete_topics_not_in_metadata(self, metadata, delete):
        """
        If delete is True, check each topic in the cluster and delete it if it is not also in the metadata
        response provided. This should only be used when the metadata response has a full list of all topics
        in the cluster.

        Args:
            metadata (MetadataV1Response): a metadata response that contains the full list of topics in the
                cluster
            delete (boolean): If False, no action is taken
        """
        if not delete:
            return

        topic_list = metadata.topic_names()
        topics_for_deletion = []
        for topic_name in self.cluster.topics:
            if topic_name in topic_list:
                continue

            self.cluster.topics[topic_name].assure_has_partitions(0)
            topics_for_deletion.append(topic_name)

        for topic_name in topics_for_deletion:
            del self.cluster.topics[topic_name]

    def _update_or_add_partition(self, partition_metadata, partition):
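        """Update a partition's leader and replica list from the given partition metadata"""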
        partition.leader = self.cluster.brokers[partition_metadata['leader']]
        for i, replica in enumerate(partition_metadata['replicas']):
            if replica not in self.cluster.brokers:
                # We have a replica ID that is not a known broker. This can happen if a broker is offline, or
                # if the partition is otherwise assigned to a non-existent broker ID. In this case, we need to
                # create a broker object for this with no endpoint information as a placeholder.
                self.cluster.add_broker(Broker(hostname=None, id=replica))
            partition.add_or_update_replica(i, self.cluster.brokers[replica])
        partition.delete_replicas(len(partition_metadata['replicas']))

    def _update_topics_from_metadata(self, metadata, delete=False):
        """
        Given a Metadata response (either V0 or V1 will work), update the topic information
        for this cluster.

        Args:
            metadata (MetadataV1Response): A metadata response to create or update topics for
            delete (boolean): If True, delete topics from the cluster that are not present in the
                metadata response

        Raises:
            KeyError: If the brokers in the metadata object are not defined in the cluster
        """
        for t in metadata['topics']:
            if t['name'] not in self.cluster.topics:
                self.cluster.add_topic(Topic(t['name'], len(t['partitions'])))
            topic = self.cluster.topics[t['name']]
            topic._last_updated = time.time()

            topic.assure_has_partitions(len(t['partitions']))
            for p in t['partitions']:
                self._update_or_add_partition(p, topic.partitions[p['id']])

        self._maybe_delete_topics_not_in_metadata(metadata, delete)

    def _update_from_metadata(self, metadata, delete=False):
        """
        Given a metadata response, update both the brokers and topics from it. If specified, delete the topics
        that are not present in the provided metadata

        Args:
            metadata (MetadataV1Response): A metadata response to create or update brokers and topics for
            delete (boolean): If True, delete topics from the cluster that are not present in the metadata response
        """
        self._update_brokers_from_metadata(metadata)
        self._update_topics_from_metadata(metadata, delete=delete)

    def _maybe_update_metadata_for_topics(self, topics, cache=True):
        """
        Fetch metadata for the specified topics from the cluster if cache is False or if the cached metadata has
        expired. The cluster brokers and topics are updated with the metadata information.

        Args:
            topics (list): a list of topic name strings to fetch metadata for
            cache (boolean): If False, ignore the metadata expiration and always fetch from the cluster

        Raises:
            ConnectionError: If there is a failure to send the request to all brokers in the cluster
        """
        force_update = not cache
        for topic in topics:
            try:
                if not self.cluster.topics[topic].updated_since(time.time() - self.configuration.metadata_refresh):
                    force_update = True
            except KeyError:
                force_update = True

        if force_update:
            self._update_from_metadata(self._send_any_broker(TopicMetadataV2Request({'topics': topics})), delete=False)

    def _maybe_update_full_metadata(self, cache=True):
        """
        Fetch all topic metadata from the cluster if cache is False or if the cached metadata has expired. The cluster
        brokers and topics are updated with the metadata information, potentially deleting ones that no longer exist

        Args:
            cache (boolean): If False, ignore the metadata expiration and always fetch from the cluster

        Raises:
            ConnectionError: If there is a failure to send the request to all brokers in the cluster
        """
        if (not cache) or (self._last_full_metadata < (time.time() - self.configuration.metadata_refresh)):
            self._update_from_metadata(self._send_any_broker(TopicMetadataV2Request({'topics': None})), delete=True)
            self._last_full_metadata = time.time()

    def _add_or_update_group(self, group_info, coordinator):
        """
        Given group information from a ListGroups response, ensure that the group exists in the cluster as specified

        Args:
            group_info (dict): A group from a ListGroups response, which contains group_id and protocol_type keys
            coordinator (int): The ID of the group coordinator broker
        """
        group_name = group_info['group_id']
        try:
            group = self.cluster.groups[group_name]
        except KeyError:
            group = Group(group_name)
            self.cluster.add_group(group)
        group.coordinator = self.cluster.brokers[coordinator]
        group.protocol_type = group_info['protocol_type']

    def _update_groups_from_lists(self, responses):
        """
        Given a list of ListGroups responses, make sure that all the groups are in the cluster correctly

        Args:
            responses (dict): a mapping of broker IDs to ListGroupsV0Response instances from each broker

        Returns:
            int: a count of the number of responses that were not present or in error
        """
        error_counter = 0
        for broker_id, response in responses.items():
            if (response is None) or (response['error'] != 0):
                error_counter += 1
                continue

            for group_info in response['groups']:
                self._add_or_update_group(group_info, broker_id)

        return error_counter

    def _maybe_update_groups_list(self, cache=True):
        """
        Fetch lists of groups from all brokers if cache is False or if the cached group list has expired.

        Args:
            cache (boolean): If False, ignore the cached group list expiration and always fetch from the cluster

        Returns:
            int: a count of the number of brokers that returned no response or error responses
        """
        if (not cache) or (self._last_group_list < (time.time() - self.configuration.metadata_refresh)):
            error_counter = self._update_groups_from_lists(self._send_all_brokers(ListGroupsV0Request({})))
            self._last_group_list = time.time()
            return error_counter
        return 0

    def _update_groups_from_describe(self, response):
        """
        Given a DescribeGroupsV0 response, update the group information in this cluster for all groups
        in the response. Groups that are absent from the response are not deleted, as describe data is
        not fetched for all groups at once at this time.

        Args:
            response (DescribeGroupsV0Response): A response to create or update groups for
        """
        for g in response['groups']:
            if g['group_id'] not in self.cluster.groups:
                self.cluster.add_group(Group(g['group_id']))
            group = self.cluster.groups[g['group_id']]
            group.state = g['state']
            group.protocol_type = g['protocol_type']
            group.protocol = g['protocol']
            group.clear_members()
            for m in g['members']:
                group.add_member(m['member_id'],
                                 client_id=m['client_id'],
                                 client_host=m['client_host'],
                                 metadata=m['member_metadata'],
                                 assignment=m['member_assignment'])
            group._last_updated = time.time()

    def _map_topic_partitions_to_brokers(self, topic_list):
        """
        Given a list of topics, map the topic-partitions to the leader brokers

        Args:
            topic_list (list): a list of the topic name strings to map

        Returns:
            dict: a nested dictionary where the keys are broker IDs and the values are dictionaries mapping
                topic names to lists of partition IDs

        Raises:
            TopicError: if any of the topics do not exist in the cluster
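
        Example:
            An illustrative return value, where broker 1 leads partitions 0 and 2 of a hypothetical
            topic and broker 2 leads partition 1::

                {1: {'example-topic': [0, 2]}, 2: {'example-topic': [1]}}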
        """
        broker_to_tp = {}
        for topic_name in topic_list:
            try:
                topic = self.cluster.topics[topic_name]
            except KeyError:
                raise TopicError("Topic {0} does not exist in the cluster".format(topic_name))

            for i, partition in enumerate(topic.partitions):
                broker_to_tp.setdefault(partition.leader.id, {}).setdefault(topic_name, []).append(i)

        return broker_to_tp

    def _get_topics_for_group(self, group, topic_list):
        """
        Given a group and a topic_list, return a list of topics that is either the topic list provided or the list of
        all topics consumed by the group

        Args:
            group (Group): the group for which the topic list is being created
            topic_list (list): A list of string topic names to fetch offsets for. Defaults to None, which specifies all
                topics that are subscribed to by the group.

        Returns:
             list: a list of topics to be fetched for the group

        Raises:
            GroupError: if the topic list is empty
        """
        # We'll be nice. If the topic_list is a string, convert it to a list
        if isinstance(topic_list, six.string_types):
            topic_list = [topic_list]

        # Create a list of topics to fetch
        fetch_topics = topic_list or group.subscribed_topics()
        if len(fetch_topics) == 0:
            raise GroupError("No topic specified is consumed by the group")
        return fetch_topics