diff --git a/.gitignore b/.gitignore
index 2b6fb2718a..4bd677e51c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,6 @@ upgrade
 html/
 doxygen/doxypy-0.4.2/
 .pytest_cache/
+.vscode/
+
+pytest.ini
\ No newline at end of file
diff --git a/auth_test.py b/auth_test.py
index 4bfe5003c9..6a63bdbb30 100644
--- a/auth_test.py
+++ b/auth_test.py
@@ -552,10 +552,16 @@ def test_materialized_views_auth(self):
         * Create a new user, 'cathy', with no permissions
         * Create a ks, table
         * Connect as cathy
+        *
         * Try CREATE MV without ALTER permission on base table, assert throws Unauthorized
         * Grant cathy ALTER permissions, then CREATE MV successfully
+        *
+        * Try to MODIFY base without WRITE permission on base, assert throws Unauthorized
+        * Grant cathy WRITE permissions on base, and modify base successfully
+        *
         * Try to SELECT from the mv, assert throws Unauthorized
-        * Grant cathy SELECT permissions, and read from the MV successfully
+        * Grant cathy SELECT permissions on base, and read from the MV successfully
+        *
         * Revoke cathy's ALTER permissions, assert DROP MV throws Unauthorized
         * Restore cathy's ALTER permissions, DROP MV successfully
         """
@@ -576,12 +582,36 @@ def test_materialized_views_auth(self):
         cassandra.execute("GRANT ALTER ON ks.cf TO cathy")
         cathy.execute(create_mv)

-        # TRY SELECT MV without SELECT permission on base table
-        assert_unauthorized(cathy, "SELECT * FROM ks.mv1", "User cathy has no SELECT permission on <table ks.cf> or any of its parents")
+        # Try MODIFY base without WRITE permission on base
+        assert_unauthorized(cathy, "INSERT INTO ks.cf(id, value) VALUES(1, '1')", "User cathy has no MODIFY permission on <table ks.cf> or any of its parents")

-        # Grant SELECT permission and CREATE MV
-        cassandra.execute("GRANT SELECT ON ks.cf TO cathy")
-        cathy.execute("SELECT * FROM ks.mv1")
+        if self.cluster.version() >= LooseVersion('4.0'):
+            # From 4.0 onward, only base MODIFY permission is required to update base with MV
+            # Grant WRITE permission on Base
+            cassandra.execute("GRANT MODIFY ON ks.cf TO cathy")
+            cathy.execute("INSERT INTO ks.cf(id, value) VALUES(1, '1')")
+
+            # Try SELECT MV without SELECT permission on base table
+            assert_unauthorized(cathy, "SELECT * FROM ks.cf", "User cathy has no SELECT permission on <table ks.cf> or any of its parents")
+            assert_unauthorized(cathy, "SELECT * FROM ks.mv1", "User cathy has no SELECT permission on <table ks.cf> or any of its parents")
+
+            # Grant SELECT permission
+            cassandra.execute("GRANT SELECT ON ks.cf TO cathy")
+            assert_one(cathy, "SELECT * FROM ks.cf", [1, '1'])
+            assert_one(cathy, "SELECT * FROM ks.mv1", ['1', 1])
+        else:
+            # Before 4.0, MODIFY on MV is required to insert to base
+            # Grant WRITE permission on Base
+            cassandra.execute("GRANT MODIFY ON ks.cf TO cathy")
+            assert_unauthorized(cathy, "INSERT INTO ks.cf(id, value) VALUES(1, '1')", "User cathy has no SELECT permission on <table ks.cf> or any of its parents")
+            cassandra.execute("GRANT SELECT ON ks.cf TO cathy")
+            assert_unauthorized(cathy, "INSERT INTO ks.cf(id, value) VALUES(1, '1')", "User cathy has no MODIFY permission on <table ks.mv1> or any of its parents")
+
+            # Grant WRITE permission on MV
+            cassandra.execute("GRANT MODIFY ON ks.mv1 TO cathy")
+            cathy.execute("INSERT INTO ks.cf(id, value) VALUES(1, '1')")
+            assert_one(cathy, "SELECT * FROM ks.cf", [1, '1'])
+            assert_one(cathy, "SELECT * FROM ks.mv1", ['1', 1])

         # Revoke ALTER permission and try DROP MV
         cassandra.execute("REVOKE ALTER ON ks.cf FROM cathy")
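For readers skimming the two branches above, the effective grant sequence is the only difference between the paths. A condensed sketch (names as in the test; this is illustration, not an additional change in the patch):

    # 4.0+: writing to a base table with a materialized view needs only MODIFY on the base.
    cassandra.execute("GRANT MODIFY ON ks.cf TO cathy")
    # Pre-4.0: the same write additionally needs SELECT on the base (read-before-write)
    # and MODIFY on the view itself.
    cassandra.execute("GRANT SELECT ON ks.cf TO cathy")
    cassandra.execute("GRANT MODIFY ON ks.mv1 TO cathy")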
diff --git a/bootstrap_test.py b/bootstrap_test.py
index 51da884ed2..de58b8e5d0 100644
--- a/bootstrap_test.py
+++ b/bootstrap_test.py
@@ -849,6 +849,12 @@ def test_simultaneous_bootstrap(self):
                                       " cannot bootstrap while cassandra.consistent.rangemovement is true"

         cluster = self.cluster
+        configuration_options = {
+            'request_timeout_in_ms': 120000,
+            'read_request_timeout_in_ms': 120000,
+            'range_request_timeout_in_ms': 120000
+        }
+        cluster.set_configuration_options(configuration_options)
         cluster.set_environment_variable('CASSANDRA_TOKEN_PREGENERATION_DISABLED', 'True')
         cluster.populate(1)
         cluster.start()
@@ -884,7 +890,10 @@ def test_simultaneous_bootstrap(self):
         # bugs like 9484, where count(*) fails at higher
         # data loads.
         for _ in range(5):
-            assert_one(session, "SELECT count(*) from keyspace1.standard1", [500000], cl=ConsistencyLevel.ONE)
+            logger.info("Querying: SELECT count(*) from keyspace1.standard1")
+            # Improve reliability for slower/loaded test systems by using a larger client timeout
+            assert_one(session, "SELECT count(*) from keyspace1.standard1", [500000], cl=ConsistencyLevel.ONE, timeout=180)
+            logger.info("Querying completed")

     def test_cleanup(self):
         """
@@ -1019,6 +1028,56 @@ def test_bootstrap_binary_disabled(self):
             assert_bootstrap_state(self, node3, 'COMPLETED', user='cassandra', password='cassandra')
             node3.wait_for_binary_interface()

+    @since('4.0')
+    @pytest.mark.no_vnodes
+    def test_simple_bootstrap_with_everywhere_strategy(self):
+        cluster = self.cluster
+        tokens = cluster.balanced_tokens(2)
+        cluster.set_configuration_options(values={'num_tokens': 1})
+
+        logger.debug("[node1, node2] tokens: %r" % (tokens,))
+
+        keys = 10000
+
+        # Create a single node cluster
+        cluster.populate(1)
+        node1 = cluster.nodelist()[0]
+        node1.set_configuration_options(values={'initial_token': tokens[0]})
+        cluster.start()
+
+        session = self.patient_cql_connection(node1)
+        create_ks(session, 'ks', 'EverywhereStrategy')
+        create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})
+
+        insert_statement = session.prepare("INSERT INTO ks.cf (key, c1, c2) VALUES (?, 'value1', 'value2')")
+        execute_concurrent_with_args(session, insert_statement, [['k%d' % k] for k in range(keys)])
+
+        node1.flush()
+        node1.compact()
+
+        # Read the data inserted during the bootstrap process. We shouldn't
+        # get any errors
+        query_c1c2(session, random.randint(0, keys - 1), ConsistencyLevel.ONE)
+        session.shutdown()
+
+        # Bootstrapping a new node in the current version
+        node2 = new_node(cluster)
+        node2.set_configuration_options(values={'initial_token': tokens[1]})
+        node2.start(wait_for_binary_proto=True)
+        node2.compact()
+
+        node1.cleanup()
+        logger.debug("node1 size for ks.cf after cleanup: %s" % float(data_size(node1,'ks','cf')))
+        node1.compact()
+        logger.debug("node1 size for ks.cf after compacting: %s" % float(data_size(node1,'ks','cf')))
+
+        logger.debug("node2 size for ks.cf after compacting: %s" % float(data_size(node2,'ks','cf')))
+
+        size1 = float(data_size(node1,'ks','cf'))
+        size2 = float(data_size(node2,'ks','cf'))
+        assert_almost_equal(size1, size2, error=0.3)
+
+        assert_bootstrap_state(self, node2, 'COMPLETED')

 class TestBootstrap(BootstrapTester):
     """
diff --git a/byteman/guardrails/disk_usage_full.btm b/byteman/guardrails/disk_usage_full.btm
new file mode 100644
index 0000000000..bbdf8ddca9
--- /dev/null
+++ b/byteman/guardrails/disk_usage_full.btm
@@ -0,0 +1,8 @@
+RULE return FULL disk usage
+CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
+METHOD getState
+AT EXIT
+IF TRUE
+DO
+ return org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+ENDRULE
\ No newline at end of file
diff --git a/byteman/guardrails/disk_usage_stuffed.btm b/byteman/guardrails/disk_usage_stuffed.btm
new file mode 100644
index 0000000000..3256211304
--- /dev/null
+++ b/byteman/guardrails/disk_usage_stuffed.btm
@@ -0,0 +1,8 @@
+RULE return STUFFED disk usage
+CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
+METHOD getState
+AT EXIT
+IF TRUE
+DO
+ return org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+ENDRULE
\ No newline at end of file
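These two rules override DiskUsageMonitor.getState so a test can force a disk-usage state without actually filling a disk. A minimal usage sketch, mirroring the disk_usage_injection helper that guardrails_test.py adds later in this patch:

    # Sketch: force the FULL disk-usage state on node1 via byteman, then unload it.
    node1.byteman_submit(["./byteman/guardrails/disk_usage_full.btm"])  # install the override
    # ... assert that writes targeting node1's replicas are rejected ...
    node1.byteman_submit(['-u'])  # '-u' unloads previously submitted rules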
diff --git a/byteman/merge_schema_failure_4x.btm b/byteman/merge_schema_failure_4x.btm
index bee5c3c731..e27a547f71 100644
--- a/byteman/merge_schema_failure_4x.btm
+++ b/byteman/merge_schema_failure_4x.btm
@@ -3,7 +3,7 @@
 #
 RULE inject node failure on merge schema exit
 CLASS org.apache.cassandra.schema.Schema
-METHOD merge
+METHOD mergeAndUpdateVersion
 AT EXIT
 # set flag to only run this rule once.
 IF TRUE
diff --git a/client_request_metrics_test.py b/client_request_metrics_test.py
index ec45dc2059..322e24f9db 100644
--- a/client_request_metrics_test.py
+++ b/client_request_metrics_test.py
@@ -42,7 +42,7 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
         fixture_dtest_setup.ignore_log_patterns = (
             'Testing write failures',  # The error to simulate a write failure
             'ERROR WRITE_FAILURE',  # Logged in DEBUG mode for write failures
-            f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstones during query"  # Caused by the read failure tests
+            f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} (tombstones|tombstone rows) during query"  # Caused by the read failure tests
         )

@@ -50,6 +50,7 @@ def setup_once(self):
         cluster = self.cluster
         cluster.set_configuration_options({'read_request_timeout_in_ms': 3000,
                                            'write_request_timeout_in_ms': 3000,
                                            'phi_convict_threshold': 12,
+                                           'tombstone_warn_threshold': -1,
                                            'tombstone_failure_threshold': TOMBSTONE_FAILURE_THRESHOLD,
                                            'enable_materialized_views': 'true'})
         cluster.populate(2, debug=True)
diff --git a/compaction_test.py b/compaction_test.py
index 55fa0167f9..107b0c8b7f 100644
--- a/compaction_test.py
+++ b/compaction_test.py
@@ -15,7 +15,7 @@
 since = pytest.mark.since
 logger = logging.getLogger(__name__)

-strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
+strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy', 'UnifiedCompactionStrategy']

 class TestCompaction(Tester):
@@ -114,6 +114,8 @@ def test_bloomfilter_size(self, strategy):
         else:
             if strategy == "DateTieredCompactionStrategy":
                 strategy_string = "strategy=DateTieredCompactionStrategy,base_time_seconds=86400"  # we want a single sstable, so make sure we don't have a tiny first window
+            elif strategy == "UnifiedCompactionStrategy":
+                strategy_string = "strategy=UnifiedCompactionStrategy,max_sstables_to_compact=4"  # disable layout-preserving compaction which can leave more than one sstable
             else:
                 strategy_string = "strategy={}".format(strategy)
         min_bf_size = 100000
@@ -121,18 +123,21 @@ def test_bloomfilter_size(self, strategy):
         cluster = self.cluster
         cluster.populate(1).start()
         [node1] = cluster.nodelist()
+        logger.debug("Compaction: " + strategy_string)
         for x in range(0, 5):
             node1.stress(['write', 'n=100K', "no-warmup", "cl=ONE", "-rate", "threads=300", "-schema", "replication(factor=1)",
                           "compaction({},enabled=false)".format(strategy_string)])
             node1.flush()
+            logger.debug(node1.nodetool('cfstats keyspace1.standard1').stdout)

         node1.nodetool('enableautocompaction')
         node1.wait_for_compactions()

         table_name = 'standard1'
-        output = node1.nodetool('cfstats').stdout
+        output = node1.nodetool('cfstats keyspace1.standard1').stdout
+        logger.debug(output)
         output = output[output.find(table_name):]
         output = output[output.find("Bloom filter space used"):]
         bfSize = int(output[output.find(":") + 1:output.find("\n")].strip())
@@ -153,7 +158,12 @@ def test_bloomfilter_size(self, strategy):
         logger.debug("bloom filter size is: {}".format(bfSize))
         logger.debug("size factor = {}".format(size_factor))
-        assert bfSize >= size_factor * min_bf_size
+        # In the case where the number of sstables is greater than the number of directories, it's possible for this
+        # to happen both with unique keys (where the bf size will remain close to the unadjusted limit) and with
+        # repetitions of keys (where the bf size will be a multiple of the expected size). Permit both by only using
+        # the size factor on the maximum size. Note that the test is designed to end up with size_factor == 1 and most runs do so, thus
+        # this is not a loosening of the test in the common case; it only ensures that we don't end up with flakes.
+        assert bfSize >= min_bf_size
         assert bfSize <= size_factor * max_bf_size

     @pytest.mark.parametrize("strategy", strategies)
@@ -298,7 +308,7 @@ def test_compaction_strategy_switching(self, strategy):
         Ensure that switching strategies does not result in problems.
         Insert data, switch strategies, then check against data loss.
         """
-        strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
+        strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy', 'UnifiedCompactionStrategy']

         if strategy in strategies:
             strategies.remove(strategy)
@@ -307,6 +317,7 @@ def test_compaction_strategy_switching(self, strategy):
         [node1] = cluster.nodelist()

         for strat in strategies:
+            logger.debug("Switching to {}".format(strat))
             session = self.patient_cql_connection(node1)
             create_ks(session, 'ks', 1)
@@ -339,7 +350,10 @@ def test_large_compaction_warning(self):
         Check that we log a warning when the partition size is bigger than
         compaction_large_partition_warning_threshold_mb
         """
         cluster = self.cluster
-        cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
+        if self.supports_guardrails:
+            cluster.set_configuration_options({'guardrails': {'partition_size_warn_threshold_in_mb': 1}})
+        else:
+            cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
         cluster.populate(1).start()
         [node] = cluster.nodelist()
@@ -361,7 +375,10 @@ def test_large_compaction_warning(self):
         node.nodetool('compact ks large')
         verb = 'Writing' if self.cluster.version() > '2.2' else 'Compacting'
         sizematcher = '\d+ bytes' if self.cluster.version() < LooseVersion('3.6') else '\d+\.\d{3}(K|M|G)iB'
-        node.watch_log_for('{} large partition ks/large:user \({}'.format(verb, sizematcher), from_mark=mark, timeout=180)
+        log_message = '{} large partition ks/large:user \({}'.format(verb, sizematcher)
+        if self.supports_guardrails:
+            log_message = "Detected partition 'user' in ks.large of size 2MB is greater than the maximum recommended size \(1MB\)"
+        node.watch_log_for(log_message, from_mark=mark, timeout=180)

         ret = list(session.execute("SELECT properties from ks.large where userid = 'user'"))
         assert_length_equal(ret, 1)
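The supports_guardrails branching above is the pattern this patch repeats across many tests: on 4.0+ the same threshold keys move under a nested 'guardrails' mapping. A tiny illustrative helper (hypothetical, not part of the patch) makes the shape explicit:

    def wrap_for_guardrails(supports_guardrails, opts):
        # On guardrail-aware clusters the threshold keys nest under 'guardrails'.
        return {'guardrails': opts} if supports_guardrails else opts

    # wrap_for_guardrails(True, {'tombstone_failure_threshold': 500})
    # -> {'guardrails': {'tombstone_failure_threshold': 500}}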
diff --git a/conftest.py b/conftest.py
index 19c308325c..6594eebfd2 100644
--- a/conftest.py
+++ b/conftest.py
@@ -42,6 +42,9 @@ def check_required_loopback_interfaces_available():

 def pytest_addoption(parser):
+    parser.addoption("--sstable-format", action="store", default="bti",
+                     help="SSTable format to be used by default for all newly created SSTables: "
+                          "big or bti (default: bti)")
     parser.addoption("--use-vnodes", action="store_true", default=False,
                      help="Determines wither or not to setup clusters using vnodes for tests")
     parser.addoption("--use-off-heap-memtables", action="store_true", default=False,
@@ -175,6 +178,13 @@ class level seems to work, and I guess it's not that much extra overhead to setu
     if pytest.config.inicfg.get("log_format") is not None:
         logging_format = pytest.config.inicfg.get("log_format")

+    # ccm logger is configured to spit everything to console;
+    # we want it to use the logging setup configured for tests.
+    # Unless we do that, we get duplicated log records from the ccm module.
+    ccmLogger = logging.getLogger("ccm")
+    for handler in ccmLogger.handlers:
+        logging.getLogger("ccm").removeHandler(handler)
+
     logging.basicConfig(level=log_level, format=logging_format)
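One caveat with the handler cleanup above: removing entries from the very list being iterated can skip handlers. A safer variant (a sketch, not what the patch does) iterates over a snapshot:

    import logging

    ccm_logger = logging.getLogger("ccm")
    for handler in list(ccm_logger.handlers):  # list(...) copies, so removal can't skip entries
        ccm_logger.removeHandler(handler)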
diff --git a/cqlsh_tests/cqlshrc.sample.cloud b/cqlsh_tests/cqlshrc.sample.cloud
new file mode 100644
index 0000000000..62528670c4
--- /dev/null
+++ b/cqlsh_tests/cqlshrc.sample.cloud
@@ -0,0 +1,17 @@
+; Copyright DataStax, Inc.
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+;
+; Sample ~/.cqlshrc file with cloud configuration.
+[connection]
+secure_connect_bundle = /path/to/creds.zip
diff --git a/cqlsh_tests/secure-connect-test.zip b/cqlsh_tests/secure-connect-test.zip
new file mode 100644
index 0000000000..bcd4a7fb2a
Binary files /dev/null and b/cqlsh_tests/secure-connect-test.zip differ
diff --git a/cqlsh_tests/test_cqlsh.py b/cqlsh_tests/test_cqlsh.py
index 69ca0a9394..87ffdddf63 100644
--- a/cqlsh_tests/test_cqlsh.py
+++ b/cqlsh_tests/test_cqlsh.py
@@ -24,6 +24,7 @@
 from cassandra.concurrent import execute_concurrent_with_args
 from cassandra.query import BatchStatement, BatchType
 from ccmlib import common
+from ccmlib.node import ToolError

 from .cqlsh_tools import monkeypatch_driver, unmonkeypatch_driver
 from dtest import Tester, create_ks, create_cf
@@ -97,6 +98,7 @@ def run_cqlsh(self, node, cmds, cqlsh_options=None, env_vars=None):
         logger.debug("Cqlsh command stderr:\n" + stderr)
         return stdout, stderr

+
 class TestCqlsh(Tester, CqlshMixin):

     # override cluster options to enable user defined functions
@@ -1161,6 +1163,7 @@ def get_test_table_output(self, has_val=True, has_val_idx=True):
     AND comment = ''
     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
     AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
+    AND memtable = {}
     AND crc_check_chance = 1.0
     AND default_time_to_live = 0
     AND extensions = {}
@@ -1275,6 +1278,7 @@ def get_users_table_output(self):
     AND comment = ''
     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
     AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
+    AND memtable = {}
     AND crc_check_chance = 1.0
     AND default_time_to_live = 0
     AND extensions = {}
@@ -1406,6 +1410,7 @@ def get_users_by_state_mv_output(self):
     AND comment = ''
     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
     AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
+    AND memtable = {}
     AND crc_check_chance = 1.0
     AND extensions = {}
     AND gc_grace_seconds = 864000
@@ -1936,8 +1941,11 @@ def test_client_warnings(self):
         """
         max_partitions_per_batch = 5
         self.cluster.populate(3)
-        self.cluster.set_configuration_options({
-            'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)})
+
+        config_opts = {'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)}
+        if self.supports_guardrails:
+            config_opts = {"guardrails": config_opts}
+        self.cluster.set_configuration_options(config_opts)

         self.cluster.start()
@@ -1987,6 +1995,52 @@ def test_connect_timeout(self):
         stdout, stderr = self.run_cqlsh(node1, cmds='USE system', cqlsh_options=['--debug', '--connect-timeout=10'])
         assert "Using connect timeout: 10 seconds" in stderr

+    @since('4.0')
+    def test_consistency_level_options(self):
+        """
+        Tests for new cmdline consistency options:
+        - consistency-level
+        - serial-consistency-level
+        @jira_ticket STAR-432
+        """
+        self.cluster.populate(1)
+        self.cluster.start()
+
+        node1, = self.cluster.nodelist()
+
+        def expect_output_no_errors(cmd, options, output):
+            stdout, stderr = self.run_cqlsh(node1, cmds=cmd, cqlsh_options=options)
+            assert output in stdout, stderr
+            assert stderr == ''
+
+        expect_output_no_errors('CONSISTENCY', [],
+                                'Current consistency level is ONE.')
+
+        expect_output_no_errors('CONSISTENCY', ['--consistency-level', 'quorum'],
+                                'Current consistency level is QUORUM.')
+
+        expect_output_no_errors('SERIAL CONSISTENCY', [],
+                                'Current serial consistency level is SERIAL.')
+
+        expect_output_no_errors('SERIAL CONSISTENCY', ['--serial-consistency-level', 'local_serial'],
+                                'Current serial consistency level is LOCAL_SERIAL.')
+
+        def expect_error(cmd, options, error_msg):
+            stdout, stderr = self.run_cqlsh(node1, cmds=cmd, cqlsh_options=options)
+            assert error_msg in stderr
+
+        expect_error('CONSISTENCY', ['--consistency-level', 'foop'],
+                     '"foop" is not a valid consistency level')
+
+        expect_error('CONSISTENCY', ['--consistency-level', 'serial'],
+                     '"serial" is not a valid consistency level')
+
+        expect_error('SERIAL CONSISTENCY', ['--serial-consistency-level', 'foop'],
+                     '"foop" is not a valid serial consistency level')
+
+        expect_error('SERIAL CONSISTENCY', ['--serial-consistency-level', 'ONE'],
+                     '"ONE" is not a valid serial consistency level')
+
     @since('3.0.19')
     def test_protocol_negotiation(self):
         """
@@ -2656,6 +2710,50 @@ def test_cjk_output(self):
         """
         assert stdout_lines_sorted.find(expected) >= 0

+    @since('4.0')
+    def test_no_file_io(self):
+        def run_cqlsh_catch_toolerror(cmd, env):
+            """
+            run_cqlsh will throw ToolError if cqlsh exits with a non-zero exit code.
+            """
+            out = ""
+            err = ""
+            try:
+                out, err, _ = self.node1.run_cqlsh(cmd, env)
+            except ToolError as e:
+                return e.stdout, e.stderr
+            return out, err
+
+        create_ks(self.session, 'foo', rf=1)
+        create_cf(self.session, 'bar', key_type='int', columns={'name': 'text'})
+
+        cqlsh_stdout, cqlsh_stderr, _ = self.node1.run_cqlsh('COPY foo.bar TO \'/dev/null\';', [])
+        assert '0 rows exported to 1 files' in cqlsh_stdout
+        assert cqlsh_stderr == ''
+        cqlsh_stdout, cqlsh_stderr = run_cqlsh_catch_toolerror('COPY foo.bar TO \'/dev/null\';', ['--no-file-io'])
+        assert cqlsh_stdout == ''
+        assert 'No file I/O permitted' in cqlsh_stderr
+
+        cqlsh_stdout, cqlsh_stderr = run_cqlsh_catch_toolerror('DEBUG', [])
+        assert '(Pdb)' in cqlsh_stdout
+        cqlsh_stdout, cqlsh_stderr = run_cqlsh_catch_toolerror('DEBUG', ['--no-file-io'])
+        assert cqlsh_stdout == ''
+        assert 'No file I/O permitted' in cqlsh_stderr
+
+        cqlsh_stdout, cqlsh_stderr = run_cqlsh_catch_toolerror('CAPTURE \'nah\'', [])
+        assert cqlsh_stdout == 'Now capturing query output to \'nah\'.\n'
+        assert cqlsh_stderr == ''
+        cqlsh_stdout, cqlsh_stderr = run_cqlsh_catch_toolerror('CAPTURE \'nah\'', ['--no-file-io'])
+        assert cqlsh_stdout == ''
+        assert 'No file I/O permitted' in cqlsh_stderr
+
+        cqlsh_stdout, cqlsh_stderr = run_cqlsh_catch_toolerror('SOURCE \'nah\'', [])
+        assert cqlsh_stdout == ''
+        assert cqlsh_stderr == ''
+        cqlsh_stdout, cqlsh_stderr = run_cqlsh_catch_toolerror('SOURCE \'nah\'', ['--no-file-io'])
+        assert cqlsh_stdout == ''
+        assert 'No file I/O permitted' in cqlsh_stderr
+
 class TestCqlLogin(Tester, CqlshMixin):
     """
diff --git a/cqlsh_tests/test_cqlsh_cloud.py b/cqlsh_tests/test_cqlsh_cloud.py
new file mode 100644
index 0000000000..626eaaba09
--- /dev/null
+++ b/cqlsh_tests/test_cqlsh_cloud.py
@@ -0,0 +1,125 @@
+# coding=utf-8
+
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import pytest
+from ccmlib.node import ToolError
+
+from dtest import Tester
+
+logger = logging.getLogger(__name__)
+since = pytest.mark.since
+
+
+@since("4.0")
+class TestSecureBundleConnection(Tester):
+    """
+    Tests related to cqlsh behavior for cloud, e.g. secure bundle connection.
+    We only test cqlshrc behavior. Testing whether the connection using the
+    secure bundle really works requires a true cluster with a generated secure
+    bundle to run, and this is not possible without testing
+    infrastructure/tooling changes.
+
+    We can assume that it is correctly tested by the python driver or
+    will be tested in the next stage of testing (cloud native).
+
+    Validation is done using --debug information or error msgs.
+
+    Inspired by STAR-765.
+    """
+
+    CQLSHRC_PATH = 'cqlsh_tests/cqlshrc.sample.cloud'
+    BUNDLE_PATH = 'cqlsh_tests/secure-connect-test.zip'
+
+    def prepare(self, start=False):
+        if not self.cluster.nodelist():
+            self.cluster.populate(1)
+            if start:
+                self.cluster.start()
+        return self.cluster.nodelist()[0]
+
+    def _expect_tool_error(self, cmds, options, msg):
+        node = self.cluster.nodelist()[0]
+        with pytest.raises(ToolError, match=msg):
+            out, err, _ = node.run_cqlsh(cmds=cmds, cqlsh_options=options)
+        return out, err
+
+    def test_start_fails_on_non_existing_file(self):
+        self.prepare()
+        self._expect_tool_error(cmds='HELP',
+                                options=['--secure-connect-bundle', 'not-existing-file.zip'],
+                                msg='No such file or directory')
+
+    def test_start_fails_when_file_not_a_bundle(self):
+        self.prepare()
+        self._expect_tool_error(cmds='HELP',
+                                options=['--secure-connect-bundle', self.CQLSHRC_PATH],
+                                msg='Unable to open the zip file for the cloud config')
+
+    def test_read_bundle_path_from_cqlshrc(self):
+        self.prepare()
+        self._expect_tool_error(cmds='HELP',
+                                options=['--cqlshrc', self.CQLSHRC_PATH],
+                                msg="No such file or directory: '/path/to/creds.zip'")
+
+    def test_host_and_port_are_ignored_with_secure_bundle(self):
+        # it should connect with provided host and port to the started ccm node
+        node = self.prepare(start=True)
+        node.run_cqlsh("HELP", [])
+        # but fail with secure bundle even if port and host are set
+        expected_msg = "https://1263dd11-0aa5-41ef-8e56-17fa5fc7036e-europe-west1.db.astra.datastax.com:31669"
+        self._expect_tool_error(cmds='HELP',
+                                options=['--secure-connect-bundle', self.BUNDLE_PATH, node.ip_addr, '9042'],
+                                msg=expected_msg)
+
+    def test_default_consistency_level_for_secure_connect_bundle_param(self):
+        self.prepare()
+        self._expect_tool_error(cmds='HELP',
+                                options=['--secure-connect-bundle', 'not-existing-file.zip', '--debug'],
+                                msg='Using consistency level:.*LOCAL_QUORUM')
+
+    def test_default_consistency_level_for_secure_connect_bundle_in_clqshrc(self):
+        self.prepare()
+        self._expect_tool_error(cmds='HELP',
+                                options=['--cqlshrc', self.CQLSHRC_PATH, '--debug'],
+                                msg='Using consistency level:.*LOCAL_QUORUM')
+
+    def test_set_consistency_level_for_secure_connect_bundle_in_clqshrc(self):
+        self.prepare()
+        self._expect_tool_error(cmds='HELP',
+                                options=['--cqlshrc', self.CQLSHRC_PATH, '--debug', '--consistency-level', 'TWO'],
+                                msg='Using consistency level:.*TWO')
+
+    def test_debug_should_include_cloud_details(self):
+        self.prepare()
+        self._expect_tool_error(cmds='HELP',
+                                options=['--secure-connect-bundle', 'not-existing-file.zip', '--debug'],
+                                msg='Using secure connect bundle.*not-existing-file.zip')
+
+    @pytest.mark.skip("we cannot test it without ccm secure conn bundle support in ccm")
+    def test_endpoint_load_balancing_policy_is_used(self):
+        # to test this we would need a 3 nodes cloud cluster
+        assert False, "TODO: implement"
+
+    @pytest.mark.skip("we cannot test it without ccm secure conn bundle support in ccm")
+    def test_connects_correctly(self):
+        assert False, "TODO: implement"
+
+    @pytest.mark.skip("we cannot test it without ccm secure conn bundle support in ccm")
+    def test_login_command_keeps_cloud_connection_using_bundle(self):
+        # cqlsh.py -b some-bundle.zip -u user -p password
+        # LOGIN user(password)
+        assert False
diff --git a/cqlsh_tests/test_cqlsh_copy.py b/cqlsh_tests/test_cqlsh_copy.py
index 396de32e15..a2e6913655 100644
--- a/cqlsh_tests/test_cqlsh_copy.py
+++ b/cqlsh_tests/test_cqlsh_copy.py
@@ -123,9 +123,9 @@ def prepare(self, nodes=1, partitioner="murmur3", configuration_options=None, to
         if auth_enabled:
             self.node1.watch_log_for('Created default superuser')
-            self.session = self.patient_cql_connection(self.node1, user='cassandra', password='cassandra')
+            self.session = self.patient_cql_connection(self.node1, user='cassandra', password='cassandra', request_timeout=30.0)
         else:
-            self.session = self.patient_cql_connection(self.node1)
+            self.session = self.patient_cql_connection(self.node1, request_timeout=30.0)

         self.session.execute('DROP KEYSPACE IF EXISTS ks')
         self.ks = 'ks'
@@ -2387,6 +2387,9 @@ def _test_bulk_round_trip(self, nodes, partitioner,
         # enough for truncating larger tables, see CASSANDRA-11157
         if 'truncate_request_timeout_in_ms' not in configuration_options:
             configuration_options['truncate_request_timeout_in_ms'] = 60000
+        configuration_options['request_timeout_in_ms'] = 60000
+        configuration_options['read_request_timeout_in_ms'] = 60000
+        configuration_options['range_request_timeout_in_ms'] = 60000

         self.prepare(nodes=nodes, partitioner=partitioner, configuration_options=configuration_options)
@@ -2406,7 +2409,7 @@ def create_records():
             else:
                 count_statement = SimpleStatement("SELECT COUNT(*) FROM {}".format(stress_table), consistency_level=ConsistencyLevel.ALL,
                                                   retry_policy=FlakyRetryPolicy(max_retries=3))
-            ret = rows_to_list(self.session.execute(count_statement))[0][0]
+            ret = rows_to_list(self.session.execute(query=count_statement, timeout=60.0))[0][0]
             logger.debug('Generated {} records'.format(ret))
             assert ret >= num_operations, 'cassandra-stress did not import enough records'
             return ret
@@ -2481,8 +2484,12 @@ def test_bulk_round_trip_blogposts(self):
         @jira_ticket CASSANDRA-9302
         """
+        config_opts = {'batch_size_warn_threshold_in_kb': '10'}
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': config_opts}
+
         self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
-                                   configuration_options={'batch_size_warn_threshold_in_kb': '10'},
+                                   configuration_options=config_opts,
                                    profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
                                    stress_table='stresscql.blogposts')
@@ -2495,14 +2502,22 @@ def test_bulk_round_trip_blogposts_with_max_connections(self):
         @jira_ticket CASSANDRA-10938
         """
+        batch_size_warn_threshold_in_kb = '10'
+        native_transport_max_concurrent_connections = '12'
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb},
+                           'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections}
+        else:
+            config_opts = {'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections,
+                           'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb}
         self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
-                                   configuration_options={'native_transport_max_concurrent_connections': '12',
-                                                          'batch_size_warn_threshold_in_kb': '10'},
+                                   configuration_options=config_opts,
                                    profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
                                    stress_table='stresscql.blogposts',
                                    copy_to_options={'NUMPROCESSES': 5, 'MAXATTEMPTS': 20},
                                    copy_from_options={'NUMPROCESSES': 2})

+    @pytest.mark.skip(reason="test is not reliable and sensitive to vm power")
     def test_bulk_round_trip_with_timeouts(self):
         """
         Test bulk import with very short read and write timeout values, this should exercise the
@@ -2827,8 +2842,13 @@ def test_copy_from_with_large_cql_rows(self):
         @jira_ticket CASSANDRA-11474
         """
         num_records = 100
-        self.prepare(nodes=1, configuration_options={'batch_size_warn_threshold_in_kb': '1',   # warn with 1kb and fail
-                                                     'batch_size_fail_threshold_in_kb': '5'})  # with 5kb size batches
+        batch_size_warn_threshold_in_kb = '1'  # warn with 1kb and fail
+        batch_size_fail_threshold_in_kb = '5'  # with 5kb size batches
+        config_opts = {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb,
+                       'batch_size_fail_threshold_in_kb': batch_size_fail_threshold_in_kb}
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': config_opts}
+        self.prepare(nodes=1, configuration_options=config_opts)

         logger.debug('Running stress')
         stress_table_name = 'standard1'
@@ -3336,3 +3356,79 @@ def _test_invalid_data_for_maps():
         _test_invalid_data_for_sets()
         _test_invalid_data_for_lists()
         _test_invalid_data_for_maps()
+
+    @since('4.0')
+    def test_geotypes_copy(self):
+        """
+        Tests whether cqlsh COPY properly handles geo types with empty and null values.
+
+        @since 4.0.0
+
+        Steps:
+        * insert several geoTypes with null and empty values among them into a table
+        * cqlsh copy contents to .csv file and save them in a list
+        * wipe the table completely of all data
+        * cqlsh copy contents from .csv back into the table
+        * get the contents of the table into a list
+        * assert the pre-wipe data is the same as the newly copied data
+        :return
+        """
+        self.prepare()
+
+        self.session.execute("create table geo (k int primary key, point 'PointType', line 'LineStringType', poly 'PolygonType');")
+        self.session.execute("insert into geo (k, point, line, poly) VALUES (0, 'point(1.2 3.4)', 'linestring(1.0 1.1, 2.0 2.1, 3.0 3.1)', 'POLYGON ((10.1 10.0, 110.0 10.0, 110.0 110.0, 10.0 110.0, 10.0 10.0), (20.0 20.0, 20.0 30.0, 30.0 30.0, 30.0 20.0, 20.0 20.0))');")
+        self.session.execute("insert into geo (k, point, line, poly) VALUES (2, 'point(1.2 3.4)', 'linestring EMPTY', 'POLYGON EMPTY');")
+        self.session.execute("insert into geo (k) VALUES (1);")
+
+        # make sure data is inserted
+        data_actual = rows_to_list(self.session.execute("select * from geo;"))
+        assert len(data_actual) == 3
+
+        # dump data to CSV and truncate
+        tempfile = self.get_temp_file()
+        self.run_cqlsh(cmds="COPY ks.geo TO '{name}'".format(name=tempfile.name))
+        self.run_cqlsh(cmds="truncate ks.geo;")
+
+        # import data back
+        self.run_cqlsh(cmds="COPY ks.geo FROM '{name}'".format(name=tempfile.name))
+        data_copy = rows_to_list(self.session.execute("select * from geo;"))
+
+        assert data_actual == data_copy
+
+    @since("4.0")
+    def test_date_range_copy(self):
+        """
+        DateRangeTests.test_copy_command
+
+        Tests whether cqlsh COPY properly handles date_range types, including null values.
+        @note we cannot insert empty value ('') as it is not presented as null in cqlsh but it is in COPY
+        """
+        self.prepare()
+
+        self.session.execute("create table incomes (org text, period 'DateRangeType', incomes int, ver 'DateRangeType', primary key (org, period));")
+        # insert some data
+        self.session.execute("insert into incomes(org, period, incomes) values ('A','2014', 20140);")
+        self.session.execute("insert into incomes(org, period, incomes) values ('A','2015', 201500);")
+        self.session.execute("insert into incomes(org, period, incomes) values ('A','[2016-01-01 TO 2016-06-30]', 1007);")
+        self.session.execute("insert into incomes(org, period, incomes) values ('B','[2017-02-12 12:30:07 TO 2017-02-17 13:39:43.789]', 777);")
+        self.session.execute("insert into incomes(org, period, incomes, ver) values ('X','2011', 0, null);")
+        self.session.execute("insert into incomes(org, period, incomes) values ('C','*', 996);")
+        self.session.execute("insert into incomes(org, period, incomes) values ('C','[* TO *]', 997);")
+        self.session.execute("insert into incomes(org, period, incomes) values ('C','[* TO 2015-01]', 998);")
+        self.session.execute("insert into incomes(org, period, incomes) values ('C','[2015-01 TO *]', 999);")
+
+        # make sure data is inserted
+        data_actual = rows_to_list(self.session.execute("select * from incomes;"))
+        assert len(data_actual) == 9
+
+        # dump data to CSV and truncate
+        tempfile = self.get_temp_file()
+        self.run_cqlsh(cmds="COPY ks.incomes TO '{name}'".format(name=tempfile.name))
+        self.run_cqlsh(cmds="truncate ks.incomes;")
+
+        # import data back
+        self.run_cqlsh(cmds="COPY ks.incomes FROM '{name}'".format(name=tempfile.name))
+        data_copy = rows_to_list(self.session.execute("select * from incomes;"))
+
+        assert data_actual == data_copy
+
diff --git a/cqlsh_tests/test_cqlsh_types.py b/cqlsh_tests/test_cqlsh_types.py
new file mode 100644
index 0000000000..11e4604c7e
--- /dev/null
+++ b/cqlsh_tests/test_cqlsh_types.py
@@ -0,0 +1,67 @@
+import logging
+import pytest
+
+from dtest import Tester, create_ks
+
+logger = logging.getLogger(__name__)
+since = pytest.mark.since
+
+
+@since("4.0")
+class TestCqlshTypes(Tester):
+
+    def prepare(self, workload=None):
+        if not self.cluster.nodelist():
+            self.allow_log_errors = True
+            self.cluster.populate(1)
+            if workload is not None:
+                for node in self.cluster.nodelist():
+                    node.set_workload(workload)
+            logger.debug('About to start cluster')
+            self.cluster.start()
+            logger.debug('Cluster started')
+            for node in self.cluster.nodelist():
+                node.watch_log_for('Starting listening for CQL clients', timeout=60)
+            self.cluster.nodelist()[0].watch_log_for('Created default superuser')
+            self.node = self.cluster.nodelist()[0]
+
+            conn = self.patient_cql_connection(self.node)
+            create_ks(conn, 'ks', 1)
+
+        logger.debug('prepare completed')
+
+    def test_point(self):
+        self.prepare()
+
+        expected = 'POINT (1.2 2.3)'
+        self.node.run_cqlsh("CREATE TABLE ks.point_tbl (k INT PRIMARY KEY, point 'PointType');")
+        self.node.run_cqlsh("INSERT INTO ks.point_tbl (k, point) VALUES (1, '{}')".format(expected))
+        result = self.node.run_cqlsh("SELECT * FROM ks.point_tbl;")
+        assert expected in result[0], result
+
+    def test_linestring(self):
+        self.prepare()
+
+        expected = 'LINESTRING (30.0 10.0, 10.0 30.0, 40.0 40.0)'
+        self.node.run_cqlsh("CREATE TABLE ks.line_tbl (k INT PRIMARY KEY, linestring 'LineStringType');")
+        self.node.run_cqlsh("INSERT INTO ks.line_tbl (k, linestring) VALUES (1, '{}')".format(expected))
+        result = self.node.run_cqlsh("SELECT * FROM ks.line_tbl;")
+        assert expected in result[0], result
+
+    def test_polygon(self):
+        self.prepare()
+
+        expected = 'POLYGON ((30.0 10.0, 40.0 40.0, 20.0 40.0, 10.0 20.0, 30.0 10.0))'
+        self.node.run_cqlsh("CREATE TABLE ks.polygon_tbl (k INT PRIMARY KEY, polygon 'PolygonType');")
+        self.node.run_cqlsh("INSERT INTO ks.polygon_tbl (k, polygon) VALUES (1, '{}')".format(expected))
+        result = self.node.run_cqlsh("SELECT * FROM ks.polygon_tbl;")
+        assert expected in result[0], result
+
+    def test_date_range(self):
+        self.prepare()
+
+        expected = '[2015-01 TO *]'
+        self.node.run_cqlsh("CREATE TABLE ks.date_range_tbl (k INT PRIMARY KEY, date_range_tbl 'DateRangeType');")
+        self.node.run_cqlsh("INSERT INTO ks.date_range_tbl (k, date_range_tbl) VALUES (1, '{}')".format(expected))
+        result = self.node.run_cqlsh("SELECT * FROM ks.date_range_tbl;")
+        assert expected in result[0], result
diff --git a/disk_balance_test.py b/disk_balance_test.py
index ceadf98a80..e1b1ca9ec1 100644
--- a/disk_balance_test.py
+++ b/disk_balance_test.py
@@ -24,6 +24,10 @@ class TestDiskBalance(Tester):
     @jira_ticket CASSANDRA-6696
     """

+    STCS_COMPACTION_OPTS = "SizeTieredCompactionStrategy"
+    LCS_COMPACTION_OPTS = "LeveledCompactionStrategy,sstable_size_in_mb=1"
+    UCS_COMPACTION_OPTS = "UnifiedCompactionStrategy"
+
     @pytest.fixture(scope='function', autouse=True)
     def fixture_set_cluster_settings(self, fixture_dtest_setup):
         cluster = fixture_dtest_setup.cluster
@@ -190,16 +194,23 @@ def test_disk_balance_after_boundary_change_stcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
-        self._disk_balance_after_boundary_change_test(lcs=False)
+        self._disk_balance_after_boundary_change_test(self.STCS_COMPACTION_OPTS)

     @since('3.10')
     def test_disk_balance_after_boundary_change_lcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
-        self._disk_balance_after_boundary_change_test(lcs=True)
+        self._disk_balance_after_boundary_change_test(self.LCS_COMPACTION_OPTS)
+
+    @since('4.0')
+    def test_disk_balance_after_boundary_change_ucs(self):
+        """
+        @jira_ticket CASSANDRA-13948
+        """
+        self._disk_balance_after_boundary_change_test(self.UCS_COMPACTION_OPTS)

-    def _disk_balance_after_boundary_change_test(self, lcs):
+    def _disk_balance_after_boundary_change_test(self, compaction_opts):
         """
         @jira_ticket CASSANDRA-13948
@@ -230,7 +241,6 @@ def _disk_balance_after_boundary_change_test(self, lcs):
         keys_per_flush = 10000
         keys_to_write = num_flushes * keys_per_flush

-        compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
         logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
         total_keys = num_flushes * keys_per_flush
         current_keys = 0
@@ -254,29 +264,36 @@ def _disk_balance_after_boundary_change_test(self, lcs):
         node2.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds=10"], set_migration_task=False)
         node2.flush()

-        self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
+        self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)

         logger.debug("Decommissioning node1")
         node1.decommission()
         node1.stop()

-        self._assert_balanced_after_boundary_change(node2, total_keys, lcs)
+        self._assert_balanced_after_boundary_change(node2, total_keys, compaction_opts)

     @since('3.10')
     def test_disk_balance_after_joining_ring_stcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
-        self._disk_balance_after_joining_ring_test(lcs=False)
+        self._disk_balance_after_joining_ring_test(self.STCS_COMPACTION_OPTS)
     @since('3.10')
     def test_disk_balance_after_joining_ring_lcs(self):
         """
         @jira_ticket CASSANDRA-13948
         """
-        self._disk_balance_after_joining_ring_test(lcs=True)
+        self._disk_balance_after_joining_ring_test(self.LCS_COMPACTION_OPTS)
+
+    @since('4.0')
+    def test_disk_balance_after_joining_ring_ucs(self):
+        """
+        @jira_ticket CASSANDRA-13948
+        """
+        self._disk_balance_after_joining_ring_test(self.UCS_COMPACTION_OPTS)

-    def _disk_balance_after_joining_ring_test(self, lcs):
+    def _disk_balance_after_joining_ring_test(self, compaction_opts):
         """
         @jira_ticket CASSANDRA-13948
@@ -302,7 +319,6 @@ def _disk_balance_after_joining_ring_test(self, lcs):
         keys_per_flush = 10000
         keys_to_write = num_flushes * keys_per_flush

-        compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
         logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
         total_keys = num_flushes * keys_per_flush
         current_keys = 0
@@ -327,9 +343,9 @@ def _disk_balance_after_joining_ring_test(self, lcs):
         node1.nodetool("join")
         node1.nodetool("join")  # Need to run join twice - one to join ring, another to leave write survey mode

-        self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
+        self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)

-    def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
+    def _assert_balanced_after_boundary_change(self, node, total_keys, compaction_opts):
         logger.debug("Cleanup {}".format(node.name))
         node.cleanup()
@@ -351,7 +367,7 @@ def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
         logger.debug("Reading data back ({} keys)".format(total_keys))
         node.stress(['read', 'n={}'.format(total_keys), "no-warmup", "cl=ALL", "-pop", "seq=1...{}".format(total_keys), "-rate", "threads=1"])

-        if lcs:
+        if compaction_opts == self.LCS_COMPACTION_OPTS:
             output = grep_sstables_in_each_level(node, "standard1")
             logger.debug("SSTables in each level: {}".format(output))
diff --git a/dtest.py b/dtest.py
index 01cf4f1631..e46a66f1ef 100644
--- a/dtest.py
+++ b/dtest.py
@@ -364,6 +364,8 @@ def create_ks(session, name, rf):
     if isinstance(rf, int):
         # we assume simpleStrategy
         query = query % (name, "'class':'SimpleStrategy', 'replication_factor':%d" % rf)
+    elif 'EverywhereStrategy' in rf:
+        query = query % (name, "'class':'org.apache.cassandra.locator.EverywhereStrategy'")
     else:
         assert len(rf) >= 0, "At least one datacenter/rf pair is needed"
         # we assume networkTopologyStrategy
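With this branch, passing the string 'EverywhereStrategy' as rf makes create_ks emit the fully qualified replication class, which is what the new bootstrap test above relies on. Roughly, the statement it produces (illustrative):

    session.execute("""
        CREATE KEYSPACE ks
        WITH replication = {'class': 'org.apache.cassandra.locator.EverywhereStrategy'}
    """)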
diff --git a/dtest_config.py b/dtest_config.py
index 86e8c96b25..ad454d49f7 100644
--- a/dtest_config.py
+++ b/dtest_config.py
@@ -11,6 +11,7 @@ class DTestConfig:
     def __init__(self):
+        self.sstable_format = "bti"
         self.use_vnodes = True
         self.use_off_heap_memtables = False
         self.num_tokens = -1
@@ -41,6 +42,10 @@ def setup(self, config):
             self.cassandra_version_from_build = self.get_version_from_build()
             return

+        self.sstable_format = config.getoption("--sstable-format")
+        if self.sstable_format:
+            assert self.sstable_format in ['bti', 'big'], "SSTable format {} is invalid - must be either bti or big".format(self.sstable_format)
+
         self.use_vnodes = config.getoption("--use-vnodes")
         self.use_off_heap_memtables = config.getoption("--use-off-heap-memtables")
         self.num_tokens = config.getoption("--num-tokens")
@@ -90,6 +95,17 @@ def setup(self, config):
                           "--use-off-heap-memtables, see https://issues.apache.org/jira/browse/CASSANDRA-9472 "
                           "for details" % version)

+        self.apply_to_env(os.environ, "JVM_EXTRA_OPTS")
+
+    def apply_to_env(self, env, key="JVM_OPTS"):
+        current = env.get(key) or ""
+        if self.sstable_format:
+            default_sstable_format_prop = " -Dcassandra.sstable.format.default=" + self.sstable_format
+            if "-Dcassandra.sstable.format.default" not in current:
+                env.update({key: (env.get(key) or "") + default_sstable_format_prop})
+            else:
+                logger.debug("Skipped adding {} because it is already in the env key {}: {}".format(default_sstable_format_prop, key, current))
+
     def get_version_from_build(self):
         # There are times when we want to know the C* version we're testing against
         # before we do any cluster. In the general case, we can't know that -- the
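apply_to_env only appends the system property when it is not already present in the target key. A quick usage sketch of the behavior (standalone construction is illustrative; in the suite this runs inside setup with os.environ):

    config = DTestConfig()            # sstable_format defaults to "bti"
    env = {}
    config.apply_to_env(env, "JVM_EXTRA_OPTS")
    # env == {'JVM_EXTRA_OPTS': ' -Dcassandra.sstable.format.default=bti'}
    config.apply_to_env(env, "JVM_EXTRA_OPTS")  # second call is a no-op, property already set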
diff --git a/dtest_setup.py b/dtest_setup.py
index 6a396b8e51..d771e08af5 100644
--- a/dtest_setup.py
+++ b/dtest_setup.py
@@ -17,7 +17,7 @@
 from cassandra.cluster import EXEC_PROFILE_DEFAULT
 from cassandra.policies import WhiteListRoundRobinPolicy
 from ccmlib.common import is_win
-from ccmlib.cluster import Cluster
+from ccmlib.cluster import Cluster, NodeError

 from dtest import (get_ip_from_node, make_execution_profile, get_auth_provider, get_port_from_node,
                    get_eager_protocol_version)
@@ -232,7 +232,9 @@ def _create_session(self, node, keyspace, user, password, compression, protocol_
                           protocol_version=protocol_version,
                           port=port,
                           ssl_options=ssl_opts,
-                          connect_timeout=15,
+                          connect_timeout=60,
+                          idle_heartbeat_timeout=60,
+                          idle_heartbeat_interval=60,
                           allow_beta_protocol_version=True,
                           execution_profiles=profiles)
         session = cluster.connect(wait_for_all_pools=True)
@@ -244,7 +246,7 @@ def _create_session(self, node, keyspace, user, password, compression, protocol_
         return session

     def patient_cql_connection(self, node, keyspace=None,
-                               user=None, password=None, timeout=30, compression=True,
+                               user=None, password=None, timeout=60, compression=True,
                                protocol_version=None, port=None, ssl_opts=None, **kwargs):
         """
         Returns a connection after it stops throwing NoHostAvailables due to not being ready.
@@ -352,6 +354,10 @@ def dump_jfr_recording(self, nodes):
     def supports_v5_protocol(self, cluster_version):
         return cluster_version >= LooseVersion('4.0')

+    def supports_guardrails(self):
+        return self.cluster.version() >= LooseVersion('4.0')
+
+
     def cleanup_last_test_dir(self):
         if os.path.exists(self.last_test_dir):
             os.remove(self.last_test_dir)
@@ -366,17 +372,35 @@ def stop_active_log_watch(self):
         """
         self.log_watch_thread.join(timeout=60)

+    def stop_cluster(self, gently=False):
+        """
+        Stops the cluster; if 'gently' is requested and a NodeError occurs, then
+        try again without 'gently'.
+
+        Some tests, by design, leave the cluster in a state which prevents it from
+        being stopped using 'gently'. Retrying without 'gently' will avoid marking
+        the test as a failure, but may prevent jacoco results from being recorded.
+        """
+        try:
+            self.cluster.stop(gently)
+        except NodeError as e:
+            if gently:
+                logger.debug("Exception stopping cluster with gently=True, retrying with gently=False: {0}".format(e))
+                self.cluster.stop(gently=False)
+            else:
+                raise e
+
     def cleanup_cluster(self, request=None):
         with log_filter('cassandra'):  # quiet noise from driver when nodes start going down
             test_failed = request and hasattr(request.node, 'rep_call') and request.node.rep_call.failed
             if self.dtest_config.keep_test_dir or (self.dtest_config.keep_failed_test_dir and test_failed):
-                self.cluster.stop(gently=self.dtest_config.enable_jacoco_code_coverage)
+                self.stop_cluster(gently=self.dtest_config.enable_jacoco_code_coverage)
             else:
                 # when recording coverage the jvm has to exit normally
                 # or the coverage information is not written by the jacoco agent
                 # otherwise we can just kill the process
                 if self.dtest_config.enable_jacoco_code_coverage:
-                    self.cluster.stop(gently=True)
+                    self.stop_cluster(gently=True)

                 # Cleanup everything:
                 try:
diff --git a/guardrails_test.py b/guardrails_test.py
new file mode 100644
index 0000000000..bf883bba98
--- /dev/null
+++ b/guardrails_test.py
@@ -0,0 +1,99 @@
+import logging
+import time
+import pytest
+import re
+
+from cassandra import InvalidRequest
+
+from dtest import Tester, create_ks
+from tools.assertions import assert_one
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+class BaseGuardrailsTester(Tester):
+
+    def prepare(self, rf=1, options=None, nodes=3, install_byteman=False, extra_jvm_args=None, **kwargs):
+        if options is None:
+            options = {}
+
+        if extra_jvm_args is None:
+            extra_jvm_args = []
+
+        cluster = self.cluster
+        cluster.set_log_level('TRACE')
+        cluster.populate(nodes, install_byteman=install_byteman)
+        if options:
+            cluster.set_configuration_options(values=options)
+
+        cluster.start(jvm_args=extra_jvm_args)
+        node1 = cluster.nodelist()[0]
+
+        session = self.patient_cql_connection(node1, **kwargs)
+        create_ks(session, 'ks', rf)
+
+        return session
+
+
+@since('4.0')
+class TestGuardrails(BaseGuardrailsTester):
+
+    def test_disk_usage_guardrail(self):
+        """
+        Test that the disk usage guardrail warns when usage exceeds the warn threshold
+        and rejects writes when it exceeds the failure threshold
+        """
+
+        self.fixture_dtest_setup.ignore_log_patterns = ["Write request failed because disk usage exceeds failure threshold"]
+        guardrails_config = {'guardrails': {'disk_usage_percentage_warn_threshold': 98,
+                                            'disk_usage_percentage_failure_threshold': 99}}
+
+        logger.debug("prepare 2-node cluster with rf=1 and guardrails enabled")
+        session = self.prepare(rf=1, nodes=2, options=guardrails_config, extra_jvm_args=['-Dcassandra.disk_usage.monitor_interval_ms=100'], install_byteman=True)
+        node1, node2 = self.cluster.nodelist()
+        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
+
+        logger.debug("Inject FULL to node1, expect log on node1 and node2 rejects writes")
+        mark = node1.mark_log()
+        self.disk_usage_injection(node1, "full", False)
+        node1.watch_log_for("Adding state DISK_USAGE: FULL", filename='debug.log', from_mark=mark, timeout=10)
+
+        # verify node2 will reject writes if node1 is the replica
+        session2 = self.patient_exclusive_cql_connection(node2, keyspace="ks")
+        rows = 100
+        failed = 0
+        for x in range(rows):
+            try:
+                session2.execute("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
+            except InvalidRequest as e:
+                assert re.search("Write request failed because disk usage exceeds failure threshold", str(e))
+                failed = failed + 1
+
+        assert rows != failed, "Expect node2 rejects some writes, but rejected all"
+        assert 0 != failed, "Expect node2 rejects some writes, but rejected nothing"
+        assert_one(session2, "SELECT COUNT(*) FROM t", [rows - failed])
+
+        logger.debug("Inject STUFFED to node1, node2 should warn client")
+        session2.execute("TRUNCATE t")
+        mark = node1.mark_log()
+        self.disk_usage_injection(node1, "stuffed")
+        node1.watch_log_for("Adding state DISK_USAGE: STUFFED", filename='debug.log', from_mark=mark, timeout=10)
+
+        warnings = 0
+        for x in range(rows):
+            fut = session2.execute_async("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
+            fut.result()
+            if fut.warnings:
+                assert ["Replica disk usage exceeds warn threshold"] == fut.warnings
+                warnings = warnings + 1
+
+        assert rows != warnings, "Expect node2 emits some warnings, but got all warnings"
+        assert 0 != warnings, "Expect node2 emits some warnings, but got no warnings"
+        assert_one(session2, "SELECT COUNT(*) FROM t", [rows])
+
+        session.cluster.shutdown()
+        session2.cluster.shutdown()
+
+    def disk_usage_injection(self, node, state, clear_byteman=True):
+        if clear_byteman:
+            node.byteman_submit(['-u'])
+        node.byteman_submit(["./byteman/guardrails/disk_usage_{}.btm".format(state)])
"paging_test.* query aborted") else: - failure_msg = ("Scanned over.* tombstones during query.* query aborted") + failure_msg = ("Scanned over.* (tombstones|tombstone rows) during query.* query aborted") self.cluster.wait_for_any_log(failure_msg, 25) diff --git a/pushed_notifications_test.py b/pushed_notifications_test.py index 9d0ab93252..b5ed18851f 100644 --- a/pushed_notifications_test.py +++ b/pushed_notifications_test.py @@ -388,13 +388,18 @@ def test_tombstone_failure_threshold_message(self): have_v5_protocol = self.supports_v5_protocol(self.cluster.version()) self.fixture_dtest_setup.allow_log_errors = True - self.cluster.set_configuration_options( - values={ - 'tombstone_failure_threshold': 500, - 'read_request_timeout_in_ms': 30000, # 30 seconds - 'range_request_timeout_in_ms': 40000 - } - ) + + if self.supports_guardrails: + config_options = {'guardrails': {'tombstone_warn_threshold': -1, + 'tombstone_failure_threshold': 500}, + 'read_request_timeout_in_ms': 30000, # 30 seconds + 'range_request_timeout_in_ms': 40000} + else: + config_options = {'tombstone_failure_threshold': 500, + 'read_request_timeout_in_ms': 30000, # 30 seconds + 'range_request_timeout_in_ms': 40000} + + self.cluster.set_configuration_options(values=config_options) self.cluster.populate(3).start() node1, node2, node3 = self.cluster.nodelist() proto_version = 5 if have_v5_protocol else None @@ -407,17 +412,17 @@ def test_tombstone_failure_threshold_message(self): "PRIMARY KEY (id, mytext) )" ) - # Add data with tombstones + if self.supports_guardrails: + # cell tombstones are not counted towards the threshold, so we delete rows + query = "delete from test where id = 1 and mytext = '{}'" + else: + # Add data with tombstones + query = "insert into test (id, mytext, col1) values (1, '{}', null)" values = [str(i) for i in range(1000)] for value in values: - session.execute(SimpleStatement( - "insert into test (id, mytext, col1) values (1, '{}', null) ".format( - value - ), - consistency_level=CL.ALL - )) - - failure_msg = ("Scanned over.* tombstones.* query aborted") + session.execute(SimpleStatement(query.format(value),consistency_level=CL.ALL)) + + failure_msg = ("Scanned over.* (tombstones|tombstone rows).* query aborted") @pytest.mark.timeout(25) def read_failure_query(): diff --git a/pytest.ini b/pytest.ini index c8e85d449a..5f10d876e1 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,6 +2,11 @@ python_files = test_*.py *_test.py *_tests.py junit_suite_name = Cassandra dtests log_print = True -log_level = INFO -log_format = %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s +log_cli = True +log_cli_level = DEBUG +log_cli_format = %(asctime)s,%(msecs)03d %(name)s %(levelname)s %(message)s +log_cli_date_format = %Y-%m-%d %H:%M:%S +log_file_level = DEBUG +log_file_format = %(asctime)s,%(msecs)03d %(name)s %(levelname)s %(message)s +log_file_date_format = %Y-%m-%d %H:%M:%S timeout = 900 diff --git a/read_failures_test.py b/read_failures_test.py index 475f27815d..664ca70ff4 100644 --- a/read_failures_test.py +++ b/read_failures_test.py @@ -4,6 +4,7 @@ from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout from cassandra.policies import FallthroughRetryPolicy from cassandra.query import SimpleStatement +from distutils.version import LooseVersion from dtest import Tester @@ -21,7 +22,9 @@ class TestReadFailures(Tester): @pytest.fixture(autouse=True) def fixture_add_additional_log_patterns(self, fixture_dtest_setup): fixture_dtest_setup.ignore_log_patterns = ( - "Scanned over [1-9][0-9]* tombstones", # 
This is expected when testing read failures due to tombstones + # These are expected when testing read failures due to tombstones, + "Scanned over [1-9][0-9]* tombstones", + "Scanned over [1-9][0-9]* tombstone rows", ) return fixture_dtest_setup @@ -33,9 +36,15 @@ def fixture_dtest_setup_params(self): self.expected_expt = ReadFailure def _prepare_cluster(self): - self.cluster.set_configuration_options( - values={'tombstone_failure_threshold': self.tombstone_failure_threshold} - ) + if self.supports_guardrails: + self.cluster.set_configuration_options( + values={'guardrails': {'tombstone_warn_threshold': -1, + 'tombstone_failure_threshold': self.tombstone_failure_threshold}} + ) + else: + self.cluster.set_configuration_options( + values={'tombstone_failure_threshold': self.tombstone_failure_threshold} + ) self.cluster.populate(3) self.cluster.start() self.nodes = list(self.cluster.nodes.values()) diff --git a/repair_tests/incremental_repair_test.py b/repair_tests/incremental_repair_test.py index 6da75d6e0a..998fec2c34 100644 --- a/repair_tests/incremental_repair_test.py +++ b/repair_tests/incremental_repair_test.py @@ -143,38 +143,20 @@ def test_consistent_repair(self): results = list(session.execute("SELECT * FROM system.repairs")) assert len(results) == 0, str(results) - # disable compaction so we can verify sstables are marked pending repair - for node in self.cluster.nodelist(): - node.nodetool('disableautocompaction ks tbl') - + # repair node1.repair(options=['ks']) # check that all participating nodes have the repair recorded in their system # table, that all nodes are listed as participants, and that all sstables are # (still) marked pending repair expected_participants = {n.address() for n in self.cluster.nodelist()} - expected_participants_wp = {n.address_and_port() for n in self.cluster.nodelist()} - recorded_pending_ids = set() for node in self.cluster.nodelist(): session = self.patient_exclusive_cql_connection(node) results = list(session.execute("SELECT * FROM system.repairs")) assert len(results) == 1 result = results[0] assert set(result.participants) == expected_participants - if hasattr(result, "participants_wp"): - assert set(result.participants_wp) == expected_participants_wp - assert result.state, ConsistentState.FINALIZED == "4=FINALIZED" - pending_id = result.parent_id - self.assertAllPendingRepairSSTables(node, 'ks', pending_id) - recorded_pending_ids.add(pending_id) - - assert len(recorded_pending_ids) == 1 - - # sstables are compacted out of pending repair by a compaction - # task, we disabled compaction earlier in the test, so here we - # force the compaction and check that all sstables are promoted - for node in self.cluster.nodelist(): - node.nodetool('compact ks tbl') + assert result.state == ConsistentState.FINALIZED, str(result.state) self.assertAllRepairedSSTables(node, 'ks') def test_sstable_marking(self): diff --git a/repair_tests/repair_test.py b/repair_tests/repair_test.py index 29f46e90b9..d79d67700b 100644 --- a/repair_tests/repair_test.py +++ b/repair_tests/repair_test.py @@ -1,6 +1,7 @@ import os import os.path import threading +import tempfile import time import re import pytest @@ -1167,8 +1168,7 @@ def test_multiple_concurrent_repairs(self): _, _, rc = node2.stress(['read', 'n=1M', 'no-warmup', '-rate', 'threads=30'], whitelist=True) assert rc == 0 - @since('4.0') - def test_wide_row_repair(self): + def _test_wide_row_repair(self, compaction_strategy): """ @jira_ticket CASSANDRA-13899 Make sure compressed vs uncompressed blocks are handled 
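# --- Editorial aside (not part of the patch) --------------------------------
# Condensed form of the post-repair verification in the incremental repair
# hunk above: after repair, each node's system.repairs table should hold a
# single session whose participants span the cluster and whose state is
# FINALIZED. `tester`, `cluster` and `ConsistentState` are assumed from the
# test's surrounding context; the helper name is hypothetical:
def assert_repair_finalized(tester, cluster):
    expected = {n.address() for n in cluster.nodelist()}
    for node in cluster.nodelist():
        session = tester.patient_exclusive_cql_connection(node)
        rows = list(session.execute("SELECT * FROM system.repairs"))
        assert len(rows) == 1, str(rows)
        assert set(rows[0].participants) == expected
        assert rows[0].state == ConsistentState.FINALIZED, str(rows[0].state)
# -----------------------------------------------------------------------------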
correctly when stream decompressing @@ -1178,13 +1178,26 @@ def test_wide_row_repair(self): cluster.populate(2).start() node1, node2 = cluster.nodelist() node2.stop(wait_other_notice=True) - profile_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml') - logger.info(("yaml = " + profile_path)) - node1.stress(['user', 'profile=' + profile_path, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8', - '-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)']) + template_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml.tmpl') + with open(template_path) as profile_template: + profile = profile_template.read().replace("{{ compaction_strategy }}", compaction_strategy) + with tempfile.NamedTemporaryFile(mode='w+') as stress_profile: + stress_profile.write(profile) + stress_profile.flush() + print("yaml = " + stress_profile.name) + node1.stress(['user', 'profile=' + stress_profile.name, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8', + '-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)']) node2.start(wait_for_binary_proto=True) node2.repair() + @since('4.0') + def test_wide_row_repair_lcs(self): + self._test_wide_row_repair('LeveledCompactionStrategy') + + @since('4.0') + def test_wide_row_repair_ucs(self): + self._test_wide_row_repair('UnifiedCompactionStrategy') + @since('2.1', max_version='4') def test_dead_coordinator(self): """ @@ -1225,6 +1238,40 @@ def run_repair(): else: node1.nodetool('repair keyspace1 standard1 -inc -par') + @since('3.0') + def test_repair_one_node_cluster(self): + options = [] + fix_STAR582 = self.cluster.version() >= "4.0" + if not fix_STAR582: + options = ['--ignore-unreplicated-keyspaces'] + options + self._repair_abort_test(options=options, nodes=1, rf=2) + + @since('3.0') + def test_repair_one_node_in_local_dc(self): + self._repair_abort_test(options=['--ignore-unreplicated-keyspaces', '--in-local-dc'], nodes=[1, 1], rf={'dc1': 1, 'dc2': 1}, no_common_range=True) + + def _repair_abort_test(self, options=[], nodes=1, rf=1, no_common_range=False): + cluster = self.cluster + logger.debug("Starting cluster..") + cluster.populate(nodes).start(wait_for_binary_proto=True) + + node1 = self.cluster.nodelist()[0] + session = self.patient_cql_connection(node1) + create_ks(session, 'ks', rf=rf) + + support_preview = self.cluster.version() >= "4.0" + if support_preview: + logger.debug("Preview repair") + out = node1.repair(["--preview"] + options) + if no_common_range: + assert "Nothing to repair for " in str(out), "Expect 'Nothing to repair for '" + + logger.debug("Full repair") + node1.repair(["--full"] + options) + + logger.debug("Incremental repair") + node1.repair(options) + @since('2.2') def test_dead_sync_initiator(self): """ diff --git a/replace_address_test.py b/replace_address_test.py index 995ea0333e..268aab0665 100644 --- a/replace_address_test.py +++ b/replace_address_test.py @@ -582,7 +582,14 @@ def test_replace_with_insufficient_replicas(self): self.replacement_node.watch_log_for("Unable to find sufficient sources for streaming range") assert_not_running(self.replacement_node) - def test_multi_dc_replace_with_rf1(self): + def test_multi_dc_replace_with_rf1_stcs(self): + self._test_multi_dc_replace_with_rf1('SizeTieredCompactionStrategy') + + @since("4.0") + def test_multi_dc_replace_with_rf1_ucs(self): + self._test_multi_dc_replace_with_rf1('UnifiedCompactionStrategy') + + def _test_multi_dc_replace_with_rf1(self, compaction_strategy): """ Test that multi-dc replace works when rf=1 on each dc """ @@ 
-592,7 +599,7 @@ def test_multi_dc_replace_with_rf1(self): # Create the keyspace and table keyspace: keyspace1 keyspace_definition: | - CREATE KEYSPACE keyspace1 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}; + CREATE KEYSPACE keyspace1 WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}}; table: users table_definition: CREATE TABLE users ( @@ -601,7 +608,7 @@ def test_multi_dc_replace_with_rf1(self): last_name text, email text, PRIMARY KEY(username) - ) WITH compaction = {'class':'SizeTieredCompactionStrategy'}; + ) WITH compaction = {{'class':'{compaction_strategy}'}}; insert: partitions: fixed(1) batchtype: UNLOGGED @@ -609,7 +616,8 @@ def test_multi_dc_replace_with_rf1(self): read: cql: select * from users where username = ? fields: samerow - """ + """.format(compaction_strategy=compaction_strategy) + with tempfile.NamedTemporaryFile(mode='w+') as stress_config: stress_config.write(yaml_config) stress_config.flush() diff --git a/run_dtests.py b/run_dtests.py index 34dd5af766..3f3e1fda91 100755 --- a/run_dtests.py +++ b/run_dtests.py @@ -1,6 +1,6 @@ #!/usr/bin/env python """ -usage: run_dtests.py [-h] [--use-vnodes] [--use-off-heap-memtables] [--num-tokens=NUM_TOKENS] [--data-dir-count-per-instance=DATA_DIR_COUNT_PER_INSTANCE] +usage: run_dtests.py [-h] [--sstable-format=FORMAT] [--use-vnodes] [--use-off-heap-memtables] [--num-tokens=NUM_TOKENS] [--data-dir-count-per-instance=DATA_DIR_COUNT_PER_INSTANCE] [--force-resource-intensive-tests] [--skip-resource-intensive-tests] [--cassandra-dir=CASSANDRA_DIR] [--cassandra-version=CASSANDRA_VERSION] [--delete-logs] [--execute-upgrade-tests] [--execute-upgrade-tests-only] [--disable-active-log-watching] [--keep-test-dir] [--enable-jacoco-code-coverage] [--dtest-enable-debug-logging] [--dtest-print-tests-only] [--dtest-print-tests-output=DTEST_PRINT_TESTS_OUTPUT] @@ -8,6 +8,7 @@ optional arguments: -h, --help show this help message and exit + --sstable-format SSTable format to be used by default for all newly created SSTables: big or bti (default: bti) --use-vnodes Determines wither or not to setup clusters using vnodes for tests (default: False) --use-off-heap-memtables Enable Off Heap Memtables when creating test clusters for tests (default: False) --num-tokens=NUM_TOKENS Number of tokens to set num_tokens yaml setting to when creating instances with vnodes enabled (default: 256) diff --git a/schema_test.py b/schema_test.py index 6c9f8a1aa1..689fafcd6b 100644 --- a/schema_test.py +++ b/schema_test.py @@ -13,7 +13,7 @@ class TestSchema(Tester): - def test_table_alteration(self): + def _test_table_alteration(self, compaction_opts): """ Tests that table alters return as expected with many sstables at different schema points """ @@ -24,7 +24,7 @@ def test_table_alteration(self): create_ks(session, 'ks', 1) session.execute("use ks;") session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) " - "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };") + "WITH compaction = " + compaction_opts + ";") stmt1 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)") rows_to_insert = 50 @@ -54,6 +54,13 @@ def test_table_alteration(self): assert row.c2 == 'ddd' assert not hasattr(row, 'c0') + def test_table_alteration_stcs(self): + self._test_table_alteration("{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 }") + + @since("4.0") + def test_table_alteration_ucs(self): + 
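# --- Editorial aside (not part of the patch) --------------------------------
# Two templating mechanisms appear in these hunks: repair_wide_rows.yaml.tmpl
# is rendered with a plain str.replace() on its '{{ compaction_strategy }}'
# placeholder, while the inline stress YAML above goes through str.format(),
# which is why its literal braces are doubled to '{{' and '}}'. A compact
# sketch of both, under the assumption that paths and names are illustrative:
import tempfile

def render_tmpl(template_text, compaction_strategy):
    """Render a .tmpl stress profile into a temp file and return its path."""
    rendered = template_text.replace("{{ compaction_strategy }}", compaction_strategy)
    tmp = tempfile.NamedTemporaryFile(mode='w+', suffix='.yaml', delete=False)
    tmp.write(rendered)
    tmp.flush()
    return tmp.name  # hand to cassandra-stress as 'profile=<path>'

assert "WITH compaction = {{'class':'{s}'}}".format(s='UnifiedCompactionStrategy') \
    == "WITH compaction = {'class':'UnifiedCompactionStrategy'}"
# -----------------------------------------------------------------------------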
self._test_table_alteration("{'class': 'UnifiedCompactionStrategy'}") + @since("2.0", max_version="3.X") # Compact Storage def test_drop_column_compact(self): session = self.prepare() diff --git a/scrub_test.py b/scrub_test.py index 3d50d70c31..7e2eee4049 100644 --- a/scrub_test.py +++ b/scrub_test.py @@ -110,12 +110,15 @@ def launch_nodetool_cmd(self, cmd): if not common.is_win(): # nodetool always prints out on windows assert_length_equal(response, 0) # nodetool does not print anything unless there is an error - def launch_standalone_scrub(self, ks, cf, reinsert_overflowed_ttl=False, no_validate=False): + def launch_standalone_scrub(self, ks, cf, reinsert_overflowed_ttl=False, no_validate=False, acceptable_errors=None): """ Launch the standalone scrub """ node1 = self.cluster.nodelist()[0] env = common.make_cassandra_env(node1.get_install_cassandra_root(), node1.get_node_cassandra_root()) + + self.dtest_config.apply_to_env(env, "JVM_OPTS") + scrub_bin = node1.get_tool('sstablescrub') logger.debug(scrub_bin) @@ -131,7 +134,7 @@ def launch_standalone_scrub(self, ks, cf, reinsert_overflowed_ttl=False, no_vali # if we have less than 64G free space, we get this warning - ignore it if err and "Consider adding more capacity" not in err.decode("utf-8"): logger.debug(err.decode("utf-8")) - assert_stderr_clean(err.decode("utf-8")) + assert_stderr_clean(err.decode("utf-8"), acceptable_errors) def perform_node_tool_cmd(self, cmd, table, indexes): """ @@ -158,33 +161,35 @@ def scrub(self, table, *indexes): time.sleep(.1) return self.get_sstables(table, indexes) - def standalonescrub(self, table, *indexes): + def standalonescrub(self, table, *indexes, acceptable_errors=None): """ Launch standalone scrub on table and indexes, and then return all sstables in a dict keyed by the table or index name. """ - self.launch_standalone_scrub(KEYSPACE, table) + self.launch_standalone_scrub(ks=KEYSPACE, cf=table, acceptable_errors=acceptable_errors) for index in indexes: self.launch_standalone_scrub(KEYSPACE, '{}.{}'.format(table, index)) return self.get_sstables(table, indexes) - def increment_generation_by(self, sstable, generation_increment): + def get_latest_generation(self, sstables): """ - Set the generation number for an sstable file name + Get the latest generation ID of the provided sstables """ - return re.sub('(\d(?!\d))\-', lambda x: str(int(x.group(1)) + generation_increment) + '-', sstable) + latest_gen = None + for table_or_index, table_sstables in list(sstables.items()): + gen = max(parse.search('{}-{generation}-{}.{}', s).named['generation'] for s in table_sstables) + latest_gen = gen if latest_gen is None else max([gen, latest_gen]) + return latest_gen - def increase_sstable_generations(self, sstables): + def get_earliest_generation(self, sstables): """ - After finding the number of existing sstables, increase all of the - generations by that amount. 
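# --- Editorial aside (not part of the patch) --------------------------------
# How the generation helpers above read IDs out of sstable file names: the
# `parse` library matches a format-style pattern and exposes named fields.
# Without a type spec the captured generation is a *string*; the file name
# below is illustrative:
import parse

result = parse.search('{}-{generation}-{}.{}', 'na-7-big-Data.db')
assert result.named['generation'] == '7'
# (a '{generation:d}' spec would capture an int instead of a string)
# -----------------------------------------------------------------------------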
+ Get the earliest generation ID of the provided sstables """ + earliest_gen = None for table_or_index, table_sstables in list(sstables.items()): - increment_by = len(set(parse.search('{}-{increment_by}-{suffix}.{file_extention}', s).named['increment_by'] for s in table_sstables)) - sstables[table_or_index] = [self.increment_generation_by(s, increment_by) for s in table_sstables] - - logger.debug('sstables after increment {}'.format(str(sstables))) - + gen = min(parse.search('{}-{generation}-{}.{}', s).named['generation'] for s in table_sstables) + earliest_gen = gen if earliest_gen is None else min([gen, earliest_gen]) + return earliest_gen @since('2.2') class TestScrubIndexes(TestHelper): @@ -237,16 +242,15 @@ def test_scrub_static_table(self): initial_sstables = self.flush('users', 'gender_idx', 'state_idx', 'birth_year_idx') scrubbed_sstables = self.scrub('users', 'gender_idx', 'state_idx', 'birth_year_idx') - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) users = self.query_users(session) assert initial_users == users # Scrub and check sstables and data again + initial_sstables = scrubbed_sstables scrubbed_sstables = self.scrub('users', 'gender_idx', 'state_idx', 'birth_year_idx') - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) users = self.query_users(session) assert initial_users == users @@ -278,8 +282,7 @@ def test_standalone_scrub(self): cluster.stop() scrubbed_sstables = self.standalonescrub('users', 'gender_idx', 'state_idx', 'birth_year_idx') - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) cluster.start() session = self.patient_cql_connection(node1) @@ -312,16 +315,14 @@ def test_scrub_collections_table(self): initial_sstables = self.flush('users', 'user_uuids_idx') scrubbed_sstables = self.scrub('users', 'user_uuids_idx') - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) users = list(session.execute(("SELECT * from users where uuids contains {some_uuid}").format(some_uuid=_id))) assert initial_users == users + initial_sstables = scrubbed_sstables scrubbed_sstables = self.scrub('users', 'user_uuids_idx') - - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) users = list(session.execute(("SELECT * from users where uuids contains {some_uuid}").format(some_uuid=_id))) @@ -374,16 +375,15 @@ def test_nodetool_scrub(self): initial_sstables = self.flush('users') scrubbed_sstables = self.scrub('users') - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) users = self.query_users(session) assert initial_users == users # Scrub and check sstables and data again + initial_sstables = scrubbed_sstables scrubbed_sstables = self.scrub('users') - 
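# --- Editorial aside (not part of the patch) --------------------------------
# The rewritten scrub assertions below no longer predict exact post-scrub file
# names; they only require that scrub rewrote every sstable with a newer
# generation, i.e. max(generation before) < min(generation after). A minimal
# self-contained illustration with made-up file names:
import parse

def generation(sstable_name):
    return parse.search('{}-{generation:d}-{}.{}', sstable_name).named['generation']

before = ['na-1-big-Data.db', 'na-2-big-Data.db']  # pre-scrub (illustrative)
after = ['na-3-big-Data.db', 'na-4-big-Data.db']   # post-scrub (illustrative)
assert max(map(generation, before)) < min(map(generation, after))
# -----------------------------------------------------------------------------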
self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) users = self.query_users(session) assert initial_users == users @@ -415,8 +415,7 @@ def test_standalone_scrub(self): cluster.stop() scrubbed_sstables = self.standalonescrub('users') - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) cluster.start() session = self.patient_cql_connection(node1) @@ -443,9 +442,8 @@ def test_standalone_scrub_essential_files_only(self): self.delete_non_essential_sstable_files('users') - scrubbed_sstables = self.standalonescrub('users') - self.increase_sstable_generations(initial_sstables) - assert initial_sstables == scrubbed_sstables + scrubbed_sstables = self.standalonescrub(table='users', acceptable_errors=["WARN.*Could not recreate or deserialize existing bloom filter, continuing with a pass-through bloom filter but this will significantly impact reads performance"]) + assert self.get_latest_generation(initial_sstables) < self.get_earliest_generation(scrubbed_sstables) cluster.start() session = self.patient_cql_connection(node1) diff --git a/stress_profiles/repair_wide_rows.yaml b/stress_profiles/repair_wide_rows.yaml.tmpl similarity index 97% rename from stress_profiles/repair_wide_rows.yaml rename to stress_profiles/repair_wide_rows.yaml.tmpl index 87f46f02a3..c35ebbe96b 100644 --- a/stress_profiles/repair_wide_rows.yaml +++ b/stress_profiles/repair_wide_rows.yaml.tmpl @@ -9,8 +9,8 @@ table_definition: | col1 text, val blob, PRIMARY KEY(key, col1) - ) - WITH compaction = { 'class':'LeveledCompactionStrategy' } + ) + WITH compaction = { 'class':'{{ compaction_strategy }}' } AND compression = {'chunk_length_in_kb': '1', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}; # @@ -18,7 +18,7 @@ table_definition: | # The min and max only apply to text and blob types # The distribution field represents the total unique population # distribution of that column across rows. Supported types are -# +# # EXP(min..max) An exponential distribution over the range [min..max] # EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max] # GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng diff --git a/tools/assertions.py b/tools/assertions.py index 78fd4444d3..b03e83956b 100644 --- a/tools/assertions.py +++ b/tools/assertions.py @@ -114,7 +114,7 @@ def assert_unauthorized(session, query, message): assert_exception(session, query, matching=message, expected=Unauthorized) -def assert_one(session, query, expected, cl=None): +def assert_one(session, query, expected, cl=None, timeout=None): """ Assert query returns one row. 
@param session Session to use @@ -127,7 +127,7 @@ def assert_one(session, query, expected, cl=None): assert_one(session, query, [0, 0]) """ simple_query = SimpleStatement(query, consistency_level=cl) - res = session.execute(simple_query) + res = session.execute(simple_query) if timeout is None else session.execute(simple_query, timeout=timeout) list_res = _rows_to_list(res) assert list_res == [expected], "Expected {} from {}, but got {}".format([expected], query, list_res) @@ -293,15 +293,19 @@ def assert_stderr_clean(err, acceptable_errors=None): @param acceptable_errors A list that if used, the user chooses what messages are to be acceptable in stderr. """ + default_acceptable_errors = ["WARN.*JNA link failure.*unavailable.", + "objc.*Class JavaLaunchHelper.*?Which one is undefined.", + # Stress tool JMX connection failure, see CASSANDRA-12437 + "Failed to connect over JMX; not collecting these stats", + "Picked up JAVA_TOOL_OPTIONS:.*", + # Warnings for backward compatibility should be logged CASSANDRA-15234 + ".*parameters have been deprecated. They have new names and/or value format; " + + "For more information, please refer to NEWS.txt*"] + if acceptable_errors is None: - acceptable_errors = ["WARN.*JNA link failure.*unavailable.", - "objc.*Class JavaLaunchHelper.*?Which one is undefined.", - # Stress tool JMX connection failure, see CASSANDRA-12437 - "Failed to connect over JMX; not collecting these stats", - "Picked up JAVA_TOOL_OPTIONS:.*", - # Warnings for backward compatibility should be logged CASSANDRA-15234 - ".*parameters have been deprecated. They have new names and/or value format; " - + "For more information, please refer to NEWS.txt*"] + acceptable_errors = default_acceptable_errors + else: + acceptable_errors = default_acceptable_errors + acceptable_errors regex_str = r"^({}|\s*|\n)*$".format("|".join(acceptable_errors)) err_str = err.strip() diff --git a/tools/misc.py b/tools/misc.py index 542a889a5a..d746a9947e 100644 --- a/tools/misc.py +++ b/tools/misc.py @@ -157,3 +157,14 @@ def add_skip(cls, reason=""): else: cls.pytestmark = [pytest.mark.skip(reason)] return cls + + +def restart_cluster_and_update_config(cluster, config): + """ + Takes a new config, and applies it to a cluster. We need to restart + for it to take effect. We _could_ take a node here, but we don't want to. + If you really want to change the config of just one node, use JMX. 
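# --- Editorial aside (not part of the patch) --------------------------------
# With the assert_stderr_clean change above, caller-supplied patterns now
# extend the default acceptable-error list instead of replacing it, so a test
# only names the extra noise it expects. Sketch of the merge rule in
# isolation, with illustrative patterns:
defaults = ["WARN.*JNA link failure.*unavailable."]
extra = ["WARN.*Could not recreate or deserialize existing bloom filter.*"]

def merged(acceptable_errors=None):
    return defaults if acceptable_errors is None else defaults + acceptable_errors

assert merged() == defaults
assert merged(extra) == defaults + extra
# -----------------------------------------------------------------------------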
+ """ + cluster.stop() + cluster.set_configuration_options(values=config) + cluster.start() diff --git a/update-history/STAR-1342/20-c32760d5 STAR-452: add EverywhereStrategy smoke test (#10) b/update-history/STAR-1342/20-c32760d5 STAR-452: add EverywhereStrategy smoke test (#10) new file mode 100644 index 0000000000..e847736ba2 --- /dev/null +++ b/update-history/STAR-1342/20-c32760d5 STAR-452: add EverywhereStrategy smoke test (#10) @@ -0,0 +1,100 @@ +--- a/bootstrap_test.py ++++ b/bootstrap_test.py +@@ -1019,8 +1019,6 @@ + assert_bootstrap_state(self, node3, 'COMPLETED', user='cassandra', password='cassandra') + node3.wait_for_binary_interface() + +-<<<<<<< +-======= + @since('4.0') + @pytest.mark.no_vnodes + def test_simple_bootstrap_with_everywhere_strategy(self): +@@ -1072,88 +1070,6 @@ + + assert_bootstrap_state(self, node2, 'COMPLETED') + +- @since('4.1') +- def test_invalid_host_id(self): +- """ +- @jira_ticket CASSANDRA-14582 +- Test that node fails to bootstrap if host id is invalid +- """ +- cluster = self.cluster +- cluster.set_environment_variable('CASSANDRA_TOKEN_PREGENERATION_DISABLED', 'True') +- cluster.populate(1) +- cluster.start() +- +- node2 = new_node(cluster) +- +- try: +- node2.start(jvm_args=["-Dcassandra.host_id_first_boot=invalid-host-id"], wait_other_notice=False, wait_for_binary_proto=True) +- pytest.fail('Node should fail to bootstrap because host id set was invalid') +- except NodeError: +- pass # node does not start as expected +- +- @since('4.1') +- def test_host_id_override(self): +- """ +- @jira_ticket CASSANDRA-14582 +- Test that node persists host id +- """ +- cluster = self.cluster +- cluster.set_environment_variable('CASSANDRA_TOKEN_PREGENERATION_DISABLED', 'True') +- cluster.populate(1) +- cluster.start() +- +- host_id = "06fc931f-33b5-4e22-0001-000000000001" +- +- node1 = cluster.nodes['node1'] +- +- node2 = new_node(cluster) +- node2.start(wait_for_binary_proto=True, wait_other_notice=True, jvm_args=["-Dcassandra.host_id_first_boot={}".format(host_id)]) +- +- address2 = "'{}'".format(node2.address()) +- +- # 1. wait for host_id setup +- node2.watch_log_for(host_id) +- +- # 2. check host_id in local table +- session2 = self.patient_exclusive_cql_connection(node2) +- assert_one(session2, "SELECT host_id FROM system.local", [uuid.UUID(host_id)]) +- +- # 3. check host_id in other node's table +- session1 = self.patient_exclusive_cql_connection(node1) +- assert_one(session1, "SELECT host_id FROM system.peers_v2 WHERE peer = {}".format(address2), [uuid.UUID(host_id)]) +- +- # restart node and repeat +- node2.stop() +- node2.start(wait_for_binary_proto=True, wait_other_notice=True) +- +- # 1. wait for host_id setup +- node2.watch_log_for(host_id) +- +- # 2. check host_id in local table +- session2 = self.patient_exclusive_cql_connection(node2) +- assert_one(session2, "SELECT host_id FROM system.local", [uuid.UUID(host_id)]) +- +- # 3. check host_id in other node's table +- session1 = self.patient_exclusive_cql_connection(node1) +- assert_one(session1, "SELECT host_id FROM system.peers_v2 WHERE peer = {}".format(address2), [uuid.UUID(host_id)]) +- +- # restart node with another host_id and repeat +- node2.stop() +- node2.start(wait_for_binary_proto=True, wait_other_notice=True, jvm_args=["-Dcassandra.host_id_first_boot=setting-new-host-id-first-boot"]) +- +- # 1. wait for host_id setup +- node2.watch_log_for(host_id) +- +- # 2. 
check host_id in local table +- session2 = self.patient_exclusive_cql_connection(node2) +- assert_one(session2, "SELECT host_id FROM system.local", [uuid.UUID(host_id)]) +- +- # 3. check host_id in other node's table +- session1 = self.patient_exclusive_cql_connection(node1) +- assert_one(session1, "SELECT host_id FROM system.peers_v2 WHERE peer = {}".format(address2), [uuid.UUID(host_id)]) +- +->>>>>>> +- + class TestBootstrap(BootstrapTester): + """ + This child class is a helper for PyTest to pick up the test methods.