From 11caf6c8254fc724a86e44d8671da710865e2276 Mon Sep 17 00:00:00 2001 From: Travis Foster Date: Mon, 22 Jan 2018 16:02:36 -0800 Subject: [PATCH] Add kafka_check command 'all_isr' Add all_isr command to test if all replicas are in sync, not just the minimum required. --- kafka_utils/kafka_check/commands/all_isr.py | 95 +++++++++++++++ tests/kafka_check/test_all_isr.py | 128 ++++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 kafka_utils/kafka_check/commands/all_isr.py create mode 100644 tests/kafka_check/test_all_isr.py diff --git a/kafka_utils/kafka_check/commands/all_isr.py b/kafka_utils/kafka_check/commands/all_isr.py new file mode 100644 index 00000000..301ade4d --- /dev/null +++ b/kafka_utils/kafka_check/commands/all_isr.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 Yelp Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import absolute_import + +from kafka_utils.kafka_check import status_code +from kafka_utils.kafka_check.commands.command import KafkaCheckCmd +from kafka_utils.util.metadata import get_topic_partition_metadata + + +class AllIsrCmd(KafkaCheckCmd): + + def build_subparser(self, subparsers): + subparser = subparsers.add_parser( + 'all_isr', + description='Check that all configured replicas are in sync for each topic in the cluster.', + help='This command will check if the number of in sync replicas equals the number ' + 'configured replicas for each topic-partition in the cluster.', + ) + return subparser + + def run_command(self): + """All_isr command, checks if number of isr is equal to + the configured replicas for each topic-partition.""" + topics = get_topic_partition_metadata(self.cluster_config.broker_list) + not_in_sync = _process_metadata_response( + topics, + ) + + errcode = status_code.OK if not not_in_sync else status_code.CRITICAL + out = _prepare_output(not_in_sync, self.args.verbose) + return errcode, out + + +def _process_metadata_response(topics): + """Returns partitions that are not in sync.""" + not_in_sync_partitions = [] + for topic_name, partitions in topics.items(): + for metadata in partitions.values(): + cur_isr = len(metadata.isr) + replicas = len(metadata.replicas) + if cur_isr < replicas: + not_in_sync_partitions.append({ + 'isr': cur_isr, + 'replicas': replicas, + 'topic': metadata.topic, + 'partition': metadata.partition, + }) + + return not_in_sync_partitions + + +def _prepare_output(partitions, verbose): + """Returns dict with 'raw' and 'message' keys filled.""" + out = {} + partitions_count = len(partitions) + out['raw'] = { + 'not_enough_replicas_count': partitions_count, + } + + if partitions_count == 0: + out['message'] = 'All configured replicas in sync.' + else: + out['message'] = ( + "{0} partition(s) have configured replicas that " + "are not in sync." + ).format(partitions_count) + + if verbose: + lines = ( + "isr={isr} is lower than replicas={replicas} for {topic}:{partition}" + .format( + isr=p['isr'], + replicas=p['replicas'], + topic=p['topic'], + partition=p['partition'], + ) + for p in partitions + ) + out['verbose'] = "Partitions:\n" + "\n".join(lines) + if verbose: + out['raw']['partitions'] = partitions + + return out diff --git a/tests/kafka_check/test_all_isr.py b/tests/kafka_check/test_all_isr.py new file mode 100644 index 00000000..fe5d99e9 --- /dev/null +++ b/tests/kafka_check/test_all_isr.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 Yelp Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import absolute_import + +from kafka.common import PartitionMetadata + +from kafka_utils.kafka_check.commands.all_isr import _prepare_output +from kafka_utils.kafka_check.commands.all_isr import _process_metadata_response + + +TOPICS_STATE = { + 'topic_0': { + 0: PartitionMetadata( + topic='topic_0', + partition=0, + leader=170396635, + replicas=(170396635, 170398981), + isr=(170398981,), + error=0, + ), + }, + 'topic_1': { + 0: PartitionMetadata( + topic='topic_1', + partition=0, + leader=170396635, + replicas=(170396635, 170398981), + isr=(170396635, 170398981), + error=0, + ), + }, +} + +NOT_IN_SYNC_PARTITIONS = [ + { + 'isr': 1, + 'replicas': 2, + 'topic': 'topic_0', + 'partition': 0, + }, +] + + +def test_process_metadata_response_empty(): + result = _process_metadata_response( + topics={}, + ) + + assert result == [] + + +def test_process_metadata_out_of_sync(): + result = _process_metadata_response( + topics=TOPICS_STATE, + ) + + assert result == NOT_IN_SYNC_PARTITIONS + + +def test_prepare_output_ok_no_verbose(): + origin = { + 'message': "All configured replicas in sync.", + 'raw': { + 'not_enough_replicas_count': 0, + } + } + assert _prepare_output([], False) == origin + + +def test_prepare_output_ok_verbose(): + origin = { + 'message': "All configured replicas in sync.", + 'raw': { + 'not_enough_replicas_count': 0, + 'partitions': [], + } + } + assert _prepare_output([], True) == origin + + +def test_prepare_output_critical_no_verbose(): + origin = { + 'message': ( + "1 partition(s) have configured replicas that " + "are not in sync." + ), + 'raw': { + 'not_enough_replicas_count': 1, + } + } + assert _prepare_output(NOT_IN_SYNC_PARTITIONS, False) == origin + + +def test_prepare_output_critical_verbose(): + origin = { + 'message': ( + "1 partition(s) have configured replicas that " + "are not in sync." + ), + 'verbose': ( + "Partitions:\n" + "isr=1 is lower than replicas=2 for topic_0:0" + ), + 'raw': { + 'not_enough_replicas_count': 1, + 'partitions': [ + { + 'isr': 1, + 'replicas': 2, + 'partition': 0, + 'topic': 'topic_0' + }, + ], + } + } + assert _prepare_output(NOT_IN_SYNC_PARTITIONS, True) == origin