STAR-543 Port guardrail tests and changes (#19)
Co-authored-by: Aleksandr Sorokoumov <[email protected]>
djatnieks and Gerrrr authored Jun 8, 2021
1 parent bff9530 commit 6bc11b6
Showing 12 changed files with 217 additions and 39 deletions.
8 changes: 8 additions & 0 deletions byteman/guardrails/disk_usage_full.btm
@@ -0,0 +1,8 @@
RULE return FULL disk usage
CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
METHOD getState
AT EXIT
IF TRUE
DO
return org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
ENDRULE
8 changes: 8 additions & 0 deletions byteman/guardrails/disk_usage_stuffed.btm
@@ -0,0 +1,8 @@
RULE return STUFFED disk usage
CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
METHOD getState
AT EXIT
IF TRUE
DO
return org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
ENDRULE
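
Both rules pin DiskUsageMonitor.getState to a fixed return value, which lets the tests below simulate disk pressure without actually filling a disk. A minimal sketch of driving them through ccm's byteman support, mirroring the disk_usage_injection helper added in guardrails_test.py below (node is assumed to be a running node from a cluster populated with install_byteman=True):

node.byteman_submit(['-u'])  # unload any previously submitted rules
node.byteman_submit(['./byteman/guardrails/disk_usage_full.btm'])  # force getState() to return FULL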
3 changes: 2 additions & 1 deletion client_request_metrics_test.py
@@ -42,14 +42,15 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.ignore_log_patterns = (
'Testing write failures', # The error to simulate a write failure
'ERROR WRITE_FAILURE', # Logged in DEBUG mode for write failures
f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstones during query" # Caused by the read failure tests
f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} (tombstones|tombstone rows) during query" # Caused by the read failure tests
)

def setup_once(self):
cluster = self.cluster
cluster.set_configuration_options({'read_request_timeout_in_ms': 3000,
'write_request_timeout_in_ms': 3000,
'phi_convict_threshold': 12,
'tombstone_warn_threshold': -1,
'tombstone_failure_threshold': TOMBSTONE_FAILURE_THRESHOLD,
'enable_materialized_views': 'true'})
cluster.populate(2, debug=True)
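The widened log pattern above accepts both the pre-guardrails wording ("tombstones") and the 4.0 guardrails wording ("tombstone rows"). A quick standalone check of the alternation — a sketch in which the threshold value 100 is illustrative rather than taken from this test:

import re

TOMBSTONE_FAILURE_THRESHOLD = 100  # illustrative value only
pattern = f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} (tombstones|tombstone rows) during query"
assert re.search(pattern, "Scanned over 101 tombstones during query 'SELECT * FROM ks.t'")
assert re.search(pattern, "Scanned over 101 tombstone rows during query 'SELECT * FROM ks.t'")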
10 changes: 8 additions & 2 deletions compaction_test.py
@@ -339,7 +339,10 @@ def test_large_compaction_warning(self):
Check that we log a warning when the partition size is bigger than compaction_large_partition_warning_threshold_mb
"""
cluster = self.cluster
cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
if self.supports_guardrails:
cluster.set_configuration_options({'guardrails': {'partition_size_warn_threshold_in_mb': 1}})
else:
cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
cluster.populate(1).start()
[node] = cluster.nodelist()

@@ -361,7 +364,10 @@ def test_large_compaction_warning(self):
node.nodetool('compact ks large')
verb = 'Writing' if self.cluster.version() > '2.2' else 'Compacting'
sizematcher = '\d+ bytes' if self.cluster.version() < LooseVersion('3.6') else '\d+\.\d{3}(K|M|G)iB'
node.watch_log_for('{} large partition ks/large:user \({}'.format(verb, sizematcher), from_mark=mark, timeout=180)
log_message = '{} large partition ks/large:user \({}'.format(verb, sizematcher)
if self.supports_guardrails:
log_message = "Detected partition 'user' in ks.large of size 2MB is greater than the maximum recommended size \(1MB\)"
node.watch_log_for(log_message, from_mark=mark, timeout=180)

ret = list(session.execute("SELECT properties from ks.large where userid = 'user'"))
assert_length_equal(ret, 1)
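ccm's watch_log_for treats its argument as a regular expression, which is why the parentheses in the new guardrails message are escaped. A sketch of the equivalent match against a made-up log line (the message text comes from the assertion above; the log prefix is invented):

import re

log_message = r"Detected partition 'user' in ks.large of size 2MB is greater than the maximum recommended size \(1MB\)"
line = "WARN  [CompactionExecutor:1] Detected partition 'user' in ks.large of size 2MB is greater than the maximum recommended size (1MB)"
assert re.search(log_message, line)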
7 changes: 5 additions & 2 deletions cqlsh_tests/test_cqlsh.py
@@ -1810,8 +1810,11 @@ def test_client_warnings(self):
"""
max_partitions_per_batch = 5
self.cluster.populate(3)
self.cluster.set_configuration_options({
'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)})

config_opts = {'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)}
if self.supports_guardrails:
config_opts = {"guardrails": config_opts}
self.cluster.set_configuration_options(config_opts)

self.cluster.start()

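Whichever shape the yaml takes, the threshold still reaches clients as a warning attached to the response. A sketch of asserting the same behavior through the Python driver instead of cqlsh; session and batch are assumed to exist, and the message prefix checked here is an assumption rather than text from this commit:

fut = session.execute_async(batch)  # an unlogged batch touching more than 5 partitions
fut.result()                        # wait for completion; warnings are populated afterwards
assert fut.warnings, "expected a client warning for the oversized unlogged batch"
assert 'Unlogged batch' in fut.warnings[0]  # assumed message prefix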
26 changes: 21 additions & 5 deletions cqlsh_tests/test_cqlsh_copy.py
@@ -2475,8 +2475,12 @@ def test_bulk_round_trip_blogposts(self):
@jira_ticket CASSANDRA-9302
"""
config_opts = {'batch_size_warn_threshold_in_kb': '10'}
if self.supports_guardrails: # batch size thresholds moved to guardrails in 4.0
config_opts = {'guardrails': config_opts}

self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
configuration_options={'batch_size_warn_threshold_in_kb': '10'},
configuration_options=config_opts,
profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
stress_table='stresscql.blogposts')

@@ -2489,9 +2493,16 @@ def test_bulk_round_trip_blogposts_with_max_connections(self):
@jira_ticket CASSANDRA-10938
"""
batch_size_warn_threshold_in_kb = '10'
native_transport_max_concurrent_connections = '12'
if self.supports_guardrails: # batch size thresholds moved to guardrails in 4.0
config_opts = {'guardrails': {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb},
'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections}
else:
config_opts = {'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections,
'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb}
self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
configuration_options={'native_transport_max_concurrent_connections': '12',
'batch_size_warn_threshold_in_kb': '10'},
configuration_options=config_opts,
profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
stress_table='stresscql.blogposts',
copy_to_options={'NUMPROCESSES': 5, 'MAXATTEMPTS': 20},
@@ -2821,8 +2832,13 @@ def test_copy_from_with_large_cql_rows(self):
@jira_ticket CASSANDRA-11474
"""
num_records = 100
self.prepare(nodes=1, configuration_options={'batch_size_warn_threshold_in_kb': '1', # warn with 1kb and fail
'batch_size_fail_threshold_in_kb': '5'}) # with 5kb size batches
batch_size_warn_threshold_in_kb = '1' # warn with 1kb and fail
batch_size_fail_threshold_in_kb = '5' # with 5kb size batches
config_opts = {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb,
'batch_size_fail_threshold_in_kb': batch_size_fail_threshold_in_kb}
if self.supports_guardrails: # batch size thresholds moved to guardrails in 4.0
config_opts = {'guardrails': config_opts}
self.prepare(nodes=1, configuration_options=config_opts)

logger.debug('Running stress')
stress_table_name = 'standard1'
4 changes: 4 additions & 0 deletions dtest_setup.py
@@ -332,6 +332,10 @@ def dump_jfr_recording(self, nodes):
def supports_v5_protocol(self, cluster_version):
return cluster_version >= LooseVersion('4.0')

def supports_guardrails(self):
return self.cluster.version() >= LooseVersion('4.0')


def cleanup_last_test_dir(self):
if os.path.exists(self.last_test_dir):
os.remove(self.last_test_dir)
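A short usage sketch for the new supports_guardrails helper; the dtest_setup object is assumed, and note that the test classes in this commit reach the same check as a self.supports_guardrails attribute on Tester:

opts = {'tombstone_failure_threshold': 500}
if dtest_setup.supports_guardrails():  # True only on 4.0+ clusters
    opts = {'guardrails': opts}        # guardrail thresholds nest under their own key
dtest_setup.cluster.set_configuration_options(values=opts)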
99 changes: 99 additions & 0 deletions guardrails_test.py
@@ -0,0 +1,99 @@
import logging
import time
import pytest
import re

from cassandra import InvalidRequest

from dtest import Tester, create_ks
from tools.assertions import assert_one

since = pytest.mark.since
logger = logging.getLogger(__name__)

class BaseGuardrailsTester(Tester):

def prepare(self, rf=1, options=None, nodes=3, install_byteman=False, extra_jvm_args=None, **kwargs):
if options is None:
options = {}

if extra_jvm_args is None:
extra_jvm_args = []

cluster = self.cluster
cluster.set_log_level('TRACE')
cluster.populate(nodes, install_byteman=install_byteman)
if options:
cluster.set_configuration_options(values=options)

cluster.start(jvm_args=extra_jvm_args)
node1 = cluster.nodelist()[0]

session = self.patient_cql_connection(node1, **kwargs)
create_ks(session, 'ks', rf)

return session


@since('4.0')
class TestGuardrails(BaseGuardrailsTester):

def test_disk_usage_guardrail(self):
"""
Test that the disk usage guardrail warns when usage exceeds the warn threshold and rejects writes when it exceeds the failure threshold
"""

self.fixture_dtest_setup.ignore_log_patterns = ["Write request failed because disk usage exceeds failure threshold"]
guardrails_config = {'guardrails': {'disk_usage_percentage_warn_threshold': 98,
'disk_usage_percentage_failure_threshold': 99}}

logger.debug("prepare 2-node cluster with rf=1 and guardrails enabled")
session = self.prepare(rf=1, nodes=2, options=guardrails_config, extra_jvm_args=['-Dcassandra.disk_usage.monitor_interval_ms=100'], install_byteman=True)
node1, node2 = self.cluster.nodelist()
session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")

logger.debug("Inject FULL to node1, expect log on node1 and node2 rejects writes")
mark = node1.mark_log()
self.disk_usage_injection(node1, "full", False)
node1.watch_log_for("Adding state DISK_USAGE: FULL", filename='debug.log', from_mark=mark, timeout=10)

# verify node2 will reject writes if node1 is the replica
session2 = self.patient_exclusive_cql_connection(node2, keyspace="ks")
rows = 100
failed = 0
for x in range(rows):
try:
session2.execute("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
except InvalidRequest as e:
assert re.search("Write request failed because disk usage exceeds failure threshold", str(e))
failed = failed + 1

assert rows != failed, "Expected node2 to reject some writes, but it rejected all of them"
assert 0 != failed, "Expected node2 to reject some writes, but it rejected none"
assert_one(session2, "SELECT COUNT(*) FROM t", [rows - failed])

logger.debug("Inject STUFFED to node1, node2 should warn client")
session2.execute("TRUNCATE t")
mark = node1.mark_log()
self.disk_usage_injection(node1, "stuffed")
node1.watch_log_for("Adding state DISK_USAGE: STUFFED", filename='debug.log', from_mark=mark, timeout=10)

warnings = 0
for x in range(rows):
fut = session2.execute_async("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
fut.result()
if fut.warnings:
assert ["Replica disk usage exceeds warn threshold"] == fut.warnings
warnings = warnings + 1

assert rows != warnings, "Expected node2 to warn for some writes, but every write warned"
assert 0 != warnings, "Expected node2 to warn for some writes, but saw no warnings"
assert_one(session2, "SELECT COUNT(*) FROM t", [rows])

session.cluster.shutdown()
session2.cluster.shutdown()

def disk_usage_injection(self, node, state, clear_byteman=True):
if clear_byteman:
node.byteman_submit(['-u'])
node.byteman_submit(["./byteman/guardrails/disk_usage_{}.btm".format(state)])
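
For orientation, a minimal sketch of the state mapping the test leans on, using the thresholds from its config; the boundary conditions and the name of the healthy state are assumptions about the server-side DiskUsageMonitor, not something this commit pins down:

def disk_usage_state(usage_pct, warn=98, fail=99):
    if usage_pct > fail:
        return 'FULL'     # replicas with FULL disks cause coordinators to reject writes
    if usage_pct > warn:
        return 'STUFFED'  # writes succeed but responses carry a client warning
    return 'SPACIOUS'     # assumed name for the healthy state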
26 changes: 17 additions & 9 deletions paging_test.py
@@ -18,6 +18,7 @@
assert_one, assert_lists_equal_ignoring_order)
from tools.data import rows_to_list
from tools.datahelp import create_rows, flatten_into_set, parse_data_into_dicts
from tools.misc import restart_cluster_and_update_config
from tools.paging import PageAssertionMixin, PageFetcher

since = pytest.mark.since
@@ -3423,19 +3424,26 @@ def test_failure_threshold_deletions(self):
supports_v5_protocol = self.supports_v5_protocol(self.cluster.version())

self.fixture_dtest_setup.allow_log_errors = True
self.cluster.set_configuration_options(
values={'tombstone_failure_threshold': 500}
)
if self.supports_guardrails:
config_opts = {'guardrails': {'tombstone_failure_threshold': 500,
'tombstone_warn_threshold': -1,
'write_consistency_levels_disallowed': {}}}
else:
config_opts = {'tombstone_failure_threshold': 500}
restart_cluster_and_update_config(self.cluster, config_opts)
self.session = self.prepare()
self.setup_data()

# Add more data
if self.supports_guardrails:
# cell tombstones are not counted towards the threshold, so we delete rows
query = "delete from paging_test where id = 1 and mytext = '{}'"
else:
# Add more data
query = "insert into paging_test (id, mytext, col1) values (1, '{}', null)"

values = [uuid.uuid4() for i in range(3000)]
for value in values:
self.session.execute(SimpleStatement(
"insert into paging_test (id, mytext, col1) values (1, '{}', null) ".format(
value
),
self.session.execute(SimpleStatement(query.format(value),
consistency_level=CL.ALL
))

@@ -3456,7 +3464,7 @@ def test_failure_threshold_deletions(self):
failure_msg = ("Scanned over.* tombstones in test_paging_size."
"paging_test.* query aborted")
else:
failure_msg = ("Scanned over.* tombstones during query.* query aborted")
failure_msg = ("Scanned over.* (tombstones|tombstone rows) during query.* query aborted")

self.cluster.wait_for_any_log(failure_msg, 25)

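The switch from null inserts to row deletes matters because, per the comment above, the 4.0 guardrail counts row tombstones but not cell tombstones. The two statement shapes side by side (a sketch; session is assumed to be bound to the test keyspace):

# Pre-guardrails path: a null cell creates a cell tombstone, which used to count.
session.execute("INSERT INTO paging_test (id, mytext, col1) VALUES (1, 'k1', null)")
# Guardrails path: only row-level tombstones count, so whole rows are deleted instead.
session.execute("DELETE FROM paging_test WHERE id = 1 AND mytext = 'k1'")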
37 changes: 21 additions & 16 deletions pushed_notifications_test.py
@@ -388,13 +388,18 @@ def test_tombstone_failure_threshold_message(self):
have_v5_protocol = self.supports_v5_protocol(self.cluster.version())

self.fixture_dtest_setup.allow_log_errors = True
self.cluster.set_configuration_options(
values={
'tombstone_failure_threshold': 500,
'read_request_timeout_in_ms': 30000, # 30 seconds
'range_request_timeout_in_ms': 40000
}
)

if self.supports_guardrails:
config_options = {'guardrails': {'tombstone_warn_threshold': -1,
'tombstone_failure_threshold': 500},
'read_request_timeout_in_ms': 30000, # 30 seconds
'range_request_timeout_in_ms': 40000}
else:
config_options = {'tombstone_failure_threshold': 500,
'read_request_timeout_in_ms': 30000, # 30 seconds
'range_request_timeout_in_ms': 40000}

self.cluster.set_configuration_options(values=config_options)
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
proto_version = 5 if have_v5_protocol else None
@@ -407,17 +412,17 @@
"PRIMARY KEY (id, mytext) )"
)

# Add data with tombstones
if self.supports_guardrails:
# cell tombstones are not counted towards the threshold, so we delete rows
query = "delete from test where id = 1 and mytext = '{}'"
else:
# Add data with tombstones
query = "insert into test (id, mytext, col1) values (1, '{}', null)"
values = [str(i) for i in range(1000)]
for value in values:
session.execute(SimpleStatement(
"insert into test (id, mytext, col1) values (1, '{}', null) ".format(
value
),
consistency_level=CL.ALL
))

failure_msg = ("Scanned over.* tombstones.* query aborted")
session.execute(SimpleStatement(query.format(value), consistency_level=CL.ALL))

failure_msg = ("Scanned over.* (tombstones|tombstone rows).* query aborted")

@pytest.mark.timeout(25)
def read_failure_query():
17 changes: 13 additions & 4 deletions read_failures_test.py
@@ -4,6 +4,7 @@
from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout
from cassandra.policies import FallthroughRetryPolicy
from cassandra.query import SimpleStatement
from distutils.version import LooseVersion

from dtest import Tester

@@ -21,7 +22,9 @@ class TestReadFailures(Tester):
@pytest.fixture(autouse=True)
def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.ignore_log_patterns = (
"Scanned over [1-9][0-9]* tombstones", # This is expected when testing read failures due to tombstones
# These are expected when testing read failures due to tombstones
"Scanned over [1-9][0-9]* tombstones",
"Scanned over [1-9][0-9]* tombstone rows",
)
return fixture_dtest_setup

Expand All @@ -33,9 +36,15 @@ def fixture_dtest_setup_params(self):
self.expected_expt = ReadFailure

def _prepare_cluster(self):
self.cluster.set_configuration_options(
values={'tombstone_failure_threshold': self.tombstone_failure_threshold}
)
if self.supports_guardrails:
self.cluster.set_configuration_options(
values={'guardrails': {'tombstone_warn_threshold': -1,
'tombstone_failure_threshold': self.tombstone_failure_threshold}}
)
else:
self.cluster.set_configuration_options(
values={'tombstone_failure_threshold': self.tombstone_failure_threshold}
)
self.cluster.populate(3)
self.cluster.start()
self.nodes = list(self.cluster.nodes.values())
11 changes: 11 additions & 0 deletions tools/misc.py
@@ -157,3 +157,14 @@ def add_skip(cls, reason=""):
else:
cls.pytestmark = [pytest.mark.skip(reason)]
return cls


def restart_cluster_and_update_config(cluster, config):
"""
Takes a new config, and applies it to a cluster. We need to restart
for it to take effect. We _could_ take a node here, but we don't want to.
If you really want to change the config of just one node, use JMX.
"""
cluster.stop()
cluster.set_configuration_options(values=config)
cluster.start()
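
Minimal usage, matching the call added to paging_test.py above; the full stop/start is what makes the yaml change visible to every node:

config = {'guardrails': {'tombstone_failure_threshold': 500,
                         'tombstone_warn_threshold': -1}}
restart_cluster_and_update_config(self.cluster, config)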
