Skip to content

Commit

Permalink
node,scylla_node,cluster,scylla_cluster: Add type hints for nodes and…
Browse files Browse the repository at this point in the history
… clusters
  • Loading branch information
cezarmoise committed Sep 25, 2024
1 parent 4b1ac16 commit df0a17a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 52 deletions.
35 changes: 19 additions & 16 deletions ccmlib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
import os
import random
import shutil
import subprocess
import threading
import time
from collections import OrderedDict, defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

from ruamel.yaml import YAML

from ccmlib import common, repository
from ccmlib.node import Node, NodeError
from ccmlib.common import logger
from ccmlib.scylla_node import ScyllaNode
from ccmlib.utils.version import parse_version


Expand All @@ -23,8 +26,8 @@ def __init__(self, path, name, partitioner=None, install_dir=None, create_direct
self.id = 0
self.ipprefix = None
self.ipformat = None
self.nodes = OrderedDict()
self.seeds = []
self.nodes: OrderedDict[str, ScyllaNode] = OrderedDict()
self.seeds: List[ScyllaNode] = []
self.partitioner = partitioner
self.snitch = snitch
self._config_options = {}
Expand Down Expand Up @@ -140,7 +143,7 @@ class LogWatchingThread():
def __init__(self, cluster):
self.executor = ThreadPoolExecutor(max_workers=1)
self.thread = None
self.cluster = cluster
self.cluster: Cluster = cluster
self.req_stop_event = threading.Event()
self.done_event = threading.Event()
self.log_positions = defaultdict(int)
Expand Down Expand Up @@ -213,7 +216,7 @@ def get_install_dir(self):
def hasOpscenter(self):
return False

def nodelist(self):
def nodelist(self) -> List[ScyllaNode]:
return [self.nodes[name] for name in list(self.nodes.keys())]

def version(self):
Expand All @@ -222,7 +225,7 @@ def version(self):
def cassandra_version(self):
return self.version()

def add(self, node, is_seed, data_center=None, rack=None):
def add(self, node: ScyllaNode, is_seed, data_center=None, rack=None):
if node.name in self.nodes:
raise common.ArgumentError(f'Cannot create existing node {node.name}')
self.nodes[node.name] = node
Expand All @@ -246,14 +249,14 @@ def add(self, node, is_seed, data_center=None, rack=None):

# nodes can be provided in multiple notations, determining the cluster topology:
# 1. int - specifying the number of nodes in a single DC, single RACK cluster
# 2. list[int] - specifying the number of nodes in a multi DC, single RACK per DC cluster
# 2. List[int] - specifying the number of nodes in a multi DC, single RACK per DC cluster
# The datacenters are automatically named as dc{i}, starting from 1, the rack is named RAC1
# For example, [3, 2] would translate to the following topology {'dc1': {'RAC1': 3}, 'dc2': {'RAC1': 2}}
# Where 3 nodes are populated in dc1/RAC1, and 2 nodes are populated in dc2/RAC1
# 3.a dict[str: int] - specifying the number of nodes in a multi DC, single RACK per DC cluster
# The dictionary keys explicitly identify each datacenter name, and the value is the number of nodes in the DC.
# For example, {'DC1': 3, 'DC2': 2] would translate to the following topology {'DC1': {'RAC1': 3}, 'DC2': {'RAC1': 2}}
# 3.b dict[str: list[int]] - specifying the number of nodes in a multi DC, multi RACK cluster
# 3.b dict[str: List[int]] - specifying the number of nodes in a multi DC, multi RACK cluster
# The dictionary keys explicitly identify each datacenter name, and the value is the number of nodes in each RACK in the DC.
# Racks are automatically named as RAC{i}, starting from 1
# For example, {'DC1': [2, 2, 2], 'DC2': [3, 3]] would translate to the following topology {'DC1': {'RAC1': 2, 'RAC2': 2, 'RAC3': 2}, 'DC2': {'RAC1': 3, 'RAC2': 3}}
Expand Down Expand Up @@ -328,8 +331,8 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N
self._update_config()
return self

def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None):
ipformat = self.get_ipformat()
def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None) -> ScyllaNode:
ipformat = self.get_ipformat() # noqa: F841
binary = self.get_binary_interface(i)
node = self.create_node(name=f'node{i}',
auto_bootstrap=auto_bootstrap,
Expand Down Expand Up @@ -365,7 +368,7 @@ def get_storage_interface(self, nodeid):
return (self.get_node_ip(nodeid), 7000)

def get_node_jmx_port(self, nodeid):
return 7000 + nodeid * 100 + self.id;
return 7000 + nodeid * 100 + self.id

def get_debug_port(self, nodeid):
return 2000 + nodeid * 100
Expand Down Expand Up @@ -394,7 +397,7 @@ def balanced_tokens_across_dcs(self, dcs):
tokens.extend(new_tokens)
return tokens

def remove(self, node=None, wait_other_notice=False, other_nodes=None, remove_node_dir=True):
def remove(self, node: ScyllaNode=None, wait_other_notice=False, other_nodes=None, remove_node_dir=True):
if node is not None:
if node.name not in self.nodes:
return
Expand All @@ -421,7 +424,7 @@ def remove_dir_with_retry(self, path):
try:
common.rmdirs(path)
removed = True
except:
except Exception:
tries = tries + 1
time.sleep(.1)
if tries == 5:
Expand All @@ -435,7 +438,7 @@ def clear(self):
def get_path(self):
return os.path.join(self.path, self.name)

def get_seeds(self, node=None):
def get_seeds(self, node: ScyllaNode=None):
# if first node, or there is now seeds at all
# or node added is not a seed, return
# all cluster seed nodes
Expand All @@ -450,7 +453,7 @@ def add_seed(self, node):
address = node.address()
else:
address = node
if not address in self.seeds:
if address not in self.seeds:
self.seeds.append(address)

def show(self, verbose):
Expand All @@ -476,7 +479,7 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_
if wait_other_notice:
marks = [(node, node.mark_log()) for node in list(self.nodes.values())]

started = []
started: List[Tuple[ScyllaNode, subprocess.Popen, int]] = []
for node in list(self.nodes.values()):
if not node.is_running():
mark = 0
Expand Down Expand Up @@ -677,7 +680,7 @@ def _update_config(self, install_dir=None):
with open(filename, 'w') as f:
YAML().dump(cluster_config, f)

def __update_pids(self, started):
def __update_pids(self, started: List[Tuple[ScyllaNode, subprocess.Popen, int]]):
for node, p, _ in started:
node._update_pid(p)

Expand Down
32 changes: 17 additions & 15 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
from datetime import datetime
import locale
from collections import defaultdict, namedtuple
from typing import TYPE_CHECKING

from ruamel.yaml import YAML

from ccmlib import common
from ccmlib.cli_session import CliSession
from ccmlib.repository import setup
from ccmlib.utils.version import parse_version

if TYPE_CHECKING:
from ccmlib.scylla_cluster import ScyllaCluster

class Status():
UNINITIALIZED = "UNINITIALIZED"
Expand Down Expand Up @@ -104,7 +106,7 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_inte
is almost always the right choice.
"""
self.name = name
self.cluster = cluster
self.cluster: 'ScyllaCluster' = cluster
self.status = Status.UNINITIALIZED
self.auto_bootstrap = auto_bootstrap
self.network_interfaces = {'storage': common.normalize_interface(storage_interface),
Expand Down Expand Up @@ -424,7 +426,7 @@ def print_process_output(self, name, proc, verbose=False):
try:
stderr = proc.communicate()[1]
except ValueError:
[stdout, stderr] = ['', '']
stderr = ''
if stderr is None:
stderr = ''
if len(stderr) > 1:
Expand Down Expand Up @@ -709,7 +711,7 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True
if gently is True:
try:
self.flush()
except:
except Exception:
print(f"WARN: Failed to flush node: {self.name} on shutdown.")
pass

Expand Down Expand Up @@ -1086,7 +1088,7 @@ def do_split(f):
return results

def run_sstablemetadata(self, output_file=None, datafiles=None, keyspace=None, column_families=None):
cdir = self.get_install_dir()
cdir = self.get_install_dir() # noqa: F841
sstablemetadata = self._find_cmd('sstablemetadata')
env = self.get_env()
sstablefiles = self.__gather_sstables(datafiles=datafiles, keyspace=keyspace, columnfamilies=column_families)
Expand All @@ -1105,7 +1107,7 @@ def run_sstablemetadata(self, output_file=None, datafiles=None, keyspace=None, c
return results

def run_sstableexpiredblockers(self, output_file=None, keyspace=None, column_family=None):
cdir = self.get_install_dir()
cdir = self.get_install_dir() # noqa: F841
sstableexpiredblockers = self._find_cmd('sstableexpiredblockers')
env = self.get_env()
cmd = sstableexpiredblockers + [keyspace, column_family]
Expand All @@ -1125,7 +1127,7 @@ def get_sstablespath(self, output_file=None, datafiles=None, keyspace=None, tabl
return sstablefiles

def run_sstablerepairedset(self, set_repaired=True, datafiles=None, keyspace=None, column_families=None):
cdir = self.get_install_dir()
cdir = self.get_install_dir() # noqa: F841
sstablerepairedset = self._find_cmd('sstablerepairedset')
env = self.get_env()
sstablefiles = self.__gather_sstables(datafiles, keyspace, column_families)
Expand All @@ -1138,10 +1140,10 @@ def run_sstablerepairedset(self, set_repaired=True, datafiles=None, keyspace=Non
subprocess.call(cmd, env=env)

def run_sstablelevelreset(self, keyspace, cf, output=False):
cdir = self.get_install_dir()
cdir = self.get_install_dir() # noqa: F841
sstablelevelreset = self._find_cmd('sstablelevelreset')
env = self.get_env()
sstablefiles = self.__cleanup_sstables(keyspace, cf)
sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841

cmd = sstablelevelreset + ["--really-reset", keyspace, cf]

Expand All @@ -1154,10 +1156,10 @@ def run_sstablelevelreset(self, keyspace, cf, output=False):
return subprocess.call(cmd, env=env)

def run_sstableofflinerelevel(self, keyspace, cf, dry_run=False, output=False):
cdir = self.get_install_dir()
cdir = self.get_install_dir() # noqa: F841
sstableofflinerelevel = self._find_cmd('sstableofflinerelevel')
env = self.get_env()
sstablefiles = self.__cleanup_sstables(keyspace, cf)
sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841

if dry_run:
cmd = sstableofflinerelevel + ["--dry-run", keyspace, cf]
Expand All @@ -1175,7 +1177,7 @@ def run_sstableofflinerelevel(self, keyspace, cf, dry_run=False, output=False):
def run_sstableverify(self, keyspace, cf, options=None, output=False):
sstableverify = self.get_tool('sstableverify')
env = self.get_env()
sstablefiles = self.__cleanup_sstables(keyspace, cf)
sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841
if not isinstance(sstableverify, list):
sstableverify = [sstableverify]
cmd = sstableverify + [keyspace, cf]
Expand All @@ -1201,7 +1203,7 @@ def _find_cmd(self, cmd):
fcmd = common.join_bin(cdir, 'bin', cmd)
try:
os.chmod(fcmd, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
except:
except Exception:
print(f"WARN: Couldn't change permissions to use {cmd}.")
print("WARN: If it didn't work, you will have to do so manually.")
return [fcmd]
Expand Down Expand Up @@ -1879,7 +1881,7 @@ def _update_pid(self, process):
start = time.time()
while not (os.path.isfile(pidfile) and os.stat(pidfile).st_size > 0):
if (time.time() - start > 30.0):
print("Timed out waiting for pidfile {} to be filled (current time is %s): File {} size={}".format(
print("Timed out waiting for pidfile {} to be filled (current time is {}): File {} size={}".format(
pidfile,
datetime.now(),
'exists' if os.path.isfile(pidfile) else 'does not exist' if not os.path.exists(pidfile) else 'is not a file',
Expand Down Expand Up @@ -2085,7 +2087,7 @@ def make_pat_for_log_level(level):
matchings = []
it = iter(log.splitlines())
for line in it:
l = line if case_sensitive else line.lower()
l = line if case_sensitive else line.lower() # noqa: E741
is_error_line = (error_pat.search(l) and
not debug_pat.search(error_pat.split(l)[0])) if not search_str else search_str in l
if is_error_line:
Expand Down
25 changes: 13 additions & 12 deletions ccmlib/scylla_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import subprocess
import signal
from typing import List, Tuple
import uuid
import datetime

Expand Down Expand Up @@ -41,7 +42,7 @@ def __init__(self, path, name, partitioner=None, install_dir=None,
install_dir, self.scylla_mode = install_func(install_dir)

self.started = False
self.force_wait_for_cluster_start = (force_wait_for_cluster_start != False)
self.force_wait_for_cluster_start = force_wait_for_cluster_start
self.__set_default_timeouts()
self._scylla_manager = None
self.skip_manager_server = skip_manager_server
Expand Down Expand Up @@ -124,7 +125,7 @@ def start_nodes(self, nodes=None, no_wait=False, verbose=False, wait_for_binary_
elif isinstance(nodes, ScyllaNode):
nodes = [nodes]

started = []
started: List[Tuple[ScyllaNode, subprocess.Popen, int]] = []
for node in nodes:
if not node.is_running():
if started:
Expand Down Expand Up @@ -186,7 +187,7 @@ def stop_nodes(self, nodes=None, wait=True, gently=True, wait_other_notice=False
marks = []
if wait_other_notice:
if not other_nodes:
other_nodes = [node for node in list(self.nodes.values()) if not node in nodes]
other_nodes = [node for node in list(self.nodes.values()) if node not in nodes]
marks = [(node, node.mark_log()) for node in other_nodes if node.is_live()]

# stop all nodes in parallel
Expand Down Expand Up @@ -256,7 +257,7 @@ def _update_config(self, install_dir=None):
YAML().dump(data, f)

def sctool(self, cmd):
if self._scylla_manager == None:
if self._scylla_manager is None:
raise Exception("scylla manager not enabled - sctool command cannot be executed")
return self._scylla_manager.sctool(cmd)

Expand Down Expand Up @@ -315,7 +316,7 @@ def _update_config(self, install_dir=None):
with open(conf_file, 'r') as f:
data = YAML().load(f)
data['http'] = self._get_api_address()
if not 'database' in data:
if 'database' not in data:
data['database'] = {}
data['database']['hosts'] = [self.scylla_cluster.get_node_ip(1)]
data['database']['replication_factor'] = 3
Expand All @@ -327,10 +328,10 @@ def _update_config(self, install_dir=None):
del data['tls_cert_file']
if 'tls_key_file' in data:
del data['tls_key_file']
if not 'logger' in data:
if 'logger' not in data:
data['logger'] = {}
data['logger']['mode'] = 'stderr'
if not 'repair' in data:
if 'repair' not in data:
data['repair'] = {}
if self.version < ComparableScyllaVersion("2.2"):
data['repair']['segments_per_repair'] = 16
Expand Down Expand Up @@ -387,7 +388,7 @@ def _update_pid(self):
pidfile = self._get_pid_file()
while not (os.path.isfile(pidfile) and os.stat(pidfile).st_size > 0):
if time.time() - start > 30.0:
print("Timed out waiting for pidfile {} to be filled (current time is %s): File {} size={}".format(
print("Timed out waiting for pidfile {} to be filled (current time is {}): File {} size={}".format(
pidfile,
datetime.now(),
'exists' if os.path.isfile(pidfile) else 'does not exist' if not os.path.exists(pidfile) else 'is not a file',
Expand All @@ -411,7 +412,7 @@ def start(self):
try:
os.kill(self._pid, 0)
return
except OSError as err:
except OSError:
pass

log_file = os.path.join(self._get_path(), 'scylla-manager.log')
Expand Down Expand Up @@ -441,12 +442,12 @@ def stop(self, gently):
if gently:
try:
self._process_scylla_manager.terminate()
except OSError as e:
except OSError:
pass
else:
try:
self._process_scylla_manager.kill()
except OSError as e:
except OSError:
pass
else:
if self._pid:
Expand All @@ -470,7 +471,7 @@ def sctool(self, cmd, ignore_exit_status=False):
def agent_check_location(self, location_list, extra_config_file_list=None):
agent_bin = os.path.join(self._get_path(), 'bin', 'scylla-manager-agent')
locations_names = ','.join(location_list)
agent_conf = os.path.join(self.scylla_cluster.get_path(), 'node1/conf/scylla-manager-agent.yaml')
agent_conf = os.path.join(self.scylla_cluster.get_path(), 'node1/conf/scylla-manager-agent.yaml') # noqa: F841
args = [agent_bin, "check-location", "-L", str(locations_names)]
if extra_config_file_list:
for config_file in extra_config_file_list:
Expand Down
Loading

0 comments on commit df0a17a

Please sign in to comment.