-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
node,scylla_node,cluster,scylla_cluster: Add type hints for nodes and clusters #609
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,16 +3,19 @@ | |
import os | ||
import random | ||
import shutil | ||
import subprocess | ||
import threading | ||
import time | ||
from collections import OrderedDict, defaultdict | ||
from concurrent.futures import ThreadPoolExecutor | ||
from typing import List, Tuple | ||
|
||
from ruamel.yaml import YAML | ||
|
||
from ccmlib import common, repository | ||
from ccmlib.node import Node, NodeError | ||
from ccmlib.common import logger | ||
from ccmlib.scylla_node import ScyllaNode | ||
from ccmlib.utils.version import parse_version | ||
|
||
|
||
|
@@ -23,8 +26,8 @@ def __init__(self, path, name, partitioner=None, install_dir=None, create_direct | |
self.id = 0 | ||
self.ipprefix = None | ||
self.ipformat = None | ||
self.nodes = OrderedDict() | ||
self.seeds = [] | ||
self.nodes: OrderedDict[str, ScyllaNode] = OrderedDict() | ||
self.seeds: List[ScyllaNode] = [] | ||
self.partitioner = partitioner | ||
self.snitch = snitch | ||
self._config_options = {} | ||
|
@@ -140,7 +143,7 @@ class LogWatchingThread(): | |
def __init__(self, cluster): | ||
self.executor = ThreadPoolExecutor(max_workers=1) | ||
self.thread = None | ||
self.cluster = cluster | ||
self.cluster: Cluster = cluster | ||
self.req_stop_event = threading.Event() | ||
self.done_event = threading.Event() | ||
self.log_positions = defaultdict(int) | ||
|
@@ -213,7 +216,7 @@ def get_install_dir(self): | |
def hasOpscenter(self): | ||
return False | ||
|
||
def nodelist(self): | ||
def nodelist(self) -> List[ScyllaNode]: | ||
return [self.nodes[name] for name in list(self.nodes.keys())] | ||
|
||
def version(self): | ||
|
@@ -222,7 +225,7 @@ def version(self): | |
def cassandra_version(self): | ||
return self.version() | ||
|
||
def add(self, node, is_seed, data_center=None, rack=None): | ||
def add(self, node: ScyllaNode, is_seed, data_center=None, rack=None): | ||
if node.name in self.nodes: | ||
raise common.ArgumentError(f'Cannot create existing node {node.name}') | ||
self.nodes[node.name] = node | ||
|
@@ -246,14 +249,14 @@ def add(self, node, is_seed, data_center=None, rack=None): | |
|
||
# nodes can be provided in multiple notations, determining the cluster topology: | ||
# 1. int - specifying the number of nodes in a single DC, single RACK cluster | ||
# 2. list[int] - specifying the number of nodes in a multi DC, single RACK per DC cluster | ||
# 2. List[int] - specifying the number of nodes in a multi DC, single RACK per DC cluster | ||
# The datacenters are automatically named as dc{i}, starting from 1, the rack is named RAC1 | ||
# For example, [3, 2] would translate to the following topology {'dc1': {'RAC1': 3}, 'dc2': {'RAC1': 2}} | ||
# Where 3 nodes are populated in dc1/RAC1, and 2 nodes are populated in dc2/RAC1 | ||
# 3.a dict[str: int] - specifying the number of nodes in a multi DC, single RACK per DC cluster | ||
# The dictionary keys explicitly identify each datacenter name, and the value is the number of nodes in the DC. | ||
# For example, {'DC1': 3, 'DC2': 2] would translate to the following topology {'DC1': {'RAC1': 3}, 'DC2': {'RAC1': 2}} | ||
# 3.b dict[str: list[int]] - specifying the number of nodes in a multi DC, multi RACK cluster | ||
# 3.b dict[str: List[int]] - specifying the number of nodes in a multi DC, multi RACK cluster | ||
# The dictionary keys explicitly identify each datacenter name, and the value is the number of nodes in each RACK in the DC. | ||
# Racks are automatically named as RAC{i}, starting from 1 | ||
# For example, {'DC1': [2, 2, 2], 'DC2': [3, 3]] would translate to the following topology {'DC1': {'RAC1': 2, 'RAC2': 2, 'RAC3': 2}, 'DC2': {'RAC1': 3, 'RAC2': 3}} | ||
|
@@ -328,8 +331,8 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N | |
self._update_config() | ||
return self | ||
|
||
def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None): | ||
ipformat = self.get_ipformat() | ||
def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None) -> ScyllaNode: | ||
ipformat = self.get_ipformat() # noqa: F841 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the point of introducing noqa ? If we don't introduce the linter as well ? |
||
binary = self.get_binary_interface(i) | ||
node = self.create_node(name=f'node{i}', | ||
auto_bootstrap=auto_bootstrap, | ||
|
@@ -365,7 +368,7 @@ def get_storage_interface(self, nodeid): | |
return (self.get_node_ip(nodeid), 7000) | ||
|
||
def get_node_jmx_port(self, nodeid): | ||
return 7000 + nodeid * 100 + self.id; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shame to get rid of ANSI C, isn't it ? 😜 |
||
return 7000 + nodeid * 100 + self.id | ||
|
||
def get_debug_port(self, nodeid): | ||
return 2000 + nodeid * 100 | ||
|
@@ -394,7 +397,7 @@ def balanced_tokens_across_dcs(self, dcs): | |
tokens.extend(new_tokens) | ||
return tokens | ||
|
||
def remove(self, node=None, wait_other_notice=False, other_nodes=None, remove_node_dir=True): | ||
def remove(self, node: ScyllaNode=None, wait_other_notice=False, other_nodes=None, remove_node_dir=True): | ||
if node is not None: | ||
if node.name not in self.nodes: | ||
return | ||
|
@@ -421,7 +424,7 @@ def remove_dir_with_retry(self, path): | |
try: | ||
common.rmdirs(path) | ||
removed = True | ||
except: | ||
except Exception: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Linter should be angry about this, isn't it ? too broad exception ? I think in this case we can set a specific set of possible exceptions |
||
tries = tries + 1 | ||
time.sleep(.1) | ||
if tries == 5: | ||
|
@@ -435,7 +438,7 @@ def clear(self): | |
def get_path(self): | ||
return os.path.join(self.path, self.name) | ||
|
||
def get_seeds(self, node=None): | ||
def get_seeds(self, node: ScyllaNode=None): | ||
# if first node, or there is now seeds at all | ||
# or node added is not a seed, return | ||
# all cluster seed nodes | ||
|
@@ -450,7 +453,7 @@ def add_seed(self, node): | |
address = node.address() | ||
else: | ||
address = node | ||
if not address in self.seeds: | ||
if address not in self.seeds: | ||
self.seeds.append(address) | ||
|
||
def show(self, verbose): | ||
|
@@ -476,7 +479,7 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_ | |
if wait_other_notice: | ||
marks = [(node, node.mark_log()) for node in list(self.nodes.values())] | ||
|
||
started = [] | ||
started: List[Tuple[ScyllaNode, subprocess.Popen, int]] = [] | ||
for node in list(self.nodes.values()): | ||
if not node.is_running(): | ||
mark = 0 | ||
|
@@ -677,7 +680,7 @@ def _update_config(self, install_dir=None): | |
with open(filename, 'w') as f: | ||
YAML().dump(cluster_config, f) | ||
|
||
def __update_pids(self, started): | ||
def __update_pids(self, started: List[Tuple[ScyllaNode, subprocess.Popen, int]]): | ||
for node, p, _ in started: | ||
node._update_pid(p) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,14 +16,16 @@ | |
from datetime import datetime | ||
import locale | ||
from collections import defaultdict, namedtuple | ||
from typing import TYPE_CHECKING | ||
|
||
from ruamel.yaml import YAML | ||
|
||
from ccmlib import common | ||
from ccmlib.cli_session import CliSession | ||
from ccmlib.repository import setup | ||
from ccmlib.utils.version import parse_version | ||
|
||
if TYPE_CHECKING: | ||
from ccmlib.scylla_cluster import ScyllaCluster | ||
|
||
class Status(): | ||
UNINITIALIZED = "UNINITIALIZED" | ||
|
@@ -104,7 +106,7 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_inte | |
is almost always the right choice. | ||
""" | ||
self.name = name | ||
self.cluster = cluster | ||
self.cluster: 'ScyllaCluster' = cluster | ||
self.status = Status.UNINITIALIZED | ||
self.auto_bootstrap = auto_bootstrap | ||
self.network_interfaces = {'storage': common.normalize_interface(storage_interface), | ||
|
@@ -424,7 +426,7 @@ def print_process_output(self, name, proc, verbose=False): | |
try: | ||
stderr = proc.communicate()[1] | ||
except ValueError: | ||
[stdout, stderr] = ['', ''] | ||
stderr = '' | ||
if stderr is None: | ||
stderr = '' | ||
if len(stderr) > 1: | ||
|
@@ -709,7 +711,7 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True | |
if gently is True: | ||
try: | ||
self.flush() | ||
except: | ||
except Exception: | ||
print(f"WARN: Failed to flush node: {self.name} on shutdown.") | ||
pass | ||
|
||
|
@@ -1086,7 +1088,7 @@ def do_split(f): | |
return results | ||
|
||
def run_sstablemetadata(self, output_file=None, datafiles=None, keyspace=None, column_families=None): | ||
cdir = self.get_install_dir() | ||
cdir = self.get_install_dir() # noqa: F841 | ||
sstablemetadata = self._find_cmd('sstablemetadata') | ||
env = self.get_env() | ||
sstablefiles = self.__gather_sstables(datafiles=datafiles, keyspace=keyspace, columnfamilies=column_families) | ||
|
@@ -1105,7 +1107,7 @@ def run_sstablemetadata(self, output_file=None, datafiles=None, keyspace=None, c | |
return results | ||
|
||
def run_sstableexpiredblockers(self, output_file=None, keyspace=None, column_family=None): | ||
cdir = self.get_install_dir() | ||
cdir = self.get_install_dir() # noqa: F841 | ||
sstableexpiredblockers = self._find_cmd('sstableexpiredblockers') | ||
env = self.get_env() | ||
cmd = sstableexpiredblockers + [keyspace, column_family] | ||
|
@@ -1125,7 +1127,7 @@ def get_sstablespath(self, output_file=None, datafiles=None, keyspace=None, tabl | |
return sstablefiles | ||
|
||
def run_sstablerepairedset(self, set_repaired=True, datafiles=None, keyspace=None, column_families=None): | ||
cdir = self.get_install_dir() | ||
cdir = self.get_install_dir() # noqa: F841 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we have unused-variables here? (Applies to all occurrences) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cause we never linted this code. This code path also probably isn't in use... |
||
sstablerepairedset = self._find_cmd('sstablerepairedset') | ||
env = self.get_env() | ||
sstablefiles = self.__gather_sstables(datafiles, keyspace, column_families) | ||
|
@@ -1138,10 +1140,10 @@ def run_sstablerepairedset(self, set_repaired=True, datafiles=None, keyspace=Non | |
subprocess.call(cmd, env=env) | ||
|
||
def run_sstablelevelreset(self, keyspace, cf, output=False): | ||
cdir = self.get_install_dir() | ||
cdir = self.get_install_dir() # noqa: F841 | ||
sstablelevelreset = self._find_cmd('sstablelevelreset') | ||
env = self.get_env() | ||
sstablefiles = self.__cleanup_sstables(keyspace, cf) | ||
sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841 | ||
|
||
cmd = sstablelevelreset + ["--really-reset", keyspace, cf] | ||
|
||
|
@@ -1154,10 +1156,10 @@ def run_sstablelevelreset(self, keyspace, cf, output=False): | |
return subprocess.call(cmd, env=env) | ||
|
||
def run_sstableofflinerelevel(self, keyspace, cf, dry_run=False, output=False): | ||
cdir = self.get_install_dir() | ||
cdir = self.get_install_dir() # noqa: F841 | ||
sstableofflinerelevel = self._find_cmd('sstableofflinerelevel') | ||
env = self.get_env() | ||
sstablefiles = self.__cleanup_sstables(keyspace, cf) | ||
sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841 | ||
|
||
if dry_run: | ||
cmd = sstableofflinerelevel + ["--dry-run", keyspace, cf] | ||
|
@@ -1175,7 +1177,7 @@ def run_sstableofflinerelevel(self, keyspace, cf, dry_run=False, output=False): | |
def run_sstableverify(self, keyspace, cf, options=None, output=False): | ||
sstableverify = self.get_tool('sstableverify') | ||
env = self.get_env() | ||
sstablefiles = self.__cleanup_sstables(keyspace, cf) | ||
sstablefiles = self.__cleanup_sstables(keyspace, cf) # noqa: F841 | ||
if not isinstance(sstableverify, list): | ||
sstableverify = [sstableverify] | ||
cmd = sstableverify + [keyspace, cf] | ||
|
@@ -1201,7 +1203,7 @@ def _find_cmd(self, cmd): | |
fcmd = common.join_bin(cdir, 'bin', cmd) | ||
try: | ||
os.chmod(fcmd, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) | ||
except: | ||
except Exception: | ||
print(f"WARN: Couldn't change permissions to use {cmd}.") | ||
print("WARN: If it didn't work, you will have to do so manually.") | ||
return [fcmd] | ||
|
@@ -1879,7 +1881,7 @@ def _update_pid(self, process): | |
start = time.time() | ||
while not (os.path.isfile(pidfile) and os.stat(pidfile).st_size > 0): | ||
if (time.time() - start > 30.0): | ||
print("Timed out waiting for pidfile {} to be filled (current time is %s): File {} size={}".format( | ||
print("Timed out waiting for pidfile {} to be filled (current time is {}): File {} size={}".format( | ||
pidfile, | ||
datetime.now(), | ||
'exists' if os.path.isfile(pidfile) else 'does not exist' if not os.path.exists(pidfile) else 'is not a file', | ||
|
@@ -2085,7 +2087,7 @@ def make_pat_for_log_level(level): | |
matchings = [] | ||
it = iter(log.splitlines()) | ||
for line in it: | ||
l = line if case_sensitive else line.lower() | ||
l = line if case_sensitive else line.lower() # noqa: E741 | ||
is_error_line = (error_pat.search(l) and | ||
not debug_pat.search(error_pat.split(l)[0])) if not search_str else search_str in l | ||
if is_error_line: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No real need to change those comments