From 0d98bbefc1105851b7b7203de4f6c68d9c097730 Mon Sep 17 00:00:00 2001 From: Kyle Ambroff-Kao Date: Mon, 9 Jul 2018 12:11:30 -0700 Subject: [PATCH] Add demote command (#91) * Add demote command. * More complete. * Fixed flake8 errors for kafka/tools/assigner/actions/demote.py The flake8 errors were part of the reason why Travis build was failing for PR #91 for linkedin/kafka-tools. The other factor was tox running into an upstream python bug [1] for which PR #94 has been raised. I confirmed that after these 2 factors were eliminated, the build passes [2]. [1] https://bugs.python.org/issue10496 [2] https://github.com/akashvacher/kafka-tools/commit/2228e65f43ce86b28bae8b32e35ad703d54a18a0 --- kafka/tools/assigner/actions/demote.py | 56 ++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 kafka/tools/assigner/actions/demote.py diff --git a/kafka/tools/assigner/actions/demote.py b/kafka/tools/assigner/actions/demote.py new file mode 100644 index 0000000..25f877c --- /dev/null +++ b/kafka/tools/assigner/actions/demote.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from kafka.tools.assigner.actions import ActionModule +from kafka.tools.exceptions import NotEnoughReplicasException + + +class ActionDemote(ActionModule): + name = "demote" + helpstr = "Force a broker to relinquish leadership wherever it is safe to do so. This is done by reordering " \ + "replica lists and then performing a PLE." + + def __init__(self, args, cluster): + super(ActionDemote, self).__init__(args, cluster) + + @classmethod + def _add_args(cls, parser): + parser.add_argument( + '-b', '--brokers', type=int, help='List of broker ids to demote', required=True, nargs='*') + parser.add_argument('-t', '--topics', required=False, nargs='*', + help='Optional list of topics to use. If not specified, leadership of all topics lead by the brokers to demote will be changed.') + + def process_cluster(self): + # Randomize a broker list to use for new replicas once. We'll round robin it from here + brokers_to_demote = set(self.args.brokers) + topics_to_consider = self.args.topics and set(self.args.topics) or None + + for partition in self.cluster.partitions(self.args.exclude_topics): + if topics_to_consider and partition.topic.name not in topics_to_consider: + continue + + if all(b.id in brokers_to_demote for b in partition.replicas): + msg = 'Topic {0} partition {1} only has replicas in the list of brokers to be demoted: {2}'.format( + partition.topic.name, + str(partition.num), + ', '.join(str(b.id) for b in partition.replicas)) + raise NotEnoughReplicasException(msg) + + while partition.replicas[0].id in brokers_to_demote: + broker = partition.replicas[0] + partition.remove_replica(broker) + partition.add_replica(broker)