kafka/tools/models/partition.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import division
from kafka.tools.exceptions import ReplicaNotFoundException
from kafka.tools.models import BaseModel
class Partition(BaseModel):
equality_attrs = ['topic', 'num']
def __init__(self, topic, num):
self.topic = topic
self.num = num
self.leader = None
self.replicas = []
self.size = 0
self.scaled_size = 0
# Shallow copy - do not copy replica list (zero length)
def copy(self):
newpartition = Partition(self.topic, self.num)
newpartition.size = self.size
return newpartition
# Set the current size of this partition iff it is larger than the currently known size
def set_size(self, size):
if size > self.size:
self.size = size
self.scaled_size = (self.topic.cluster.retention / self.topic.retention) * self.size
def dict_for_reassignment(self):
return {"topic": self.topic.name, "partition": self.num, "replicas": [broker.id for broker in self.replicas]}
def dict_for_replica_election(self):
return {"topic": self.topic.name, "partition": self.num}
# Given a broker, add it to this partition as a replica
# If position is not specified, default to the end of the replica list
def add_replica(self, broker, position=-1):
if position < 0:
position = len(self.replicas)
self._add_broker_partition(position, broker)
self.replicas.insert(position, broker)
# Remove the specified broker from the replica list of this partition
# If the replica does not exist on this partition, throw an exception
def remove_replica(self, broker):
try:
position = self.replicas.index(broker)
except ValueError:
raise ReplicaNotFoundException
broker.partitions[position].remove(self)
self.replicas.remove(broker)
def delete_replicas(self, target_count):
"""
Assure that the partition has only the specified number of replicas, deleting any extras
Args:
target_count (int): the maximum number of replicas to retain
"""
while len(self.replicas) > target_count:
self.remove_replica(self.replicas[-1])
def add_or_update_replica(self, position, new_broker):
"""
Given a position and a broker, make sure the broker is in the replica
set for this partition at the given position.
Args:
position (int): The position in the replica list for the new broker
new_broker (Broker): The broker that should now be at that position in the replica set
"""
if len(self.replicas) > position:
if self.replicas[position] == new_broker:
# No change in the replica at this position
return
else:
# New replica at this position. Swap it in
self.swap_replicas(self.replicas[position], new_broker)
else:
# No replica yet at this position. Add it
self.add_replica(new_broker, position=position)
# Remove one broker from the replica list and replace it at the same position with another
# This just calls add_replica and remove_replica, but we do it a lot
def swap_replicas(self, remove_broker, add_broker):
try:
position = self.replicas.index(remove_broker)
except ValueError:
raise ReplicaNotFoundException
self.remove_replica(remove_broker)
self.add_replica(add_broker, position)
# Given two brokers that appear in the replica list, swap their positions (making sure to adjust the broker objects as well)
# If either replica is not in the list, throw an exception
def swap_replica_positions(self, broker1, broker2):
try:
p1 = self.replicas.index(broker1)
p2 = self.replicas.index(broker2)
except ValueError:
raise ReplicaNotFoundException
# First, change the position of this partition on the first broker object to p2
self._remove_broker_partition(broker1)
self._add_broker_partition(p2, broker1)
# Then change the position of this partition on the second broker object to p1
self._remove_broker_partition(broker2)
self._add_broker_partition(p1, broker2)
# Last, swap the replica positons on this partition object
self.replicas[p1] = broker2
self.replicas[p2] = broker1
# Helper function to add a partition to a broker
# This should never be called - please use the add_replica method
def _add_broker_partition(self, pos, broker):
if pos not in broker.partitions:
broker.partitions[pos] = [self]
else:
broker.partitions[pos].append(self)
# Helper function to remove a partition from a broker
# This should never be called - please use the remove_replica method
def _remove_broker_partition(self, broker):
pos = self.replicas.index(broker)
broker.partitions[pos].remove(self)
def to_dict(self):
return {
'size': self.size,
'replicas': [r.id for r in self.replicas]
}