diff --git a/kafka/tools/assigner/__main__.py b/kafka/tools/assigner/__main__.py index cb84419..ca0a286 100644 --- a/kafka/tools/assigner/__main__.py +++ b/kafka/tools/assigner/__main__.py @@ -133,7 +133,7 @@ def main(): for i, batch in enumerate(batches): log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch))) - batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run) + batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run, args.throttle) run_plugins_at_step(plugins, 'before_ple') diff --git a/kafka/tools/assigner/arguments.py b/kafka/tools/assigner/arguments.py index bb06b25..14ef034 100644 --- a/kafka/tools/assigner/arguments.py +++ b/kafka/tools/assigner/arguments.py @@ -47,6 +47,7 @@ def set_up_arguments(action_map, sizer_map, plugins): aparser.add_argument('-g', '--generate', help="Generate partition reassignment file", action='store_true') aparser.add_argument('-e', '--execute', help="Execute partition reassignment", action='store_true') aparser.add_argument('-m', '--moves', help="Max number of moves per step", required=False, default=10, type=int) + aparser.add_argument('-r', '--throttle', help="Max B/s per broker to use for reassignment", required=False, default=None) aparser.add_argument('-x', '--exclude-topics', help="Comma-separated list of topics to skip when performing actions", action=CSVAction, default=[]) aparser.add_argument('--sizer', help="Select module to use to get partition sizes", required=False, default='ssh', choices=sizer_map.keys()) aparser.add_argument('-p', '--property', help="Property of the form 'key=value' to be passed to modules (i.e. sizer)", required=False, default=[], diff --git a/kafka/tools/assigner/models/reassignment.py b/kafka/tools/assigner/models/reassignment.py index 2c5cb15..83e9dfe 100644 --- a/kafka/tools/assigner/models/reassignment.py +++ b/kafka/tools/assigner/models/reassignment.py @@ -44,23 +44,30 @@ def dict_for_reassignment(self): reassignment['partitions'].append(partition.dict_for_reassignment()) return reassignment - def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True): + def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True, throttle=None): for plugin in plugins: plugin.before_execute_batch(num) if not dry_run: - self._execute(num, total, zookeeper, tools_path) + self._execute(num, total, zookeeper, tools_path, throttle) for plugin in plugins: plugin.after_execute_batch(num) - def _execute(self, num, total, zookeeper, tools_path): + def _execute(self, num, total, zookeeper, tools_path, throttle): with NamedTemporaryFile(mode='w') as assignfile: json.dump(self.dict_for_reassignment(), assignfile) assignfile.flush() FNULL = open(os.devnull, 'w') - proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute', - '--zookeeper', zookeeper, - '--reassignment-json-file', assignfile.name], - stdout=FNULL, stderr=FNULL) + if throttle is not None: + proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute', + '--zookeeper', zookeeper, + '--reassignment-json-file', assignfile.name, + '--throttle', throttle], + stdout=FNULL, stderr=FNULL) + else: + proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute', + '--zookeeper', zookeeper, + '--reassignment-json-file', assignfile.name], + stdout=FNULL, stderr=FNULL) proc.wait() # Wait until finished diff --git a/tests/tools/assigner/models/test_reassignment.py b/tests/tools/assigner/models/test_reassignment.py index e6491d1..c7686c4 100644 --- a/tests/tools/assigner/models/test_reassignment.py +++ b/tests/tools/assigner/models/test_reassignment.py @@ -42,7 +42,12 @@ def test_reassignment_repr(self): @patch.object(Reassignment, '_execute') def test_reassignment_execute_real(self, mock_exec): self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=False) - mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools') + mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools', None) + + @patch.object(Reassignment, '_execute') + def test_reassignment_execute_throttle(self, mock_exec): + self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=False, throttle='1000') + mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools', '1000') @patch.object(Reassignment, '_execute') def test_reassignment_execute_dryrun(self, mock_exec): @@ -55,7 +60,7 @@ def test_reassignment_internal_execute(self, mock_check, mock_popen): mock_popen.set_default() mock_check.side_effect = [10, 5, 0] - self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools') + self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools', None) compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY], stderr=ANY, stdout=ANY), diff --git a/tests/tools/assigner/test_main.py b/tests/tools/assigner/test_main.py index 665a6c6..64a047b 100644 --- a/tests/tools/assigner/test_main.py +++ b/tests/tools/assigner/test_main.py @@ -72,6 +72,7 @@ def test_main(self, mock_plugins, mock_sizes): tools_path='/path/to/tools', property=['datadir=/path/to/data'], moves=10, + throttle=1000, execute=False, exclude_topics=[], generate=False,