From df0a17a748a30e5fbd478290a734159b42f42535 Mon Sep 17 00:00:00 2001 From: Cezar Moise Date: Thu, 12 Sep 2024 17:11:05 +0300 Subject: [PATCH] node,scylla_node,cluster,scylla_cluster: Add type hints for nodes and clusters --- ccmlib/cluster.py | 35 +++++++++++++++++++---------------- ccmlib/node.py | 32 +++++++++++++++++--------------- ccmlib/scylla_cluster.py | 25 +++++++++++++------------ ccmlib/scylla_node.py | 20 +++++++++++--------- 4 files changed, 60 insertions(+), 52 deletions(-) diff --git a/ccmlib/cluster.py b/ccmlib/cluster.py index 89da27a1..4c897f0e 100644 --- a/ccmlib/cluster.py +++ b/ccmlib/cluster.py @@ -3,16 +3,19 @@ import os import random import shutil +import subprocess import threading import time from collections import OrderedDict, defaultdict from concurrent.futures import ThreadPoolExecutor +from typing import List, Tuple from ruamel.yaml import YAML from ccmlib import common, repository from ccmlib.node import Node, NodeError from ccmlib.common import logger +from ccmlib.scylla_node import ScyllaNode from ccmlib.utils.version import parse_version @@ -23,8 +26,8 @@ def __init__(self, path, name, partitioner=None, install_dir=None, create_direct self.id = 0 self.ipprefix = None self.ipformat = None - self.nodes = OrderedDict() - self.seeds = [] + self.nodes: OrderedDict[str, ScyllaNode] = OrderedDict() + self.seeds: List[ScyllaNode] = [] self.partitioner = partitioner self.snitch = snitch self._config_options = {} @@ -140,7 +143,7 @@ class LogWatchingThread(): def __init__(self, cluster): self.executor = ThreadPoolExecutor(max_workers=1) self.thread = None - self.cluster = cluster + self.cluster: Cluster = cluster self.req_stop_event = threading.Event() self.done_event = threading.Event() self.log_positions = defaultdict(int) @@ -213,7 +216,7 @@ def get_install_dir(self): def hasOpscenter(self): return False - def nodelist(self): + def nodelist(self) -> List[ScyllaNode]: return [self.nodes[name] for name in list(self.nodes.keys())] def version(self): @@ -222,7 +225,7 @@ def version(self): def cassandra_version(self): return self.version() - def add(self, node, is_seed, data_center=None, rack=None): + def add(self, node: ScyllaNode, is_seed, data_center=None, rack=None): if node.name in self.nodes: raise common.ArgumentError(f'Cannot create existing node {node.name}') self.nodes[node.name] = node @@ -246,14 +249,14 @@ def add(self, node, is_seed, data_center=None, rack=None): # nodes can be provided in multiple notations, determining the cluster topology: # 1. int - specifying the number of nodes in a single DC, single RACK cluster - # 2. list[int] - specifying the number of nodes in a multi DC, single RACK per DC cluster + # 2. List[int] - specifying the number of nodes in a multi DC, single RACK per DC cluster # The datacenters are automatically named as dc{i}, starting from 1, the rack is named RAC1 # For example, [3, 2] would translate to the following topology {'dc1': {'RAC1': 3}, 'dc2': {'RAC1': 2}} # Where 3 nodes are populated in dc1/RAC1, and 2 nodes are populated in dc2/RAC1 # 3.a dict[str: int] - specifying the number of nodes in a multi DC, single RACK per DC cluster # The dictionary keys explicitly identify each datacenter name, and the value is the number of nodes in the DC. # For example, {'DC1': 3, 'DC2': 2] would translate to the following topology {'DC1': {'RAC1': 3}, 'DC2': {'RAC1': 2}} - # 3.b dict[str: list[int]] - specifying the number of nodes in a multi DC, multi RACK cluster + # 3.b dict[str: List[int]] - specifying the number of nodes in a multi DC, multi RACK cluster # The dictionary keys explicitly identify each datacenter name, and the value is the number of nodes in each RACK in the DC. # Racks are automatically named as RAC{i}, starting from 1 # For example, {'DC1': [2, 2, 2], 'DC2': [3, 3]] would translate to the following topology {'DC1': {'RAC1': 2, 'RAC2': 2, 'RAC3': 2}, 'DC2': {'RAC1': 3, 'RAC2': 3}} @@ -328,8 +331,8 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N self._update_config() return self - def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None): - ipformat = self.get_ipformat() + def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None) -> ScyllaNode: + ipformat = self.get_ipformat() # noqa: F841 binary = self.get_binary_interface(i) node = self.create_node(name=f'node{i}', auto_bootstrap=auto_bootstrap, @@ -365,7 +368,7 @@ def get_storage_interface(self, nodeid): return (self.get_node_ip(nodeid), 7000) def get_node_jmx_port(self, nodeid): - return 7000 + nodeid * 100 + self.id; + return 7000 + nodeid * 100 + self.id def get_debug_port(self, nodeid): return 2000 + nodeid * 100 @@ -394,7 +397,7 @@ def balanced_tokens_across_dcs(self, dcs): tokens.extend(new_tokens) return tokens - def remove(self, node=None, wait_other_notice=False, other_nodes=None, remove_node_dir=True): + def remove(self, node: ScyllaNode=None, wait_other_notice=False, other_nodes=None, remove_node_dir=True): if node is not None: if node.name not in self.nodes: return @@ -421,7 +424,7 @@ def remove_dir_with_retry(self, path): try: common.rmdirs(path) removed = True - except: + except Exception: tries = tries + 1 time.sleep(.1) if tries == 5: @@ -435,7 +438,7 @@ def clear(self): def get_path(self): return os.path.join(self.path, self.name) - def get_seeds(self, node=None): + def get_seeds(self, node: ScyllaNode=None): # if first node, or there is now seeds at all # or node added is not a seed, return # all cluster seed nodes @@ -450,7 +453,7 @@ def add_seed(self, node): address = node.address() else: address = node - if not address in self.seeds: + if address not in self.seeds: self.seeds.append(address) def show(self, verbose): @@ -476,7 +479,7 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_ if wait_other_notice: marks = [(node, node.mark_log()) for node in list(self.nodes.values())] - started = [] + started: List[Tuple[ScyllaNode, subprocess.Popen, int]] = [] for node in list(self.nodes.values()): if not node.is_running(): mark = 0 @@ -677,7 +680,7 @@ def _update_config(self, install_dir=None): with open(filename, 'w') as f: YAML().dump(cluster_config, f) - def __update_pids(self, started): + def __update_pids(self, started: List[Tuple[ScyllaNode, subprocess.Popen, int]]): for node, p, _ in started: node._update_pid(p) diff --git a/ccmlib/node.py b/ccmlib/node.py index 7b9dce3d..4e53311a 100644 --- a/ccmlib/node.py +++ b/ccmlib/node.py @@ -16,14 +16,16 @@ from datetime import datetime import locale from collections import defaultdict, namedtuple +from typing import TYPE_CHECKING from ruamel.yaml import YAML from ccmlib import common -from ccmlib.cli_session import CliSession from ccmlib.repository import setup from ccmlib.utils.version import parse_version +if TYPE_CHECKING: + from ccmlib.scylla_cluster import ScyllaCluster class Status(): UNINITIALIZED = "UNINITIALIZED" @@ -104,7 +106,7 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_inte is almost always the right choice. """ self.name = name - self.cluster = cluster + self.cluster: 'ScyllaCluster' = cluster self.status = Status.UNINITIALIZED self.auto_bootstrap = auto_bootstrap self.network_interfaces = {'storage': common.normalize_interface(storage_interface), @@ -424,7 +426,7 @@ def print_process_output(self, name, proc, verbose=False): try: stderr = proc.communicate()[1] except ValueError: - [stdout, stderr] = ['', ''] + stderr = '' if stderr is None: stderr = '' if len(stderr) > 1: @@ -709,7 +711,7 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True if gently is True: try: self.flush() - except: + except Exception: print(f"WARN: Failed to flush node: {self.name} on shutdown.") pass @@ -1086,7 +1088,7 @@ def do_split(f): return results def run_sstablemetadata(self, output_file=None, datafiles=None, keyspace=None, column_families=None): - cdir = self.get_install_dir() + cdir = self.get_install_dir() # noqa: F841 sstablemetadata = self._find_cmd('sstablemetadata') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles=datafiles, keyspace=keyspace, columnfamilies=column_families) @@ -1105,7 +1107,7 @@ def run_sstablemetadata(self, output_file=None, datafiles=None, keyspace=None, c return results def run_sstableexpiredblockers(self, output_file=None, keyspace=None, column_family=None): - cdir = self.get_install_dir() + cdir = self.get_install_dir() # noqa: F841 sstableexpiredblockers = self._find_cmd('sstableexpiredblockers') env = self.get_env() cmd = sstableexpiredblockers + [keyspace, column_family] @@ -1125,7 +1127,7 @@ def get_sstablespath(self, output_file=None, datafiles=None, keyspace=None, tabl return sstablefiles def run_sstablerepairedset(self, set_repaired=True, datafiles=None, keyspace=None, column_families=None): - cdir = self.get_install_dir() + cdir = self.get_install_dir() # noqa: F841 sstablerepairedset = self._find_cmd('sstablerepairedset') env = self.get_env() sstablefiles = self.__gather_sstables(datafiles, keyspace, column_families) @@ -1138,10 +1140,10 @@ def run_sstablerepairedset(self, set_repaired=True, datafiles=None, keyspace=Non subprocess.call(cmd, env=env) def run_sstablelevelreset(self, keyspace, cf, output=False): - cdir = self.get_install_dir() + cdir = self.get_install_dir() # noqa: F841 sstablelevelreset = self._find_cmd('sstablelevelreset') env = self.get_env() - sstablefiles = self.__cleanup_sstables(keyspace, cf) + sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841 cmd = sstablelevelreset + ["--really-reset", keyspace, cf] @@ -1154,10 +1156,10 @@ def run_sstablelevelreset(self, keyspace, cf, output=False): return subprocess.call(cmd, env=env) def run_sstableofflinerelevel(self, keyspace, cf, dry_run=False, output=False): - cdir = self.get_install_dir() + cdir = self.get_install_dir() # noqa: F841 sstableofflinerelevel = self._find_cmd('sstableofflinerelevel') env = self.get_env() - sstablefiles = self.__cleanup_sstables(keyspace, cf) + sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841 if dry_run: cmd = sstableofflinerelevel + ["--dry-run", keyspace, cf] @@ -1175,7 +1177,7 @@ def run_sstableofflinerelevel(self, keyspace, cf, dry_run=False, output=False): def run_sstableverify(self, keyspace, cf, options=None, output=False): sstableverify = self.get_tool('sstableverify') env = self.get_env() - sstablefiles = self.__cleanup_sstables(keyspace, cf) + sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841 if not isinstance(sstableverify, list): sstableverify = [sstableverify] cmd = sstableverify + [keyspace, cf] @@ -1201,7 +1203,7 @@ def _find_cmd(self, cmd): fcmd = common.join_bin(cdir, 'bin', cmd) try: os.chmod(fcmd, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) - except: + except Exception: print(f"WARN: Couldn't change permissions to use {cmd}.") print("WARN: If it didn't work, you will have to do so manually.") return [fcmd] @@ -1879,7 +1881,7 @@ def _update_pid(self, process): start = time.time() while not (os.path.isfile(pidfile) and os.stat(pidfile).st_size > 0): if (time.time() - start > 30.0): - print("Timed out waiting for pidfile {} to be filled (current time is %s): File {} size={}".format( + print("Timed out waiting for pidfile {} to be filled (current time is {}): File {} size={}".format( pidfile, datetime.now(), 'exists' if os.path.isfile(pidfile) else 'does not exist' if not os.path.exists(pidfile) else 'is not a file', @@ -2085,7 +2087,7 @@ def make_pat_for_log_level(level): matchings = [] it = iter(log.splitlines()) for line in it: - l = line if case_sensitive else line.lower() + l = line if case_sensitive else line.lower() # noqa: E741 is_error_line = (error_pat.search(l) and not debug_pat.search(error_pat.split(l)[0])) if not search_str else search_str in l if is_error_line: diff --git a/ccmlib/scylla_cluster.py b/ccmlib/scylla_cluster.py index 82f1d577..bd3b338f 100644 --- a/ccmlib/scylla_cluster.py +++ b/ccmlib/scylla_cluster.py @@ -4,6 +4,7 @@ import time import subprocess import signal +from typing import List, Tuple import uuid import datetime @@ -41,7 +42,7 @@ def __init__(self, path, name, partitioner=None, install_dir=None, install_dir, self.scylla_mode = install_func(install_dir) self.started = False - self.force_wait_for_cluster_start = (force_wait_for_cluster_start != False) + self.force_wait_for_cluster_start = force_wait_for_cluster_start self.__set_default_timeouts() self._scylla_manager = None self.skip_manager_server = skip_manager_server @@ -124,7 +125,7 @@ def start_nodes(self, nodes=None, no_wait=False, verbose=False, wait_for_binary_ elif isinstance(nodes, ScyllaNode): nodes = [nodes] - started = [] + started: List[Tuple[ScyllaNode, subprocess.Popen, int]] = [] for node in nodes: if not node.is_running(): if started: @@ -186,7 +187,7 @@ def stop_nodes(self, nodes=None, wait=True, gently=True, wait_other_notice=False marks = [] if wait_other_notice: if not other_nodes: - other_nodes = [node for node in list(self.nodes.values()) if not node in nodes] + other_nodes = [node for node in list(self.nodes.values()) if node not in nodes] marks = [(node, node.mark_log()) for node in other_nodes if node.is_live()] # stop all nodes in parallel @@ -256,7 +257,7 @@ def _update_config(self, install_dir=None): YAML().dump(data, f) def sctool(self, cmd): - if self._scylla_manager == None: + if self._scylla_manager is None: raise Exception("scylla manager not enabled - sctool command cannot be executed") return self._scylla_manager.sctool(cmd) @@ -315,7 +316,7 @@ def _update_config(self, install_dir=None): with open(conf_file, 'r') as f: data = YAML().load(f) data['http'] = self._get_api_address() - if not 'database' in data: + if 'database' not in data: data['database'] = {} data['database']['hosts'] = [self.scylla_cluster.get_node_ip(1)] data['database']['replication_factor'] = 3 @@ -327,10 +328,10 @@ def _update_config(self, install_dir=None): del data['tls_cert_file'] if 'tls_key_file' in data: del data['tls_key_file'] - if not 'logger' in data: + if 'logger' not in data: data['logger'] = {} data['logger']['mode'] = 'stderr' - if not 'repair' in data: + if 'repair' not in data: data['repair'] = {} if self.version < ComparableScyllaVersion("2.2"): data['repair']['segments_per_repair'] = 16 @@ -387,7 +388,7 @@ def _update_pid(self): pidfile = self._get_pid_file() while not (os.path.isfile(pidfile) and os.stat(pidfile).st_size > 0): if time.time() - start > 30.0: - print("Timed out waiting for pidfile {} to be filled (current time is %s): File {} size={}".format( + print("Timed out waiting for pidfile {} to be filled (current time is {}): File {} size={}".format( pidfile, datetime.now(), 'exists' if os.path.isfile(pidfile) else 'does not exist' if not os.path.exists(pidfile) else 'is not a file', @@ -411,7 +412,7 @@ def start(self): try: os.kill(self._pid, 0) return - except OSError as err: + except OSError: pass log_file = os.path.join(self._get_path(), 'scylla-manager.log') @@ -441,12 +442,12 @@ def stop(self, gently): if gently: try: self._process_scylla_manager.terminate() - except OSError as e: + except OSError: pass else: try: self._process_scylla_manager.kill() - except OSError as e: + except OSError: pass else: if self._pid: @@ -470,7 +471,7 @@ def sctool(self, cmd, ignore_exit_status=False): def agent_check_location(self, location_list, extra_config_file_list=None): agent_bin = os.path.join(self._get_path(), 'bin', 'scylla-manager-agent') locations_names = ','.join(location_list) - agent_conf = os.path.join(self.scylla_cluster.get_path(), 'node1/conf/scylla-manager-agent.yaml') + agent_conf = os.path.join(self.scylla_cluster.get_path(), 'node1/conf/scylla-manager-agent.yaml') # noqa: F841 args = [agent_bin, "check-location", "-L", str(locations_names)] if extra_config_file_list: for config_file in extra_config_file_list: diff --git a/ccmlib/scylla_node.py b/ccmlib/scylla_node.py index 877aff23..77b21473 100644 --- a/ccmlib/scylla_node.py +++ b/ccmlib/scylla_node.py @@ -15,7 +15,7 @@ import threading from pathlib import Path from collections import OrderedDict -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple, Union, TYPE_CHECKING import logging @@ -35,6 +35,8 @@ from ccmlib.scylla_repository import setup, get_scylla_version from ccmlib.utils.version import parse_version +if TYPE_CHECKING: + from ccmlib.scylla_cluster import ScyllaCluster class ScyllaNode(Node): @@ -42,7 +44,7 @@ class ScyllaNode(Node): Provides interactions to a Scylla node. """ - def __init__(self, name, cluster, auto_bootstrap, thrift_interface, + def __init__(self, name, cluster: 'ScyllaCluster', auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None, scylla_manager=None): self._node_install_dir = None @@ -184,7 +186,7 @@ def get_cassandra_version(self): def set_log_level(self, new_level, class_name=None): known_level = {'TRACE' : 'trace', 'DEBUG' : 'debug', 'INFO' : 'info', 'WARN' : 'warn', 'ERROR' : 'error', 'OFF' : 'info'} - if not new_level in known_level: + if new_level not in known_level: raise common.ArgumentError(f"Unknown log level {new_level} (use one of {' '.join(known_level)})") new_log_level = known_level[new_level] @@ -303,7 +305,7 @@ def wait_for_starting(self, from_mark=None, timeout=None): time.sleep(sleep_time) return bool(self.grep_log(f"{bootstrap_message}|{resharding_message}", from_mark=from_mark)) - def _start_scylla(self, args, marks, update_pid, + def _start_scylla(self, args, marks: List[Tuple['ScyllaNode', int]], update_pid, wait_other_notice, wait_normal_token_owner, wait_for_binary_proto, ext_env): log_file = os.path.join(self.get_path(), 'logs', 'system.log') @@ -825,7 +827,7 @@ def _update_scylla_agent_pid(self): start = time.time() while not (os.path.isfile(pidfile) and os.stat(pidfile).st_size > 0): if time.time() - start > 30.0: - print("Timed out waiting for pidfile {} to be filled (current time is %s): File {} size={}".format( + print("Timed out waiting for pidfile {} to be filled (current time is {}): File {} size={}".format( pidfile, datetime.now(), 'exists' if os.path.isfile(pidfile) else 'does not exist' if not os.path.exists(pidfile) else 'is not a file', @@ -881,7 +883,7 @@ def do_stop(self, gently=True): def _wait_until_stopped(self, wait_seconds): return wait_for(func=lambda: not self.is_running(), timeout=wait_seconds) - def wait_until_stopped(self, wait_seconds=None, marks=None, dump_core=True): + def wait_until_stopped(self, wait_seconds=None, marks: List[Tuple['ScyllaNode', int]]=None, dump_core=True): """ Wait until node is stopped after do_stop was called. - wait_other_notice: return only when the other live nodes of the @@ -922,7 +924,7 @@ def wait_until_stopped(self, wait_seconds=None, marks=None, dump_core=True): if self.jmx_pid: try: self.warning("{} scylla-jmx is still running. Killing process using kill({}, SIGKILL)...".format( - self.name, wait_seconds, self.jmx_pid)) + self.name, self.jmx_pid)) os.kill(self.jmx_pid, signal.SIGKILL) except OSError: pass @@ -934,7 +936,7 @@ def wait_until_stopped(self, wait_seconds=None, marks=None, dump_core=True): if node != self: node.watch_log_for_death(self, from_mark=mark) - def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True, wait_seconds=None, marks=None): + def stop(self, wait=True, wait_other_notice=False, other_nodes: List['ScyllaNode']=None, gently=True, wait_seconds=None, marks=None): """ Stop the node. - wait: if True (the default), wait for the Scylla process to be @@ -1393,7 +1395,7 @@ def hostid(self, timeout=60, force_refresh=False): self.error(f"Failed to get hostid using {url}: {e}") return None - def watch_rest_for_alive(self, nodes, timeout=120, wait_normal_token_owner=True): + def watch_rest_for_alive(self, nodes: Union['ScyllaNode', List['ScyllaNode']], timeout=120, wait_normal_token_owner=True): """ Use the REST API to wait until this node detects that the nodes listed in "nodes" become fully operational.