redo kill broker test with continuous writes
zmraul committed Sep 11, 2023
1 parent d9f6623 commit 316ca00
Showing 6 changed files with 57 additions and 36 deletions.
lib/charms/kafka/v0/client.py: 3 changes (2 additions & 1 deletion)
@@ -126,6 +126,7 @@ def __init__(
self.mtls = self.security_protocol == "SSL"

self._subscription = None
+ self._consumer_group_prefix = None

@cached_property
def _admin_client(self) -> KafkaAdminClient:
@@ -174,7 +175,7 @@ def _consumer_client(self) -> KafkaConsumer:
ssl_certfile=self.certfile_path if self.ssl else None,
ssl_keyfile=self.keyfile_path if self.mtls else None,
api_version=KafkaClient.API_VERSION if self.mtls else None,
- group_id=self._consumer_group_prefix or None,
+ group_id=self._consumer_group_prefix,
enable_auto_commit=True,
auto_offset_reset="earliest",
consumer_timeout_ms=15000,
tests/integration/ha/__init__.py: 2 changes (2 additions & 0 deletions)
@@ -0,0 +1,2 @@
+ # Copyright 2023 Canonical Ltd.
+ # See LICENSE file for licensing details.
tests/integration/ha/continuous_writes.py: 24 changes (14 additions & 10 deletions)
@@ -20,7 +20,8 @@
wait_fixed,
wait_random,
)
- from tests.integration.helpers import DUMMY_NAME, get_provider_data
+
+ from integration.helpers import DUMMY_NAME, get_provider_data

logger = logging.getLogger(__name__)

@@ -43,10 +44,10 @@ def __init__(self, ops_test: OpsTest, app: str):
wait=wait_fixed(wait=5) + wait_random(0, 5),
stop=stop_after_attempt(5),
)
- async def start(self) -> None:
+ def start(self) -> None:
"""Run continuous writes in the background."""
if not self._is_stopped:
- await self.clear()
+ self.clear()

# create topic
self._create_replicated_topic()
@@ -60,18 +61,18 @@ async def start(self) -> None:
# start writes
self._process.start()

- async def update(self):
+ def update(self):
"""Update cluster related conf. Useful in cases such as scaling, pwd change etc."""
self._queue.put(SimpleNamespace(model_full_name=self._ops_test.model_full_name))

@retry(
wait=wait_fixed(wait=5) + wait_random(0, 5),
stop=stop_after_attempt(5),
)
- async def clear(self) -> None:
+ def clear(self) -> None:
"""Stop writes and delete the topic."""
if not self._is_stopped:
- await self.stop()
+ self.stop()

client = self._client()
try:
@@ -88,6 +89,7 @@ def consumed_messages(self) -> list:
client = self._client()
try:
client.subscribe_to_topic(topic_name=self.TOPIC_NAME)
+ # FIXME: loading whole list of consumed messages into memory might not be the best idea
return list(client.messages())
finally:
client.close()
@@ -106,7 +108,7 @@ def _create_replicated_topic(self):
wait=wait_fixed(wait=5) + wait_random(0, 5),
stop=stop_after_attempt(5),
)
- async def stop(self) -> SimpleNamespace:
+ def stop(self) -> SimpleNamespace:
"""Stop the continuous writes process and return max inserted ID."""
if not self._is_stopped:
self._stop_process()
@@ -123,9 +125,11 @@ async def stop(self) -> SimpleNamespace:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)):
with attempt:
with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f:
- result.last_expected_message = int(f.read().rstrip())
+ result.last_expected_message, result.lost_messages = (
+ f.read().rstrip().split(",", maxsplit=2)
+ )
except RetryError:
- result.last_expected_message = -1
+ result.last_expected_message = result.lost_messages = -1

return result

@@ -136,7 +140,7 @@ def _create_process(self):
self._process = Process(
target=ContinuousWrites._run_async,
name="continuous_writes",
- args=(self._event, self._queue, 0, True),
+ args=(self._event, self._queue, 0),
)

def _stop_process(self):
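
With start(), update(), clear() and stop() now plain synchronous methods, tests drive the writer without awaiting it. A minimal sketch of the intended flow, not part of the commit: `app_name` is a stand-in for whatever application the c_writes fixture targets, and the `count` field on the returned namespace is assumed to be populated elsewhere in ContinuousWrites (it is only consumed in test_ha.py below):

    # Sketch only: exercising the now-synchronous ContinuousWrites API.
    writes = ContinuousWrites(ops_test, app=app_name)  # app_name: whichever charm application the fixture targets
    writes.start()                  # previously `await writes.start()`
    # ... disrupt the cluster here, e.g. kill the topic leader's broker ...
    result = writes.stop()          # SimpleNamespace with count, last_expected_message, lost_messages
    assert result.lost_messages == 0
    assert result.count - 1 == result.last_expected_message  # writer indexes messages from 0
    writes.clear()                  # stops the writer (if needed) and deletes the topic
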
tests/integration/ha/ha_helpers.py: 4 changes (2 additions & 2 deletions)
@@ -6,8 +6,8 @@
from subprocess import PIPE, check_output

from pytest_operator.plugin import OpsTest
- from tests.integration.helpers import APP_NAME, get_address

+ from integration.helpers import APP_NAME, get_address
from literals import SECURITY_PROTOCOL_PORTS
from snap import KafkaSnap

@@ -51,14 +51,14 @@ async def get_topic_offsets(ops_test: OpsTest, topic: str, unit_name: str) -> li
+ f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}"
)

- # example of topic offset output: 'test-topic:0:10'
result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.get-offsets --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

+ # example of topic offset output: 'test-topic:0:10'
return re.search(rf"{topic}:(\d+:\d+)", result)[1].split(":")


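
As a quick illustration of the parsing above (a sketch, not part of the commit): for the sample line noted in the relocated comment, the search-and-split yields the partition and its end offset as strings:

    # Sketch only: how the regex pulls offsets out of the `get-offsets` CLI output.
    import re

    sample = "test-topic:0:10"  # topic:partition:end-offset
    offsets = re.search(r"test-topic:(\d+:\d+)", sample)[1].split(":")
    assert offsets == ["0", "10"]  # [partition, end offset]
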
tests/integration/ha/test_ha.py: 46 changes (35 additions & 11 deletions)
@@ -36,9 +36,9 @@ async def c_writes(ops_test: OpsTest):
@pytest.fixture()
async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites):
"""Starts continuous write operations and clears writes at the end of the test."""
- await c_writes.start()
+ c_writes.start()
yield
- await c_writes.clear()
+ c_writes.clear()
logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n")


@@ -89,30 +89,54 @@ async def test_replicated_events(ops_test: OpsTest):
kafka_unit_name=f"{APP_NAME}/1",
topic="replicated-topic",
)
- assert get_topic_offsets(
+ assert await get_topic_offsets(
ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/1"
) == ["0", "15"]
check_logs(
model_full_name=ops_test.model_full_name,
kafka_unit_name=f"{APP_NAME}/2",
topic="replicated-topic",
)
- assert get_topic_offsets(
+ assert await get_topic_offsets(
ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/2"
) == ["0", "15"]


- async def test_kill_broker_with_topic_leader(ops_test: OpsTest):
- initial_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic")
- logger.info(f"Killing broker of leader for topic 'replicated-topic': {initial_leader_num}")
+ async def test_kill_broker_with_topic_leader(
+ ops_test: OpsTest,
+ c_writes: ContinuousWrites,
+ c_writes_runner: ContinuousWrites,
+ ):
+ initial_leader_num = await get_topic_leader(
+ ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME
+ )
+ initial_offsets = await get_topic_offsets(
+ ops_test=ops_test,
+ topic=ContinuousWrites.TOPIC_NAME,
+ unit_name=f"kafka/{initial_leader_num}",
+ )
+
+ logger.info(
+ f"Killing broker of leader for topic '{ContinuousWrites.TOPIC_NAME}': {initial_leader_num}"
+ )
await send_control_signal(
ops_test=ops_test, unit_name=f"{APP_NAME}/{initial_leader_num}", kill_code="SIGKILL"
)
# Give time for the service to restart
- time.sleep(15)
- # Check that is still possible to write to the same topic.
- final_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic")
- assert initial_leader_num != final_leader_num
+ time.sleep(10)
+
+ # Check that leader changed
+ next_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic")
+ next_offsets = await get_topic_offsets(
+ ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME, unit_name=f"kafka/{next_leader_num}"
+ )
+
+ assert initial_leader_num != next_leader_num
+ assert int(next_offsets[-1]) > int(initial_offsets[-1])
+
+ res = c_writes.stop()
+ assert res.lost_messages == 0
+ assert res.count - 1 == res.last_expected_message  # NOTE: Count starts by index 0


async def test_multi_cluster_isolation(ops_test: OpsTest, kafka_charm):
tox.ini: 14 changes (2 additions & 12 deletions)
@@ -26,7 +26,7 @@ set_env =
scaling: TEST_FILE=test_scaling.py
password-rotation: TEST_FILE=test_password_rotation.py
tls: TEST_FILE=test_tls.py
- ha: TEST_FILE=test_ha.py
+ ha: TEST_FILE=ha/test_ha.py

pass_env =
PYTHONPATH
@@ -85,7 +85,7 @@ commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/

- [testenv:integration-{charm,provider,scaling,password-rotation,tls}]
+ [testenv:integration-{charm,provider,scaling,password-rotation,tls,ha}]
description = Run integration tests
pass_env =
{[testenv]pass_env}
@@ -94,13 +94,3 @@ pass_env =
commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE}

- [testenv:integration-{ha}]
- description = Run integration tests for high availability
- pass_env =
- {[testenv]pass_env}
- CI
- CI_PACKED_CHARMS
- commands =
- poetry install --with integration
- poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ha/{env:TEST_FILE}

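With the dedicated [testenv:integration-{ha}] section removed, the HA suite now runs through the shared integration environment via the new ha factor; the invocation would presumably be:

    tox -e integration-ha    # picks up TEST_FILE=ha/test_ha.py from set_env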