Skip to content

Commit

Permalink
ccmlib/node: check both pending and active tasks when waiting for com…
Browse files Browse the repository at this point in the history
…pactions

- Rename Node._parse_pending_tasks to Node._parse_tasks
- Count all tasks, pending and active
- Allow searching for tasks based on only keyspace
- Update and refactor test to allow more varied cases
  • Loading branch information
cezarmoise committed Sep 17, 2024
1 parent eabbf9f commit 93b5eef
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 77 deletions.
85 changes: 43 additions & 42 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import warnings
from datetime import datetime
import locale
from collections import namedtuple
from collections import defaultdict, namedtuple

from ruamel.yaml import YAML

Expand Down Expand Up @@ -745,58 +745,59 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True
return False

@staticmethod
def _parse_pending_tasks(output, keyspace, column_family):
# "nodetool compactionstats" prints the compaction stats like:
# pending tasks: 42
# - ks1.cf1: 13
# - ks1.cf2: 19
# - ks2.cf1: 10
head_pattern = r"pending tasks:\s*(?P<tasks>\d+)"
tail_pattern = re.compile(r'\-\s+(\w+)\.(\w+):\s*(\d+)')
head, *tail = output.strip().split('\n')
def _parse_tasks(output: str, keyspace: str, column_family: str):
"""
Returns the total number of tasks
`nodetool compactionstats` prints the compaction stats like:
```
pending tasks: 42
- ks1.cf1: 13
- ks2.cf2: 19
- ks3.cf3: 10
id compaction type keyspace table completed total unit progress
55eaee80-7445-11ef-9197-2931a44dadc4 COMPACTION ks3 cf3 32116 55680 keys 57.68%
55e8f2b0-7445-11ef-b438-2930a44dadc4 COMPACTION ks4 cf4 46789 55936 keys 83.65%
```
"""
lines = output.strip().splitlines()
tasks = defaultdict(int)
pending_tasks_pattern = re.compile(r'- (?P<ks>\w+)\.(?P<cf>\w+): (?P<tasks>\d+)')
active_tasks_pattern = re.compile(r'\s*([\w-]+)\s+\w+\s+(?P<ks>\w+)\s+(?P<cf>\w+)\s+\d+\s+\d+\s+\w+\s+\d+\.\d+%')

for line in lines:
line = line.strip()
if match := pending_tasks_pattern.match(line):
tasks[(match.group("ks"), match.group("cf"))] += int(match.group("tasks"))
elif match := active_tasks_pattern.match(line):
tasks[(match.group("ks"), match.group("cf"))] += 1

if keyspace is None and column_family is None:
matched = re.search(head_pattern, head)
if not matched:
raise RuntimeError(f"Cannot find 'pending tasks' in nodetool output.\nOutput: {output}")
return int(matched.group('tasks'))

# if keyspace or column_family is specified, check active tasks
# of the specified ks.cf instead
def matches(expected, actual):
if expected is None:
return True
return expected == actual
return sum(tasks.values())
elif keyspace is not None and column_family is None:
return sum(num_tasks for (ks, _), num_tasks in tasks.items() if ks == keyspace)
elif keyspace is not None and column_family is not None:
return tasks.get((keyspace, column_family), 0)

total = 0
for line in tail:
m = tail_pattern.search(line)
if not m:
break
ks, cf, num = m.groups()
if matches(keyspace, ks) and matches(column_family, cf):
total += int(num)
return total

def wait_for_compactions(self,
keyspace=None,
column_family=None,
idle_timeout=300):
def wait_for_compactions(self, keyspace: str=None, column_family: str=None, idle_timeout=300):
"""Wait for all compactions to finish on this node.
:param keyspace: only wait for the compactions performed for specified
keyspace. if not specified, all keyspaces are waited
:param column_family: only wait for the compactions performed for
specified column_family. if not specified, all keyspaces are waited
:param keyspace: only wait for the compactions performed for specified keyspace.
If not specified, all keyspaces are waited.
Must be provided if collumn_family is provided.
:param column_family: only wait for the compactions performed for specified column_family.
If not specified, all keyspaces are waited.
:param idle_timeout: the time in seconds to wait for progress.
Total time to wait is undeteremined, as long as we observe forward
progress.
Total time to wait is undeteremined, as long as we observe forward progress.
"""
if column_family is not None and keyspace is None:
raise ValueError("Cannot search only by column family, need also keyspace")
pending_tasks = -1
last_change = None
while not last_change or time.time() - last_change < idle_timeout:
output, err = self.nodetool("compactionstats", capture_output=True)
n = self._parse_pending_tasks(output, keyspace, column_family)
n = self._parse_tasks(output, keyspace, column_family)
# no active tasks, good!
if n == 0:
return
Expand Down
125 changes: 90 additions & 35 deletions tests/test_internal_functions.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,95 @@
import pytest
import textwrap
from ccmlib.node import Node

def test_parse_pending_tasks():
def verify_result(output, keyspace, column_family, expected):
n = Node._parse_pending_tasks(output, keyspace, column_family)
assert n == expected
# Define the test cases and corresponding outputs
test_cases = [
{
"id": "only_pending",
"output": textwrap.dedent("""\
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3\
"""),
"expected_tasks": [
("system_schema", "tables", 1),
("system_schema", "columns", 2),
("system_schema", None, 3),
("keyspace1", "standard1", 3),
("keyspace1", None, 3),
(None, None, 6),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "pending_and_in_progress",
"output": textwrap.dedent("""\
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3
def verify_cases(output, cases):
for ks, cf, expected in cases:
verify_result(output, ks, cf, expected)
id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16%
Active compaction remaining time : n/a\
"""),
"expected_tasks": [
("system_schema", "tables", 1),
("system_schema", "columns", 3),
("system_schema", None, 4),
("keyspace1", "standard1", 3),
("keyspace1", None, 3),
(None, None, 7),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "only_in_progress",
"output": textwrap.dedent("""\
pending tasks: 0
for output in [
'''
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3
''',
'''
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3
id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16%
Active compaction remaining time : n/a\
"""),
"expected_tasks": [
("system_schema", "tables", 0),
("system_schema", "columns", 1),
("system_schema", None, 1),
("keyspace1", "standard1", 0),
("keyspace1", None, 0),
(None, None, 1),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "no_tasks",
"output": textwrap.dedent("""\
pending tasks: 0
\
"""),
"expected_tasks": [
("system_schema", "tables", 0),
("system_schema", "columns", 0),
("system_schema", None, 0),
("keyspace1", "standard1", 0),
("keyspace1", None, 0),
(None, None, 0),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
}
]

id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640
keys 0.16%
Active compaction remaining time : n/a
'''
]:
verify_cases(output, [
('system_schema', 'tables', 1),
('system_schema', 'columns', 2),
('system_schema', None, 3),
('keyspace1', 'standard1', 3),
('keyspace1', None, 3),
(None, None, 6),
('keyspace1x', None, 0),
('keyspace1x', 'table1x', 0),
])
@pytest.mark.parametrize("test_case", test_cases, ids=[tc["id"] for tc in test_cases])
def test_parse_tasks(test_case):
output = test_case["output"]
expected_tasks = test_case["expected_tasks"]

for ks, cf, expected in expected_tasks:
n = Node._parse_tasks(output, ks, cf)
assert n == expected, f"Expected {expected} tasks for {ks}.{cf}, but got {n}"

0 comments on commit 93b5eef

Please sign in to comment.