diff --git a/ccmlib/node.py b/ccmlib/node.py index 171fda78..19a38228 100644 --- a/ccmlib/node.py +++ b/ccmlib/node.py @@ -16,8 +16,8 @@ import warnings from datetime import datetime import locale -from collections import namedtuple from typing import List, Optional +from collections import defaultdict, namedtuple import yaml @@ -79,6 +79,9 @@ def __decode(self, value): # Groups: 0 = ks, 1 = cf, 2 = tmp or none, 3 = version, 4 = identifier (generation), 4 = "big-" or none, 5 = suffix (Compacted or Data.db) _sstable_regexp = re.compile(r'((?P[^\s-]+)-(?P[^\s-]+)-)?(?Ptmp(link)?-)?(?P[^\s-]+)-(?P[^-]+)-(?Pbig-)?(?P[a-zA-Z]+)\.[a-zA-Z0-9]+$') +# Regexes for parsing nodetool compactionstats +_pending_tasks_pattern = re.compile(r'- (?P\w+)\.(?P\w+): (?P\d+)') +_active_tasks_pattern = re.compile(r'\s*([\w-]+)\s+\w+\s+(?P\w+)\s+(?P\w+)\s+\d+\s+\d+\s+\w+\s+\d+\.\d+%') class Node(object): @@ -745,30 +748,69 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True else: return False - def wait_for_compactions(self, idle_timeout=300): + @staticmethod + def _parse_tasks(output: str, keyspace: str, column_family: str): + """ + Returns the total number of tasks + + `nodetool compactionstats` prints the compaction stats like: + ``` + pending tasks: 42 + - ks1.cf1: 13 + - ks2.cf2: 19 + - ks3.cf3: 10 + + id compaction type keyspace table completed total unit progress + 55eaee80-7445-11ef-9197-2931a44dadc4 COMPACTION ks3 cf3 32116 55680 keys 57.68% + 55e8f2b0-7445-11ef-b438-2930a44dadc4 COMPACTION ks4 cf4 46789 55936 keys 83.65% + ``` """ - Wait for all compactions to finish on this node. - idle_timeout is the time in seconds to wait for progress. - Total time to wait is undeteremined, as long as we observe forward progress. + lines = output.strip().splitlines() + tasks = defaultdict(int) + + for line in lines: + line = line.strip() + if match := _pending_tasks_pattern.match(line): + tasks[(match.group("ks"), match.group("cf"))] += int(match.group("tasks")) + elif match := _active_tasks_pattern.match(line): + tasks[(match.group("ks"), match.group("cf"))] += 1 + + if keyspace is None and column_family is None: + return sum(tasks.values()) + elif keyspace is not None and column_family is None: + return sum(num_tasks for (ks, _), num_tasks in tasks.items() if ks == keyspace) + elif keyspace is not None and column_family is not None: + return tasks.get((keyspace, column_family), 0) + + def wait_for_compactions(self, keyspace: str=None, column_family: str=None, idle_timeout=300): + """Wait for all compactions to finish on this node. + + :param keyspace: only wait for the compactions performed for specified keyspace. + If not specified, all keyspaces are waited. + Must be provided if collumn_family is provided. + :param column_family: only wait for the compactions performed for specified column family. + If not specified, all column families are waited. + :param idle_timeout: the time in seconds to wait for progress. + Total time to wait is undeteremined, as long as we observe forward progress. """ - pending_tasks = None + if column_family is not None and keyspace is None: + raise ValueError("Cannot search only by column family, need also keyspace") + pending_tasks = -1 last_change = None - pattern = re.compile(r"pending tasks:\s*(?P\d+)") while not last_change or time.time() - last_change < idle_timeout: output, err = self.nodetool("compactionstats", capture_output=True) - m = pattern.search(output) - if not m: - raise RuntimeError(f"Cannot find 'pending tasks' in nodetool output.\nOutput: {output}") - n = int(m.group('tasks')) + n = self._parse_tasks(output, keyspace, column_family) + # no active tasks, good! if n == 0: return if n != pending_tasks: - pending_tasks = n last_change = time.time() - if n > pending_tasks: # background progress + if 0 < pending_tasks < n: + # background progress self.warning(f"Pending compaction tasks increased from {pending_tasks} to {n} while waiting for compactions.") + pending_tasks = n time.sleep(1) - raise TimeoutError(f"Waiting for compactions timed out after {idle_timeout} seconds with {pending_tasks} pending tasks remaining.") + raise TimeoutError(f"Waiting for compactions timed out after {idle_timeout} seconds with pending tasks remaining: {output}.") def nodetool(self, cmd, capture_output=True, wait=True, timeout=None, verbose=True): """ diff --git a/tests/test_internal_functions.py b/tests/test_internal_functions.py new file mode 100644 index 00000000..76ca06fb --- /dev/null +++ b/tests/test_internal_functions.py @@ -0,0 +1,95 @@ +import pytest +import textwrap +from ccmlib.node import Node + +# Define the test cases and corresponding outputs +test_cases = [ + { + "id": "only_pending", + "output": textwrap.dedent("""\ + pending tasks: 6 + - system_schema.tables: 1 + - system_schema.columns: 2 + - keyspace1.standard1: 3\ + """), + "expected_tasks": [ + ("system_schema", "tables", 1), + ("system_schema", "columns", 2), + ("system_schema", None, 3), + ("keyspace1", "standard1", 3), + ("keyspace1", None, 3), + (None, None, 6), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + }, + { + "id": "pending_and_in_progress", + "output": textwrap.dedent("""\ + pending tasks: 6 + - system_schema.tables: 1 + - system_schema.columns: 2 + - keyspace1.standard1: 3 + + id compaction type keyspace table completed total unit progress + 8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16% + Active compaction remaining time : n/a\ + """), + "expected_tasks": [ + ("system_schema", "tables", 1), + ("system_schema", "columns", 3), + ("system_schema", None, 4), + ("keyspace1", "standard1", 3), + ("keyspace1", None, 3), + (None, None, 7), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + }, + { + "id": "only_in_progress", + "output": textwrap.dedent("""\ + pending tasks: 0 + + id compaction type keyspace table completed total unit progress + 8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16% + Active compaction remaining time : n/a\ + """), + "expected_tasks": [ + ("system_schema", "tables", 0), + ("system_schema", "columns", 1), + ("system_schema", None, 1), + ("keyspace1", "standard1", 0), + ("keyspace1", None, 0), + (None, None, 1), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + }, + { + "id": "no_tasks", + "output": textwrap.dedent("""\ + pending tasks: 0 + \ + """), + "expected_tasks": [ + ("system_schema", "tables", 0), + ("system_schema", "columns", 0), + ("system_schema", None, 0), + ("keyspace1", "standard1", 0), + ("keyspace1", None, 0), + (None, None, 0), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + } +] + +@pytest.mark.parametrize("test_case", test_cases, ids=[tc["id"] for tc in test_cases]) +def test_parse_tasks(test_case): + output = test_case["output"] + expected_tasks = test_case["expected_tasks"] + + for ks, cf, expected in expected_tasks: + n = Node._parse_tasks(output, ks, cf) + assert n == expected, f"Expected {expected} tasks for {ks}.{cf}, but got {n}"