Skip to content

Commit

Permalink
ccmlib/node: check both pending and active tasks when waiting for com…
Browse files Browse the repository at this point in the history
…pactions

- Rename Node._parse_pending_tasks to Node._parse_tasks
- Count all tasks, pending and active
- Allow searching for tasks based on only keyspace
- Update and refactor test to allow more varied cases
  • Loading branch information
cezarmoise authored and fruch committed Sep 24, 2024
1 parent 94ef2e9 commit 8c83ce8
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 14 deletions.
70 changes: 56 additions & 14 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import warnings
from datetime import datetime
import locale
from collections import namedtuple
from typing import List, Optional
from collections import defaultdict, namedtuple

import yaml

Expand Down Expand Up @@ -79,6 +79,9 @@ def __decode(self, value):

# Groups: 0 = ks, 1 = cf, 2 = tmp or none, 3 = version, 4 = identifier (generation), 4 = "big-" or none, 5 = suffix (Compacted or Data.db)
_sstable_regexp = re.compile(r'((?P<keyspace>[^\s-]+)-(?P<cf>[^\s-]+)-)?(?P<tmp>tmp(link)?-)?(?P<version>[^\s-]+)-(?P<identifier>[^-]+)-(?P<big>big-)?(?P<suffix>[a-zA-Z]+)\.[a-zA-Z0-9]+$')
# Regexes for parsing nodetool compactionstats
_pending_tasks_pattern = re.compile(r'- (?P<ks>\w+)\.(?P<cf>\w+): (?P<tasks>\d+)')
_active_tasks_pattern = re.compile(r'\s*([\w-]+)\s+\w+\s+(?P<ks>\w+)\s+(?P<cf>\w+)\s+\d+\s+\d+\s+\w+\s+\d+\.\d+%')


class Node(object):
Expand Down Expand Up @@ -745,30 +748,69 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True
else:
return False

def wait_for_compactions(self, idle_timeout=300):
@staticmethod
def _parse_tasks(output: str, keyspace: str, column_family: str):
"""
Returns the total number of tasks
`nodetool compactionstats` prints the compaction stats like:
```
pending tasks: 42
- ks1.cf1: 13
- ks2.cf2: 19
- ks3.cf3: 10
id compaction type keyspace table completed total unit progress
55eaee80-7445-11ef-9197-2931a44dadc4 COMPACTION ks3 cf3 32116 55680 keys 57.68%
55e8f2b0-7445-11ef-b438-2930a44dadc4 COMPACTION ks4 cf4 46789 55936 keys 83.65%
```
"""
Wait for all compactions to finish on this node.
idle_timeout is the time in seconds to wait for progress.
Total time to wait is undeteremined, as long as we observe forward progress.
lines = output.strip().splitlines()
tasks = defaultdict(int)

for line in lines:
line = line.strip()
if match := _pending_tasks_pattern.match(line):
tasks[(match.group("ks"), match.group("cf"))] += int(match.group("tasks"))
elif match := _active_tasks_pattern.match(line):
tasks[(match.group("ks"), match.group("cf"))] += 1

if keyspace is None and column_family is None:
return sum(tasks.values())
elif keyspace is not None and column_family is None:
return sum(num_tasks for (ks, _), num_tasks in tasks.items() if ks == keyspace)
elif keyspace is not None and column_family is not None:
return tasks.get((keyspace, column_family), 0)

def wait_for_compactions(self, keyspace: str=None, column_family: str=None, idle_timeout=300):
"""Wait for all compactions to finish on this node.
:param keyspace: only wait for the compactions performed for specified keyspace.
If not specified, all keyspaces are waited.
Must be provided if collumn_family is provided.
:param column_family: only wait for the compactions performed for specified column family.
If not specified, all column families are waited.
:param idle_timeout: the time in seconds to wait for progress.
Total time to wait is undeteremined, as long as we observe forward progress.
"""
pending_tasks = None
if column_family is not None and keyspace is None:
raise ValueError("Cannot search only by column family, need also keyspace")
pending_tasks = -1
last_change = None
pattern = re.compile(r"pending tasks:\s*(?P<tasks>\d+)")
while not last_change or time.time() - last_change < idle_timeout:
output, err = self.nodetool("compactionstats", capture_output=True)
m = pattern.search(output)
if not m:
raise RuntimeError(f"Cannot find 'pending tasks' in nodetool output.\nOutput: {output}")
n = int(m.group('tasks'))
n = self._parse_tasks(output, keyspace, column_family)
# no active tasks, good!
if n == 0:
return
if n != pending_tasks:
pending_tasks = n
last_change = time.time()
if n > pending_tasks: # background progress
if 0 < pending_tasks < n:
# background progress
self.warning(f"Pending compaction tasks increased from {pending_tasks} to {n} while waiting for compactions.")
pending_tasks = n
time.sleep(1)
raise TimeoutError(f"Waiting for compactions timed out after {idle_timeout} seconds with {pending_tasks} pending tasks remaining.")
raise TimeoutError(f"Waiting for compactions timed out after {idle_timeout} seconds with pending tasks remaining: {output}.")

def nodetool(self, cmd, capture_output=True, wait=True, timeout=None, verbose=True):
"""
Expand Down
95 changes: 95 additions & 0 deletions tests/test_internal_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import pytest
import textwrap
from ccmlib.node import Node

# Define the test cases and corresponding outputs
test_cases = [
{
"id": "only_pending",
"output": textwrap.dedent("""\
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3\
"""),
"expected_tasks": [
("system_schema", "tables", 1),
("system_schema", "columns", 2),
("system_schema", None, 3),
("keyspace1", "standard1", 3),
("keyspace1", None, 3),
(None, None, 6),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "pending_and_in_progress",
"output": textwrap.dedent("""\
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3
id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16%
Active compaction remaining time : n/a\
"""),
"expected_tasks": [
("system_schema", "tables", 1),
("system_schema", "columns", 3),
("system_schema", None, 4),
("keyspace1", "standard1", 3),
("keyspace1", None, 3),
(None, None, 7),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "only_in_progress",
"output": textwrap.dedent("""\
pending tasks: 0
id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16%
Active compaction remaining time : n/a\
"""),
"expected_tasks": [
("system_schema", "tables", 0),
("system_schema", "columns", 1),
("system_schema", None, 1),
("keyspace1", "standard1", 0),
("keyspace1", None, 0),
(None, None, 1),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "no_tasks",
"output": textwrap.dedent("""\
pending tasks: 0
\
"""),
"expected_tasks": [
("system_schema", "tables", 0),
("system_schema", "columns", 0),
("system_schema", None, 0),
("keyspace1", "standard1", 0),
("keyspace1", None, 0),
(None, None, 0),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
}
]

@pytest.mark.parametrize("test_case", test_cases, ids=[tc["id"] for tc in test_cases])
def test_parse_tasks(test_case):
output = test_case["output"]
expected_tasks = test_case["expected_tasks"]

for ks, cf, expected in expected_tasks:
n = Node._parse_tasks(output, ks, cf)
assert n == expected, f"Expected {expected} tasks for {ks}.{cf}, but got {n}"

0 comments on commit 8c83ce8

Please sign in to comment.