From 73e2217c62fe4e3feef4529a07abed86faf91f15 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 16 Aug 2023 15:00:26 +0200 Subject: [PATCH 01/21] add two cluster ha test --- tests/integration/helpers.py | 11 +++++ tests/integration/test_ha.py | 86 ++++++++++++++++++++++++++++++++++++ tox.ini | 10 +++++ 3 files changed, 107 insertions(+) create mode 100644 tests/integration/test_ha.py diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index c28de71a..5de01b2a 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -262,6 +262,17 @@ def produce_and_check_logs( message = f"Message #{i}" client.produce_message(topic_name=topic, message_content=message) + check_logs(model_full_name, kafka_unit_name, topic) + + +def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None: + """Checks if messages for a topic have been produced. + + Args: + model_full_name: the full name of the model + kafka_unit_name: the kafka unit to checks logs on + topic: the desired topic to check + """ logs = check_output( f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'", stderr=PIPE, diff --git a/tests/integration/test_ha.py b/tests/integration/test_ha.py new file mode 100644 index 00000000..fbcd3291 --- /dev/null +++ b/tests/integration/test_ha.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging + +import pytest +from pytest_operator.plugin import OpsTest + +from .helpers import ( + APP_NAME, + REL_NAME_ADMIN, + ZK_NAME, + check_logs, + produce_and_check_logs, +) + +logger = logging.getLogger(__name__) + + +DUMMY_NAME = "app" + + +@pytest.mark.abort_on_fail +async def test_build_and_deploy(ops_test: OpsTest, kafka_charm): + await asyncio.gather( + ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"), + ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"), + ) + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30, timeout=3600) + assert ops_test.model.applications[APP_NAME].status == "blocked" + assert ops_test.model.applications[ZK_NAME].status == "active" + + await ops_test.model.add_relation(APP_NAME, ZK_NAME) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30) + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[ZK_NAME].status == "active" + + +async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm): + second_kafka_name = f"{APP_NAME}-two" + second_zk_name = f"{ZK_NAME}-two" + + await asyncio.gather( + ops_test.model.deploy( + kafka_charm, application_name=second_kafka_name, num_units=1, series="jammy" + ), + ops_test.model.deploy( + ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy" + ), + ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"), + ) + + await ops_test.model.wait_for_idle( + apps=[second_kafka_name, second_zk_name, DUMMY_NAME], + idle_period=30, + timeout=3600, + ) + assert ops_test.model.applications[second_kafka_name].status == "blocked" + + await ops_test.model.add_relation(second_kafka_name, second_zk_name) + + # Relate "app" to the *first* cluster + await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") + + await ops_test.model.wait_for_idle( + apps=[second_kafka_name, 
second_zk_name, DUMMY_NAME, APP_NAME], + idle_period=30, + ) + + produce_and_check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/0", + provider_unit_name=f"{DUMMY_NAME}/0", + topic="hot-topic", + ) + + # Check that logs are not found on the second cluster + with pytest.raises(AssertionError): + check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{second_kafka_name}/0", + topic="hot-topic", + ) diff --git a/tox.ini b/tox.ini index a265bb48..15d1a28e 100644 --- a/tox.ini +++ b/tox.ini @@ -127,3 +127,13 @@ pass_env = commands = poetry install --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_tls.py + +[testenv:integration-ha] +description = Run TLS integration tests +pass_env = + {[testenv]pass_env} + CI + CI_PACKED_CHARMS +commands = + poetry install --with integration + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_ha.py From 13d857ebb0aa8cdf6831d8ae8c86033bd4fcec38 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 16 Aug 2023 15:41:52 +0200 Subject: [PATCH 02/21] add test to CI --- .github/workflows/ci.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f7504a2e..c850f423 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -54,6 +54,7 @@ jobs: - integration-scaling - integration-password-rotation - integration-tls + - integration-ha name: ${{ matrix.tox-environments }} needs: - lint From fd8aafae5574c85a8f262cc3a6fcc9d71ed0f14b Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 21 Aug 2023 12:42:36 +0200 Subject: [PATCH 03/21] restructure tests pipeline --- .github/workflows/ci.yaml | 44 ++++++++- tests/integration/ha/helpers.py | 129 ++++++++++++++++++++++++++ tests/integration/{ => ha}/test_ha.py | 2 +- tox.ini | 59 +++--------- 4 files changed, 186 insertions(+), 48 deletions(-) create mode 100644 tests/integration/ha/helpers.py rename tests/integration/{ => ha}/test_ha.py (99%) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c850f423..d848ddb1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -54,7 +54,6 @@ jobs: - integration-scaling - integration-password-rotation - integration-tls - - integration-ha name: ${{ matrix.tox-environments }} needs: - lint @@ -91,3 +90,46 @@ jobs: run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}' env: CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }} + + integration-test-ha: + strategy: + fail-fast: false + matrix: + tox-environments: + - integration-ha + name: ${{ matrix.tox-environments }} + needs: + - lint + - unit-test + - build + - integration-test + runs-on: ubuntu-latest + timeout-minutes: 120 + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Setup operator environment + # TODO: Replace with custom image on self-hosted runner + uses: charmed-kubernetes/actions-operator@main + with: + provider: lxd + bootstrap-options: "--agent-version 2.9.38" + - name: Download packed charm(s) + uses: actions/download-artifact@v3 + with: + name: ${{ needs.build.outputs.artifact-name }} + - name: Select tests + id: select-tests + run: | + if [ "${{ github.event_name }}" == "schedule" ] + then + echo Running unstable and stable tests + echo "mark_expression=" >> $GITHUB_OUTPUT + else + echo Skipping unstable tests + echo "mark_expression=not unstable" >> $GITHUB_OUTPUT + 
fi + - name: Run integration tests + run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}' + env: + CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }} \ No newline at end of file diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py new file mode 100644 index 00000000..d8123d28 --- /dev/null +++ b/tests/integration/ha/helpers.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +import logging +from pathlib import Path +from subprocess import PIPE, check_output +from typing import Any, Dict + +import yaml +from charms.kafka.v0.client import KafkaClient +from kafka.admin import NewTopic + +from snap import KafkaSnap + +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +APP_NAME = METADATA["name"] +ZK_NAME = "zookeeper" +REL_NAME_ADMIN = "kafka-client-admin" + +logger = logging.getLogger(__name__) + + +def produce_and_check_logs( + model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str +) -> None: + """Produces messages from HN to chosen Kafka topic. + + Args: + model_full_name: the full name of the model + kafka_unit_name: the kafka unit to checks logs on + provider_unit_name: the app to grab credentials from + topic: the desired topic to produce to + + Raises: + KeyError: if missing relation data + AssertionError: if logs aren't found for desired topic + """ + relation_data = get_provider_data( + unit_name=provider_unit_name, + model_full_name=model_full_name, + endpoint="kafka-client-admin", + ) + topic = topic + username = relation_data.get("username", None) + password = relation_data.get("password", None) + servers = relation_data.get("endpoints", "").split(",") + security_protocol = "SASL_PLAINTEXT" + + if not (username and password and servers): + raise KeyError("missing relation data from app charm") + + client = KafkaClient( + servers=servers, + username=username, + password=password, + security_protocol=security_protocol, + ) + topic_config = NewTopic( + name=topic, + num_partitions=5, + replication_factor=1, + ) + + client.create_topic(topic=topic_config) + for i in range(15): + message = f"Message #{i}" + client.produce_message(topic_name=topic, message_content=message) + + check_logs(model_full_name, kafka_unit_name, topic) + + +def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None: + """Checks if messages for a topic have been produced. 
+ + Args: + model_full_name: the full name of the model + kafka_unit_name: the kafka unit to checks logs on + topic: the desired topic to check + """ + logs = check_output( + f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ).splitlines() + + logger.debug(f"{logs=}") + + passed = False + for log in logs: + if topic and "index" in log: + passed = True + break + + assert passed, "logs not found" + + +def get_provider_data( + unit_name: str, model_full_name: str, endpoint: str = "kafka-client" +) -> Dict[str, str]: + result = show_unit(unit_name=unit_name, model_full_name=model_full_name) + relations_info = result[unit_name]["relation-info"] + logger.info(f"Relation info: {relations_info}") + provider_relation_data = {} + for info in relations_info: + if info["endpoint"] == endpoint: + logger.info(f"Relation data: {info}") + provider_relation_data["username"] = info["application-data"]["username"] + provider_relation_data["password"] = info["application-data"]["password"] + provider_relation_data["endpoints"] = info["application-data"]["endpoints"] + provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"] + provider_relation_data["tls"] = info["application-data"]["tls"] + if "consumer-group-prefix" in info["application-data"]: + provider_relation_data["consumer-group-prefix"] = info["application-data"][ + "consumer-group-prefix" + ] + provider_relation_data["topic"] = info["application-data"]["topic"] + return provider_relation_data + + +def show_unit(unit_name: str, model_full_name: str) -> Any: + result = check_output( + f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return yaml.safe_load(result) diff --git a/tests/integration/test_ha.py b/tests/integration/ha/test_ha.py similarity index 99% rename from tests/integration/test_ha.py rename to tests/integration/ha/test_ha.py index fbcd3291..e90eb1e7 100644 --- a/tests/integration/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -8,7 +8,7 @@ import pytest from pytest_operator.plugin import OpsTest -from .helpers import ( +from helpers import ( APP_NAME, REL_NAME_ADMIN, ZK_NAME, diff --git a/tox.ini b/tox.ini index 15d1a28e..5342d8bd 100644 --- a/tox.ini +++ b/tox.ini @@ -21,6 +21,13 @@ set_env = PYTHONPATH = {tox_root}/lib:{[vars]src_path} PYTHONBREAKPOINT=ipdb.set_trace PY_COLORS=1 + charm: TEST_FILE=test_charm.py + provider: TEST_FILE=test_provider.py + scaling: TEST_FILE=test_scaling.py + password-rotation: TEST_FILE=test_password_rotation.py + tls: TEST_FILE=test_tls.py + ha: TEST_FILE=test_ha.py + pass_env = PYTHONPATH CHARM_BUILD_DIR @@ -78,62 +85,22 @@ commands = poetry install --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ -[testenv:integration-charm] -description = Run base integration tests -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_charm.py - -[testenv:integration-provider] -description = Run integration tests for provider -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_provider.py - -[testenv:integration-scaling] 
-description = Run scaling integration tests -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_scaling.py - -[testenv:integration-password-rotation] -description = Run password rotation integration tests -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_password_rotation.py - -[testenv:integration-tls] -description = Run TLS integration tests +[testenv:integration-{charm,provider,scaling,password-rotation,tls}] +description = Run integration tests pass_env = {[testenv]pass_env} CI CI_PACKED_CHARMS commands = poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_tls.py + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE} -[testenv:integration-ha] -description = Run TLS integration tests +[testenv:integration-ha-{ha}] +description = Run integration tests for high availability pass_env = {[testenv]pass_env} CI CI_PACKED_CHARMS commands = poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_ha.py + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ha/{env:TEST_FILE} From 60e6e7a9206d77a36188cc1fef698adf69e91bd3 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 21 Aug 2023 12:52:44 +0200 Subject: [PATCH 04/21] fix lint --- tests/integration/ha/test_ha.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index e90eb1e7..94c69417 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -6,8 +6,6 @@ import logging import pytest -from pytest_operator.plugin import OpsTest - from helpers import ( APP_NAME, REL_NAME_ADMIN, @@ -15,6 +13,7 @@ check_logs, produce_and_check_logs, ) +from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) From 4cbdadb4044c0d210c689da6ea3f520dd9394164 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 21 Aug 2023 14:00:04 +0200 Subject: [PATCH 05/21] tell better joke to the pipeline --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d848ddb1..0eba70c5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -96,7 +96,7 @@ jobs: fail-fast: false matrix: tox-environments: - - integration-ha + - integration-ha-ha name: ${{ matrix.tox-environments }} needs: - lint From 2c388039612bbf1fd55c9de73e12b7fd93fb8c3a Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Fri, 25 Aug 2023 16:07:16 +0200 Subject: [PATCH 06/21] add tests and restructure --- .github/workflows/ci.yaml | 2 +- tests/integration/ha/conftest.py | 26 ++++++ .../ha/{helpers.py => ha_helpers.py} | 70 ++++++++++++++++ .../ha/{test_ha.py => test_replication.py} | 80 ++++++++++++++++--- tox.ini | 4 +- 5 files changed, 168 insertions(+), 14 deletions(-) create mode 100644 tests/integration/ha/conftest.py rename tests/integration/ha/{helpers.py => ha_helpers.py} (61%) rename tests/integration/ha/{test_ha.py => test_replication.py} (50%) diff --git 
a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 0eba70c5..27565426 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -96,7 +96,7 @@ jobs: fail-fast: false matrix: tox-environments: - - integration-ha-ha + - integration-ha-replication name: ${{ matrix.tox-environments }} needs: - lint diff --git a/tests/integration/ha/conftest.py b/tests/integration/ha/conftest.py new file mode 100644 index 00000000..e6fabf1d --- /dev/null +++ b/tests/integration/ha/conftest.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +import pytest +from ha_helpers import TEST_APP, get_application_name +from pytest_operator.plugin import OpsTest + + +@pytest.fixture() +async def continuous_writes(ops_test: OpsTest): + """Starts continuous writes to the MySQL cluster for a test and clear the writes at the end.""" + application_name = get_application_name(ops_test, TEST_APP) + + application_unit = ops_test.model.applications[application_name].units[0] + + clear_writes_action = await application_unit.run_action("clear-continuous-writes") + await clear_writes_action.wait() + + start_writes_action = await application_unit.run_action("start-continuous-writes") + await start_writes_action.wait() + + yield + + clear_writes_action = await application_unit.run_action("clear-continuous-writes") + await clear_writes_action.wait() diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/ha_helpers.py similarity index 61% rename from tests/integration/ha/helpers.py rename to tests/integration/ha/ha_helpers.py index d8123d28..075bbb82 100644 --- a/tests/integration/ha/helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -2,6 +2,7 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. import logging +import re from pathlib import Path from subprocess import PIPE, check_output from typing import Any, Dict @@ -9,6 +10,7 @@ import yaml from charms.kafka.v0.client import KafkaClient from kafka.admin import NewTopic +from pytest_operator.plugin import OpsTest from snap import KafkaSnap @@ -16,10 +18,63 @@ APP_NAME = METADATA["name"] ZK_NAME = "zookeeper" REL_NAME_ADMIN = "kafka-client-admin" +TEST_APP = "kafka-test-app" + +PROCESS = "kafka.Kafka" logger = logging.getLogger(__name__) +def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str, str]: + result = show_unit(unit_name=unit_name, model_full_name=model_full_name) + relations_info = result[unit_name]["relation-info"] + + zk_relation_data = {} + for info in relations_info: + if info["endpoint"] == "zookeeper": + zk_relation_data["chroot"] = info["application-data"]["chroot"] + zk_relation_data["endpoints"] = info["application-data"]["endpoints"] + zk_relation_data["password"] = info["application-data"]["password"] + zk_relation_data["uris"] = info["application-data"]["uris"] + zk_relation_data["username"] = info["application-data"]["username"] + zk_relation_data["tls"] = info["application-data"]["tls"] + return zk_relation_data + + +def get_topic_leader(model_full_name: str, zookeeper_uri: str, topic: str) -> int: + """Get the broker with the topic leader. 
+ + Args: + model_full_name: the full name of the model + zookeeper_uri: uri from zookeeper + topic: the desired topic to check + """ + result = check_output( + f"JUJU_MODEL={model_full_name} juju ssh kafka/0 sudo -i 'charmed-kafka.topics --describe --zookeeper {zookeeper_uri} --describe --topic {topic}'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return re.search(r"Leader: (\d+)", result)[1] + + +async def kill_unit_process( + ops_test: OpsTest, unit_name: str, kill_code: str, app_name: str = APP_NAME +) -> None: + if len(ops_test.model.applications[app_name].units) < 2: + await ops_test.model.applications[app_name].add_unit(count=1) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) + + kill_cmd = f"run --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}" + return_code, _, _ = await ops_test.juju(*kill_cmd.split()) + + if return_code != 0: + raise Exception( + f"Expected kill command {kill_cmd} to succeed instead it failed: {return_code}" + ) + + def produce_and_check_logs( model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str ) -> None: @@ -127,3 +182,18 @@ def show_unit(unit_name: str, model_full_name: str) -> Any: ) return yaml.safe_load(result) + + +def get_application_name(ops_test: OpsTest, application_name_substring: str) -> str: + """Returns the name of the application with the provided application name. + + This enables us to retrieve the name of the deployed application in an existing model. + + Note: if multiple applications with the application name exist, + the first one found will be returned. + """ + for application in ops_test.model.applications: + if application_name_substring in application: + return application + + return None diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_replication.py similarity index 50% rename from tests/integration/ha/test_ha.py rename to tests/integration/ha/test_replication.py index 94c69417..959bd5c4 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_replication.py @@ -6,11 +6,14 @@ import logging import pytest -from helpers import ( +from ha_helpers import ( APP_NAME, REL_NAME_ADMIN, ZK_NAME, check_logs, + get_kafka_zk_relation_data, + get_topic_leader, + kill_unit_process, produce_and_check_logs, ) from pytest_operator.plugin import OpsTest @@ -22,12 +25,17 @@ @pytest.mark.abort_on_fail -async def test_build_and_deploy(ops_test: OpsTest, kafka_charm): +async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm): await asyncio.gather( ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"), ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"), + ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"), + ) + await ops_test.model.wait_for_idle( + apps=[APP_NAME, ZK_NAME, DUMMY_NAME], + idle_period=30, + timeout=3600, ) - await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30, timeout=3600) assert ops_test.model.applications[APP_NAME].status == "blocked" assert ops_test.model.applications[ZK_NAME].status == "active" @@ -37,8 +45,14 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm): assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[ZK_NAME].status == "active" + await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME], 
idle_period=30) + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[DUMMY_NAME].status == "active" + -async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm): +@pytest.mark.skip +async def test_second_cluster(ops_test: OpsTest, kafka_charm): second_kafka_name = f"{APP_NAME}-two" second_zk_name = f"{ZK_NAME}-two" @@ -49,23 +63,18 @@ async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm): ops_test.model.deploy( ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy" ), - ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"), ) await ops_test.model.wait_for_idle( - apps=[second_kafka_name, second_zk_name, DUMMY_NAME], + apps=[second_kafka_name, second_zk_name], idle_period=30, timeout=3600, ) assert ops_test.model.applications[second_kafka_name].status == "blocked" await ops_test.model.add_relation(second_kafka_name, second_zk_name) - - # Relate "app" to the *first* cluster - await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") - await ops_test.model.wait_for_idle( - apps=[second_kafka_name, second_zk_name, DUMMY_NAME, APP_NAME], + apps=[second_kafka_name, second_zk_name, APP_NAME], idle_period=30, ) @@ -83,3 +92,52 @@ async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm): kafka_unit_name=f"{second_kafka_name}/0", topic="hot-topic", ) + + +async def test_replicated_events(ops_test: OpsTest): + await ops_test.model.applications[APP_NAME].add_units(count=2) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=600, idle_period=120, wait_for_exact_units=3 + ) + + produce_and_check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/0", + provider_unit_name=f"{DUMMY_NAME}/0", + topic="replicated-topic", + ) + # check logs in the two remaining units + check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/1", + topic="replicated-topic", + ) + check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/2", + topic="replicated-topic", + ) + + +async def test_remove_topic_leader(ops_test: OpsTest): + relation_data = get_kafka_zk_relation_data( + unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name + ) + uri = relation_data["uris"].split(",")[-1] + + leader_num = get_topic_leader( + model_full_name=ops_test.model_full_name, + zookeeper_uri=uri, + topic="replicated-topic", + ) + + await kill_unit_process( + ops_test=ops_test, unit_name=f"{APP_NAME}/{leader_num}", kill_code="SIGKILL" + ) + # Check that is still possible to write to the same topic. 
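# A possible hardening for the kill-the-leader test above (a sketch, not taken from the
# patch): rather than producing again immediately after the SIGKILL, poll the killed
# unit's client port until it accepts connections. The SASL_PLAINTEXT client port value
# (9092) and the assumption that the snap's service manager restarts the killed broker
# are assumptions; the public-address lookup mirrors the get_address helper used by
# these integration tests.
import asyncio
import socket
import time

from ha_helpers import APP_NAME


async def wait_for_broker_port(ops_test, unit_name: str, port: int = 9092, timeout: int = 120) -> None:
    """Polls a unit's public address until the Kafka client port accepts TCP connections."""
    status = await ops_test.model.get_status()
    host = status["applications"][APP_NAME]["units"][unit_name]["public-address"]
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # a successful TCP connect is enough to know the listener is back up
            with socket.create_connection((host, port), timeout=5):
                return
        except OSError:
            await asyncio.sleep(5)
    raise TimeoutError(f"{unit_name} not reachable on port {port} after {timeout}s")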
+ produce_and_check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/0", + provider_unit_name=f"{DUMMY_NAME}/0", + topic="replicated-topic", + ) diff --git a/tox.ini b/tox.ini index 5342d8bd..73a85d70 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ set_env = scaling: TEST_FILE=test_scaling.py password-rotation: TEST_FILE=test_password_rotation.py tls: TEST_FILE=test_tls.py - ha: TEST_FILE=test_ha.py + replication: TEST_FILE=test_replication.py pass_env = PYTHONPATH @@ -95,7 +95,7 @@ commands = poetry install --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE} -[testenv:integration-ha-{ha}] +[testenv:integration-ha-{replication}] description = Run integration tests for high availability pass_env = {[testenv]pass_env} From ca80901684698824a6a617ad5acde8994673e99f Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 6 Sep 2023 17:22:17 +0200 Subject: [PATCH 07/21] fix kafka command --- tests/integration/ha/ha_helpers.py | 20 ++++++++++++++++---- tests/integration/ha/test_replication.py | 12 +----------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 075bbb82..4a8fba98 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -12,6 +12,7 @@ from kafka.admin import NewTopic from pytest_operator.plugin import OpsTest +from literals import SECURITY_PROTOCOL_PORTS from snap import KafkaSnap METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) @@ -41,16 +42,20 @@ def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str return zk_relation_data -def get_topic_leader(model_full_name: str, zookeeper_uri: str, topic: str) -> int: +async def get_topic_leader(ops_test: OpsTest, topic: str) -> int: """Get the broker with the topic leader. 
Args: - model_full_name: the full name of the model - zookeeper_uri: uri from zookeeper + ops_test: OpsTest utility class topic: the desired topic to check """ + bootstrap_server = ( + await get_address(ops_test=ops_test) + + f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}" + ) + result = check_output( - f"JUJU_MODEL={model_full_name} juju ssh kafka/0 sudo -i 'charmed-kafka.topics --describe --zookeeper {zookeeper_uri} --describe --topic {topic}'", + f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'charmed-kafka.topics --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --describe --topic {topic}'", stderr=PIPE, shell=True, universal_newlines=True, @@ -197,3 +202,10 @@ def get_application_name(ops_test: OpsTest, application_name_substring: str) -> return application return None + + +async def get_address(ops_test: OpsTest, app_name=APP_NAME, unit_num=0) -> str: + """Get the address for a unit.""" + status = await ops_test.model.get_status() # noqa: F821 + address = status["applications"][app_name]["units"][f"{app_name}/{unit_num}"]["public-address"] + return address diff --git a/tests/integration/ha/test_replication.py b/tests/integration/ha/test_replication.py index 959bd5c4..2ea42c94 100644 --- a/tests/integration/ha/test_replication.py +++ b/tests/integration/ha/test_replication.py @@ -11,7 +11,6 @@ REL_NAME_ADMIN, ZK_NAME, check_logs, - get_kafka_zk_relation_data, get_topic_leader, kill_unit_process, produce_and_check_logs, @@ -120,16 +119,7 @@ async def test_replicated_events(ops_test: OpsTest): async def test_remove_topic_leader(ops_test: OpsTest): - relation_data = get_kafka_zk_relation_data( - unit_name=f"{APP_NAME}/0", model_full_name=ops_test.model_full_name - ) - uri = relation_data["uris"].split(",")[-1] - - leader_num = get_topic_leader( - model_full_name=ops_test.model_full_name, - zookeeper_uri=uri, - topic="replicated-topic", - ) + leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") await kill_unit_process( ops_test=ops_test, unit_name=f"{APP_NAME}/{leader_num}", kill_code="SIGKILL" From 8db9f39cfa05f48883987703315a949f73f73de6 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 6 Sep 2023 22:25:31 +0200 Subject: [PATCH 08/21] fix produce logs --- tests/integration/ha/ha_helpers.py | 10 ++++++++-- tests/integration/ha/test_replication.py | 1 + 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 4a8fba98..f0d0d3c2 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -81,7 +81,11 @@ async def kill_unit_process( def produce_and_check_logs( - model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str + model_full_name: str, + kafka_unit_name: str, + provider_unit_name: str, + topic: str, + create_topic: bool = True, ) -> None: """Produces messages from HN to chosen Kafka topic. 
@@ -90,6 +94,7 @@ def produce_and_check_logs( kafka_unit_name: the kafka unit to checks logs on provider_unit_name: the app to grab credentials from topic: the desired topic to produce to + create_topic: if the topic needs to be created Raises: KeyError: if missing relation data @@ -121,7 +126,8 @@ def produce_and_check_logs( replication_factor=1, ) - client.create_topic(topic=topic_config) + if create_topic: + client.create_topic(topic=topic_config) for i in range(15): message = f"Message #{i}" client.produce_message(topic_name=topic, message_content=message) diff --git a/tests/integration/ha/test_replication.py b/tests/integration/ha/test_replication.py index 2ea42c94..af8bc98b 100644 --- a/tests/integration/ha/test_replication.py +++ b/tests/integration/ha/test_replication.py @@ -130,4 +130,5 @@ async def test_remove_topic_leader(ops_test: OpsTest): kafka_unit_name=f"{APP_NAME}/0", provider_unit_name=f"{DUMMY_NAME}/0", topic="replicated-topic", + create_topic=False, ) From 239f71395101077bf12612610cae05157f85deec Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Thu, 7 Sep 2023 09:59:48 +0200 Subject: [PATCH 09/21] general fixes --- tests/integration/ha/ha_helpers.py | 14 +++++++++----- tests/integration/ha/test_replication.py | 10 +++++++--- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index f0d0d3c2..12e134ac 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -86,6 +86,8 @@ def produce_and_check_logs( provider_unit_name: str, topic: str, create_topic: bool = True, + replication_factor: int = 1, + num_partitions: int = 5, ) -> None: """Produces messages from HN to chosen Kafka topic. @@ -95,6 +97,8 @@ def produce_and_check_logs( provider_unit_name: the app to grab credentials from topic: the desired topic to produce to create_topic: if the topic needs to be created + replication_factor: replication factor of the created topic + num_partitions: number of partitions for the topic Raises: KeyError: if missing relation data @@ -120,13 +124,13 @@ def produce_and_check_logs( password=password, security_protocol=security_protocol, ) - topic_config = NewTopic( - name=topic, - num_partitions=5, - replication_factor=1, - ) if create_topic: + topic_config = NewTopic( + name=topic, + num_partitions=num_partitions, + replication_factor=replication_factor, + ) client.create_topic(topic=topic_config) for i in range(15): message = f"Message #{i}" diff --git a/tests/integration/ha/test_replication.py b/tests/integration/ha/test_replication.py index af8bc98b..424fb03c 100644 --- a/tests/integration/ha/test_replication.py +++ b/tests/integration/ha/test_replication.py @@ -4,6 +4,7 @@ import asyncio import logging +import time import pytest from ha_helpers import ( @@ -50,7 +51,6 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm): assert ops_test.model.applications[DUMMY_NAME].status == "active" -@pytest.mark.skip async def test_second_cluster(ops_test: OpsTest, kafka_charm): second_kafka_name = f"{APP_NAME}-two" second_zk_name = f"{ZK_NAME}-two" @@ -98,12 +98,14 @@ async def test_replicated_events(ops_test: OpsTest): await ops_test.model.wait_for_idle( apps=[APP_NAME], status="active", timeout=600, idle_period=120, wait_for_exact_units=3 ) - + logger.info("Producing messages and checking on all units") produce_and_check_logs( model_full_name=ops_test.model_full_name, kafka_unit_name=f"{APP_NAME}/0", provider_unit_name=f"{DUMMY_NAME}/0", 
topic="replicated-topic", + replication_factor=3, + num_partitions=1, ) # check logs in the two remaining units check_logs( @@ -120,10 +122,12 @@ async def test_replicated_events(ops_test: OpsTest): async def test_remove_topic_leader(ops_test: OpsTest): leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") - + logger.info(f"Killing broker of leader for topic 'replicated-topic': {leader_num}") await kill_unit_process( ops_test=ops_test, unit_name=f"{APP_NAME}/{leader_num}", kill_code="SIGKILL" ) + # Give time for the service to restart + time.sleep(15) # Check that is still possible to write to the same topic. produce_and_check_logs( model_full_name=ops_test.model_full_name, From 2c42417d2ef9ea88e3bab6e245df1f882904a9d7 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Thu, 7 Sep 2023 18:22:04 +0200 Subject: [PATCH 10/21] add pr feedback and remove unused files --- .github/workflows/ci.yaml | 2 +- tests/integration/ha/ha_helpers.py | 24 ++-- tests/integration/ha/helpers.py | 129 --------------------- tests/integration/ha/test_ha.py | 78 +++++++++++-- tests/integration/ha/test_replication.py | 138 ----------------------- tox.ini | 4 +- 6 files changed, 78 insertions(+), 297 deletions(-) delete mode 100644 tests/integration/ha/helpers.py delete mode 100644 tests/integration/ha/test_replication.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 27565426..d848ddb1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -96,7 +96,7 @@ jobs: fail-fast: false matrix: tox-environments: - - integration-ha-replication + - integration-ha name: ${{ matrix.tox-environments }} needs: - lint diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 12e134ac..110262c8 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -39,6 +39,7 @@ def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str zk_relation_data["uris"] = info["application-data"]["uris"] zk_relation_data["username"] = info["application-data"]["username"] zk_relation_data["tls"] = info["application-data"]["tls"] + break return zk_relation_data @@ -64,14 +65,14 @@ async def get_topic_leader(ops_test: OpsTest, topic: str) -> int: return re.search(r"Leader: (\d+)", result)[1] -async def kill_unit_process( +async def send_control_signal( ops_test: OpsTest, unit_name: str, kill_code: str, app_name: str = APP_NAME ) -> None: - if len(ops_test.model.applications[app_name].units) < 2: + if len(ops_test.model.applications[app_name].units) < 3: await ops_test.model.applications[app_name].add_unit(count=1) await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) - kill_cmd = f"run --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}" + kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}" return_code, _, _ = await ops_test.juju(*kill_cmd.split()) if return_code != 0: @@ -109,20 +110,11 @@ def produce_and_check_logs( model_full_name=model_full_name, endpoint="kafka-client-admin", ) - topic = topic - username = relation_data.get("username", None) - password = relation_data.get("password", None) - servers = relation_data.get("endpoints", "").split(",") - security_protocol = "SASL_PLAINTEXT" - - if not (username and password and servers): - raise KeyError("missing relation data from app charm") - client = KafkaClient( - servers=servers, - username=username, - password=password, - security_protocol=security_protocol, + 
servers=relation_data["servers"].split(","), + username=relation_data["username"], + password=relation_data["password"], + security_protocol="SASL_PLAINTEXT", ) if create_topic: diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py deleted file mode 100644 index d8123d28..00000000 --- a/tests/integration/ha/helpers.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. -import logging -from pathlib import Path -from subprocess import PIPE, check_output -from typing import Any, Dict - -import yaml -from charms.kafka.v0.client import KafkaClient -from kafka.admin import NewTopic - -from snap import KafkaSnap - -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ZK_NAME = "zookeeper" -REL_NAME_ADMIN = "kafka-client-admin" - -logger = logging.getLogger(__name__) - - -def produce_and_check_logs( - model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str -) -> None: - """Produces messages from HN to chosen Kafka topic. - - Args: - model_full_name: the full name of the model - kafka_unit_name: the kafka unit to checks logs on - provider_unit_name: the app to grab credentials from - topic: the desired topic to produce to - - Raises: - KeyError: if missing relation data - AssertionError: if logs aren't found for desired topic - """ - relation_data = get_provider_data( - unit_name=provider_unit_name, - model_full_name=model_full_name, - endpoint="kafka-client-admin", - ) - topic = topic - username = relation_data.get("username", None) - password = relation_data.get("password", None) - servers = relation_data.get("endpoints", "").split(",") - security_protocol = "SASL_PLAINTEXT" - - if not (username and password and servers): - raise KeyError("missing relation data from app charm") - - client = KafkaClient( - servers=servers, - username=username, - password=password, - security_protocol=security_protocol, - ) - topic_config = NewTopic( - name=topic, - num_partitions=5, - replication_factor=1, - ) - - client.create_topic(topic=topic_config) - for i in range(15): - message = f"Message #{i}" - client.produce_message(topic_name=topic, message_content=message) - - check_logs(model_full_name, kafka_unit_name, topic) - - -def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None: - """Checks if messages for a topic have been produced. 
- - Args: - model_full_name: the full name of the model - kafka_unit_name: the kafka unit to checks logs on - topic: the desired topic to check - """ - logs = check_output( - f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'", - stderr=PIPE, - shell=True, - universal_newlines=True, - ).splitlines() - - logger.debug(f"{logs=}") - - passed = False - for log in logs: - if topic and "index" in log: - passed = True - break - - assert passed, "logs not found" - - -def get_provider_data( - unit_name: str, model_full_name: str, endpoint: str = "kafka-client" -) -> Dict[str, str]: - result = show_unit(unit_name=unit_name, model_full_name=model_full_name) - relations_info = result[unit_name]["relation-info"] - logger.info(f"Relation info: {relations_info}") - provider_relation_data = {} - for info in relations_info: - if info["endpoint"] == endpoint: - logger.info(f"Relation data: {info}") - provider_relation_data["username"] = info["application-data"]["username"] - provider_relation_data["password"] = info["application-data"]["password"] - provider_relation_data["endpoints"] = info["application-data"]["endpoints"] - provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"] - provider_relation_data["tls"] = info["application-data"]["tls"] - if "consumer-group-prefix" in info["application-data"]: - provider_relation_data["consumer-group-prefix"] = info["application-data"][ - "consumer-group-prefix" - ] - provider_relation_data["topic"] = info["application-data"]["topic"] - return provider_relation_data - - -def show_unit(unit_name: str, model_full_name: str) -> Any: - result = check_output( - f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}", - stderr=PIPE, - shell=True, - universal_newlines=True, - ) - - return yaml.safe_load(result) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index 94c69417..89b62f08 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -4,14 +4,17 @@ import asyncio import logging +import time import pytest -from helpers import ( +from ha_helpers import ( APP_NAME, REL_NAME_ADMIN, ZK_NAME, check_logs, + get_topic_leader, produce_and_check_logs, + send_control_signal, ) from pytest_operator.plugin import OpsTest @@ -22,12 +25,17 @@ @pytest.mark.abort_on_fail -async def test_build_and_deploy(ops_test: OpsTest, kafka_charm): +async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm): await asyncio.gather( ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"), ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"), + ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"), + ) + await ops_test.model.wait_for_idle( + apps=[APP_NAME, ZK_NAME, DUMMY_NAME], + idle_period=30, + timeout=3600, ) - await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30, timeout=3600) assert ops_test.model.applications[APP_NAME].status == "blocked" assert ops_test.model.applications[ZK_NAME].status == "active" @@ -37,8 +45,61 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm): assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[ZK_NAME].status == "active" + await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") + await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME], idle_period=30) + assert ops_test.model.applications[APP_NAME].status == 
"active" + assert ops_test.model.applications[DUMMY_NAME].status == "active" + -async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm): +async def test_replicated_events(ops_test: OpsTest): + await ops_test.model.applications[APP_NAME].add_units(count=2) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=600, idle_period=120, wait_for_exact_units=3 + ) + logger.info("Producing messages and checking on all units") + produce_and_check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/0", + provider_unit_name=f"{DUMMY_NAME}/0", + topic="replicated-topic", + replication_factor=3, + num_partitions=1, + ) + # check logs in the two remaining units + check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/1", + topic="replicated-topic", + ) + check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/2", + topic="replicated-topic", + ) + + +async def test_kill_broker_with_topic_leader(ops_test: OpsTest): + initial_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") + logger.info(f"Killing broker of leader for topic 'replicated-topic': {leader_num}") + await send_control_signal( + ops_test=ops_test, unit_name=f"{APP_NAME}/{leader_num}", kill_code="SIGKILL" + ) + # Give time for the service to restart + time.sleep(15) + # Check that is still possible to write to the same topic. + final_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") + assert initial_leader_num != final_leader_num + + produce_and_check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/0", + provider_unit_name=f"{DUMMY_NAME}/0", + topic="replicated-topic", + create_topic=False, + ) + + +async def test_multi_cluster_isolation(ops_test: OpsTest, kafka_charm): second_kafka_name = f"{APP_NAME}-two" second_zk_name = f"{ZK_NAME}-two" @@ -49,23 +110,18 @@ async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm): ops_test.model.deploy( ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy" ), - ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"), ) await ops_test.model.wait_for_idle( - apps=[second_kafka_name, second_zk_name, DUMMY_NAME], + apps=[second_kafka_name, second_zk_name], idle_period=30, timeout=3600, ) assert ops_test.model.applications[second_kafka_name].status == "blocked" await ops_test.model.add_relation(second_kafka_name, second_zk_name) - - # Relate "app" to the *first* cluster - await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") - await ops_test.model.wait_for_idle( - apps=[second_kafka_name, second_zk_name, DUMMY_NAME, APP_NAME], + apps=[second_kafka_name, second_zk_name, APP_NAME], idle_period=30, ) diff --git a/tests/integration/ha/test_replication.py b/tests/integration/ha/test_replication.py deleted file mode 100644 index 424fb03c..00000000 --- a/tests/integration/ha/test_replication.py +++ /dev/null @@ -1,138 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. -# See LICENSE file for licensing details. 
- -import asyncio -import logging -import time - -import pytest -from ha_helpers import ( - APP_NAME, - REL_NAME_ADMIN, - ZK_NAME, - check_logs, - get_topic_leader, - kill_unit_process, - produce_and_check_logs, -) -from pytest_operator.plugin import OpsTest - -logger = logging.getLogger(__name__) - - -DUMMY_NAME = "app" - - -@pytest.mark.abort_on_fail -async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm): - await asyncio.gather( - ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"), - ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"), - ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"), - ) - await ops_test.model.wait_for_idle( - apps=[APP_NAME, ZK_NAME, DUMMY_NAME], - idle_period=30, - timeout=3600, - ) - assert ops_test.model.applications[APP_NAME].status == "blocked" - assert ops_test.model.applications[ZK_NAME].status == "active" - - await ops_test.model.add_relation(APP_NAME, ZK_NAME) - async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30) - assert ops_test.model.applications[APP_NAME].status == "active" - assert ops_test.model.applications[ZK_NAME].status == "active" - - await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") - await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME], idle_period=30) - assert ops_test.model.applications[APP_NAME].status == "active" - assert ops_test.model.applications[DUMMY_NAME].status == "active" - - -async def test_second_cluster(ops_test: OpsTest, kafka_charm): - second_kafka_name = f"{APP_NAME}-two" - second_zk_name = f"{ZK_NAME}-two" - - await asyncio.gather( - ops_test.model.deploy( - kafka_charm, application_name=second_kafka_name, num_units=1, series="jammy" - ), - ops_test.model.deploy( - ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy" - ), - ) - - await ops_test.model.wait_for_idle( - apps=[second_kafka_name, second_zk_name], - idle_period=30, - timeout=3600, - ) - assert ops_test.model.applications[second_kafka_name].status == "blocked" - - await ops_test.model.add_relation(second_kafka_name, second_zk_name) - await ops_test.model.wait_for_idle( - apps=[second_kafka_name, second_zk_name, APP_NAME], - idle_period=30, - ) - - produce_and_check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{APP_NAME}/0", - provider_unit_name=f"{DUMMY_NAME}/0", - topic="hot-topic", - ) - - # Check that logs are not found on the second cluster - with pytest.raises(AssertionError): - check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{second_kafka_name}/0", - topic="hot-topic", - ) - - -async def test_replicated_events(ops_test: OpsTest): - await ops_test.model.applications[APP_NAME].add_units(count=2) - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=600, idle_period=120, wait_for_exact_units=3 - ) - logger.info("Producing messages and checking on all units") - produce_and_check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{APP_NAME}/0", - provider_unit_name=f"{DUMMY_NAME}/0", - topic="replicated-topic", - replication_factor=3, - num_partitions=1, - ) - # check logs in the two remaining units - check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{APP_NAME}/1", - topic="replicated-topic", - ) - check_logs( - model_full_name=ops_test.model_full_name, - 
kafka_unit_name=f"{APP_NAME}/2", - topic="replicated-topic", - ) - - -async def test_remove_topic_leader(ops_test: OpsTest): - leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") - logger.info(f"Killing broker of leader for topic 'replicated-topic': {leader_num}") - await kill_unit_process( - ops_test=ops_test, unit_name=f"{APP_NAME}/{leader_num}", kill_code="SIGKILL" - ) - # Give time for the service to restart - time.sleep(15) - # Check that is still possible to write to the same topic. - produce_and_check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{APP_NAME}/0", - provider_unit_name=f"{DUMMY_NAME}/0", - topic="replicated-topic", - create_topic=False, - ) diff --git a/tox.ini b/tox.ini index 73a85d70..8d505e1f 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ set_env = scaling: TEST_FILE=test_scaling.py password-rotation: TEST_FILE=test_password_rotation.py tls: TEST_FILE=test_tls.py - replication: TEST_FILE=test_replication.py + ha: TEST_FILE=test_ha.py pass_env = PYTHONPATH @@ -95,7 +95,7 @@ commands = poetry install --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE} -[testenv:integration-ha-{replication}] +[testenv:integration-{ha}] description = Run integration tests for high availability pass_env = {[testenv]pass_env} From 4989b11d5cc3143918fcc6a040d3c6f6964f38c1 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Thu, 7 Sep 2023 18:26:15 +0200 Subject: [PATCH 11/21] fix test --- tests/integration/ha/test_ha.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index 89b62f08..d3a3e284 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -80,9 +80,9 @@ async def test_replicated_events(ops_test: OpsTest): async def test_kill_broker_with_topic_leader(ops_test: OpsTest): initial_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") - logger.info(f"Killing broker of leader for topic 'replicated-topic': {leader_num}") + logger.info(f"Killing broker of leader for topic 'replicated-topic': {initial_leader_num}") await send_control_signal( - ops_test=ops_test, unit_name=f"{APP_NAME}/{leader_num}", kill_code="SIGKILL" + ops_test=ops_test, unit_name=f"{APP_NAME}/{initial_leader_num}", kill_code="SIGKILL" ) # Give time for the service to restart time.sleep(15) From cc35140027402c4ac107e97e4d486c9f69d06809 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Thu, 7 Sep 2023 21:51:32 +0200 Subject: [PATCH 12/21] fix --- tests/integration/ha/ha_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 110262c8..2e7c27b1 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -111,7 +111,7 @@ def produce_and_check_logs( endpoint="kafka-client-admin", ) client = KafkaClient( - servers=relation_data["servers"].split(","), + servers=relation_data["endpoints"].split(","), username=relation_data["username"], password=relation_data["password"], security_protocol="SASL_PLAINTEXT", From e02dd84fec18b92ed3fee1b1834a6fd69f8b0953 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Fri, 8 Sep 2023 12:15:28 +0200 Subject: [PATCH 13/21] restructure tests --- tests/integration/ha/conftest.py | 26 --- tests/integration/ha/continuous_writes.py | 0 tests/integration/ha/ha_helpers.py | 184 +++------------------- 
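# Related to the test_kill_broker_with_topic_leader fix above: leader re-election after a
# SIGKILL is not instantaneous, so the new-leader comparison could also be polled rather
# than read once. This is only a sketch, not part of the patch; get_topic_leader is the
# helper from ha_helpers.py (its regex match is returned as a string), and the 2-second
# interval and 30-attempt budget are arbitrary assumptions.
import asyncio

from ha_helpers import get_topic_leader


async def wait_for_new_leader(ops_test, topic: str, old_leader: int, attempts: int = 30) -> int:
    """Polls the topic description until a broker other than old_leader is reported as leader."""
    for _ in range(attempts):
        leader = int(await get_topic_leader(ops_test=ops_test, topic=topic))
        if leader != old_leader:
            return leader
        await asyncio.sleep(2)
    raise TimeoutError(f"no new leader elected for topic {topic!r} after {attempts} attempts")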
tests/integration/ha/test_ha.py | 19 ++- tests/integration/helpers.py | 42 ++--- 5 files changed, 60 insertions(+), 211 deletions(-) delete mode 100644 tests/integration/ha/conftest.py create mode 100644 tests/integration/ha/continuous_writes.py diff --git a/tests/integration/ha/conftest.py b/tests/integration/ha/conftest.py deleted file mode 100644 index e6fabf1d..00000000 --- a/tests/integration/ha/conftest.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -import pytest -from ha_helpers import TEST_APP, get_application_name -from pytest_operator.plugin import OpsTest - - -@pytest.fixture() -async def continuous_writes(ops_test: OpsTest): - """Starts continuous writes to the MySQL cluster for a test and clear the writes at the end.""" - application_name = get_application_name(ops_test, TEST_APP) - - application_unit = ops_test.model.applications[application_name].units[0] - - clear_writes_action = await application_unit.run_action("clear-continuous-writes") - await clear_writes_action.wait() - - start_writes_action = await application_unit.run_action("start-continuous-writes") - await start_writes_action.wait() - - yield - - clear_writes_action = await application_unit.run_action("clear-continuous-writes") - await clear_writes_action.wait() diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 2e7c27b1..4163f991 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -3,46 +3,20 @@ # See LICENSE file for licensing details. import logging import re -from pathlib import Path from subprocess import PIPE, check_output -from typing import Any, Dict -import yaml -from charms.kafka.v0.client import KafkaClient -from kafka.admin import NewTopic from pytest_operator.plugin import OpsTest +from tests.integration.helpers import APP_NAME, get_address from literals import SECURITY_PROTOCOL_PORTS from snap import KafkaSnap -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ZK_NAME = "zookeeper" -REL_NAME_ADMIN = "kafka-client-admin" -TEST_APP = "kafka-test-app" - +DUMMY_NAME = "app" PROCESS = "kafka.Kafka" logger = logging.getLogger(__name__) -def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str, str]: - result = show_unit(unit_name=unit_name, model_full_name=model_full_name) - relations_info = result[unit_name]["relation-info"] - - zk_relation_data = {} - for info in relations_info: - if info["endpoint"] == "zookeeper": - zk_relation_data["chroot"] = info["application-data"]["chroot"] - zk_relation_data["endpoints"] = info["application-data"]["endpoints"] - zk_relation_data["password"] = info["application-data"]["password"] - zk_relation_data["uris"] = info["application-data"]["uris"] - zk_relation_data["username"] = info["application-data"]["username"] - zk_relation_data["tls"] = info["application-data"]["tls"] - break - return zk_relation_data - - async def get_topic_leader(ops_test: OpsTest, topic: str) -> int: """Get the broker with the topic leader. 
@@ -65,149 +39,41 @@ async def get_topic_leader(ops_test: OpsTest, topic: str) -> int: return re.search(r"Leader: (\d+)", result)[1] -async def send_control_signal( - ops_test: OpsTest, unit_name: str, kill_code: str, app_name: str = APP_NAME -) -> None: - if len(ops_test.model.applications[app_name].units) < 3: - await ops_test.model.applications[app_name].add_unit(count=1) - await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) - - kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}" - return_code, _, _ = await ops_test.juju(*kill_cmd.split()) - - if return_code != 0: - raise Exception( - f"Expected kill command {kill_cmd} to succeed instead it failed: {return_code}" - ) - - -def produce_and_check_logs( - model_full_name: str, - kafka_unit_name: str, - provider_unit_name: str, - topic: str, - create_topic: bool = True, - replication_factor: int = 1, - num_partitions: int = 5, -) -> None: - """Produces messages from HN to chosen Kafka topic. +async def get_topic_offsets(ops_test: OpsTest, topic: str, unit_name: str) -> list[str]: + """Get the offsets of a topic on a unit. Args: - model_full_name: the full name of the model - kafka_unit_name: the kafka unit to checks logs on - provider_unit_name: the app to grab credentials from - topic: the desired topic to produce to - create_topic: if the topic needs to be created - replication_factor: replication factor of the created topic - num_partitions: number of partitions for the topic - - Raises: - KeyError: if missing relation data - AssertionError: if logs aren't found for desired topic + ops_test: OpsTest utility class + topic: the desired topic to check + unit_name: unit to check the offsets on """ - relation_data = get_provider_data( - unit_name=provider_unit_name, - model_full_name=model_full_name, - endpoint="kafka-client-admin", - ) - client = KafkaClient( - servers=relation_data["endpoints"].split(","), - username=relation_data["username"], - password=relation_data["password"], - security_protocol="SASL_PLAINTEXT", + bootstrap_server = ( + await get_address(ops_test=ops_test) + + f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}" ) - if create_topic: - topic_config = NewTopic( - name=topic, - num_partitions=num_partitions, - replication_factor=replication_factor, - ) - client.create_topic(topic=topic_config) - for i in range(15): - message = f"Message #{i}" - client.produce_message(topic_name=topic, message_content=message) - - check_logs(model_full_name, kafka_unit_name, topic) - - -def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None: - """Checks if messages for a topic have been produced. 
- - Args: - model_full_name: the full name of the model - kafka_unit_name: the kafka unit to checks logs on - topic: the desired topic to check - """ - logs = check_output( - f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'", - stderr=PIPE, - shell=True, - universal_newlines=True, - ).splitlines() - - logger.debug(f"{logs=}") - - passed = False - for log in logs: - if topic and "index" in log: - passed = True - break - - assert passed, "logs not found" - - -def get_provider_data( - unit_name: str, model_full_name: str, endpoint: str = "kafka-client" -) -> Dict[str, str]: - result = show_unit(unit_name=unit_name, model_full_name=model_full_name) - relations_info = result[unit_name]["relation-info"] - logger.info(f"Relation info: {relations_info}") - provider_relation_data = {} - for info in relations_info: - if info["endpoint"] == endpoint: - logger.info(f"Relation data: {info}") - provider_relation_data["username"] = info["application-data"]["username"] - provider_relation_data["password"] = info["application-data"]["password"] - provider_relation_data["endpoints"] = info["application-data"]["endpoints"] - provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"] - provider_relation_data["tls"] = info["application-data"]["tls"] - if "consumer-group-prefix" in info["application-data"]: - provider_relation_data["consumer-group-prefix"] = info["application-data"][ - "consumer-group-prefix" - ] - provider_relation_data["topic"] = info["application-data"]["topic"] - return provider_relation_data - - -def show_unit(unit_name: str, model_full_name: str) -> Any: result = check_output( - f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}", + f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.get-offsets --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'", stderr=PIPE, shell=True, universal_newlines=True, ) - return yaml.safe_load(result) + # example of topic offset output: 'test-topic:0:10' + return re.search(rf"{topic}:(\d+:\d+)", result)[1].split(":") -def get_application_name(ops_test: OpsTest, application_name_substring: str) -> str: - """Returns the name of the application with the provided application name. - - This enables us to retrieve the name of the deployed application in an existing model. - - Note: if multiple applications with the application name exist, - the first one found will be returned. 
- """ - for application in ops_test.model.applications: - if application_name_substring in application: - return application - - return None +async def send_control_signal( + ops_test: OpsTest, unit_name: str, kill_code: str, app_name: str = APP_NAME +) -> None: + if len(ops_test.model.applications[app_name].units) < 3: + await ops_test.model.applications[app_name].add_unit(count=1) + await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) + kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}" + return_code, _, _ = await ops_test.juju(*kill_cmd.split()) -async def get_address(ops_test: OpsTest, app_name=APP_NAME, unit_num=0) -> str: - """Get the address for a unit.""" - status = await ops_test.model.get_status() # noqa: F821 - address = status["applications"][app_name]["units"][f"{app_name}/{unit_num}"]["public-address"] - return address + if return_code != 0: + raise Exception( + f"Expected kill command {kill_cmd} to succeed instead it failed: {return_code}" + ) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index d3a3e284..9ede7fc8 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -8,22 +8,23 @@ import pytest from ha_helpers import ( + DUMMY_NAME, + get_topic_leader, + get_topic_offsets, + send_control_signal, +) +from pytest_operator.plugin import OpsTest +from tests.integration.helpers import ( APP_NAME, REL_NAME_ADMIN, ZK_NAME, check_logs, - get_topic_leader, produce_and_check_logs, - send_control_signal, ) -from pytest_operator.plugin import OpsTest logger = logging.getLogger(__name__) -DUMMY_NAME = "app" - - @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm): await asyncio.gather( @@ -71,11 +72,17 @@ async def test_replicated_events(ops_test: OpsTest): kafka_unit_name=f"{APP_NAME}/1", topic="replicated-topic", ) + assert get_topic_offsets( + ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/1" + ) == ["0", "15"] check_logs( model_full_name=ops_test.model_full_name, kafka_unit_name=f"{APP_NAME}/2", topic="replicated-topic", ) + assert get_topic_offsets( + ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/2" + ) == ["0", "15"] async def test_kill_broker_with_topic_leader(ops_test: OpsTest): diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 5de01b2a..473beaaf 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -120,6 +120,7 @@ def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str zk_relation_data["uris"] = info["application-data"]["uris"] zk_relation_data["username"] = info["application-data"]["username"] zk_relation_data["tls"] = info["application-data"]["tls"] + break return zk_relation_data @@ -217,7 +218,13 @@ def check_tls(ip: str, port: int) -> bool: def produce_and_check_logs( - model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str + model_full_name: str, + kafka_unit_name: str, + provider_unit_name: str, + topic: str, + create_topic: bool = True, + replication_factor: int = 1, + num_partitions: int = 5, ) -> None: """Produces messages from HN to chosen Kafka topic. 
@@ -226,6 +233,9 @@ def produce_and_check_logs( kafka_unit_name: the kafka unit to checks logs on provider_unit_name: the app to grab credentials from topic: the desired topic to produce to + create_topic: if the topic needs to be created + replication_factor: replication factor of the created topic + num_partitions: number of partitions for the topic Raises: KeyError: if missing relation data @@ -236,28 +246,20 @@ def produce_and_check_logs( model_full_name=model_full_name, endpoint="kafka-client-admin", ) - topic = topic - username = relation_data.get("username", None) - password = relation_data.get("password", None) - servers = relation_data.get("endpoints", "").split(",") - security_protocol = "SASL_PLAINTEXT" - - if not (username and password and servers): - raise KeyError("missing relation data from app charm") - client = KafkaClient( - servers=servers, - username=username, - password=password, - security_protocol=security_protocol, - ) - topic_config = NewTopic( - name=topic, - num_partitions=5, - replication_factor=1, + servers=relation_data["endpoints"].split(","), + username=relation_data["username"], + password=relation_data["password"], + security_protocol="SASL_PLAINTEXT", ) - client.create_topic(topic=topic_config) + if create_topic: + topic_config = NewTopic( + name=topic, + num_partitions=num_partitions, + replication_factor=replication_factor, + ) + client.create_topic(topic=topic_config) for i in range(15): message = f"Message #{i}" client.produce_message(topic_name=topic, message_content=message) From d9f6623b65b0ce5f8c597eb423571ea01d7bb4fa Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Fri, 8 Sep 2023 17:02:47 +0200 Subject: [PATCH 14/21] add continuous writes structure --- lib/charms/kafka/v0/client.py | 17 +- tests/integration/ha/continuous_writes.py | 219 ++++++++++++++++++++++ tests/integration/ha/ha_helpers.py | 1 - tests/integration/ha/test_ha.py | 31 +-- tests/integration/helpers.py | 1 + tests/integration/test_charm.py | 2 +- tests/integration/test_scaling.py | 5 - 7 files changed, 256 insertions(+), 20 deletions(-) diff --git a/lib/charms/kafka/v0/client.py b/lib/charms/kafka/v0/client.py index 00db6903..39c732e4 100644 --- a/lib/charms/kafka/v0/client.py +++ b/lib/charms/kafka/v0/client.py @@ -93,7 +93,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 1 +LIBPATCH = 2 class KafkaClient: @@ -188,10 +188,17 @@ def create_topic(self, topic: NewTopic) -> None: Args: topic: the configuration of the topic to create - """ self._admin_client.create_topics(new_topics=[topic], validate_only=False) + def delete_topics(self, topics: list[str]) -> None: + """Deletes a topic. 
+ + Args: + topics: list of topics to delete + """ + self._admin_client.delete_topics(topics=topics) + def subscribe_to_topic( self, topic_name: str, consumer_group_prefix: Optional[str] = None ) -> None: @@ -243,6 +250,12 @@ def produce_message(self, topic_name: str, message_content: str) -> None: future.get(timeout=60) logger.info(f"Message published to topic={topic_name}, message content: {item_content}") + def close(self) -> None: + """Close the connection to the client.""" + self._admin_client.close() + self._producer_client.close() + self._consumer_client.close() + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Handler for running a Kafka client") diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py index e69de29b..f2221607 100644 --- a/tests/integration/ha/continuous_writes.py +++ b/tests/integration/ha/continuous_writes.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +import asyncio +import logging +import os +from multiprocessing import Event, Process, Queue +from types import SimpleNamespace + +from charms.kafka.v0.client import KafkaClient +from kafka.admin import NewTopic +from kafka.errors import KafkaTimeoutError +from pytest_operator.plugin import OpsTest +from tenacity import ( + RetryError, + Retrying, + retry, + stop_after_attempt, + stop_after_delay, + wait_fixed, + wait_random, +) +from tests.integration.helpers import DUMMY_NAME, get_provider_data + +logger = logging.getLogger(__name__) + + +class ContinuousWrites: + """Utility class for managing continuous writes.""" + + TOPIC_NAME = "ha-test-topic" + LAST_WRITTEN_VAL_PATH = "last_written_value" + + def __init__(self, ops_test: OpsTest, app: str): + self._ops_test = ops_test + self._app = app + self._is_stopped = True + self._event = None + self._queue = None + self._process = None + + @retry( + wait=wait_fixed(wait=5) + wait_random(0, 5), + stop=stop_after_attempt(5), + ) + async def start(self) -> None: + """Run continuous writes in the background.""" + if not self._is_stopped: + await self.clear() + + # create topic + self._create_replicated_topic() + + # create process + self._create_process() + + # pass the model full name to the process once it starts + self.update() + + # start writes + self._process.start() + + async def update(self): + """Update cluster related conf. 
Useful in cases such as scaling, pwd change etc.""" + self._queue.put(SimpleNamespace(model_full_name=self._ops_test.model_full_name)) + + @retry( + wait=wait_fixed(wait=5) + wait_random(0, 5), + stop=stop_after_attempt(5), + ) + async def clear(self) -> None: + """Stop writes and delete the topic.""" + if not self._is_stopped: + await self.stop() + + client = self._client() + try: + client.delete_topics(topics=[self.TOPIC_NAME]) + finally: + client.close() + + @retry( + wait=wait_fixed(wait=5) + wait_random(0, 5), + stop=stop_after_attempt(5), + ) + def consumed_messages(self) -> list: + """Consume the messages in the topic.""" + client = self._client() + try: + client.subscribe_to_topic(topic_name=self.TOPIC_NAME) + return list(client.messages()) + finally: + client.close() + + def _create_replicated_topic(self): + """Create topic with replication_factor = 3.""" + client = self._client() + topic_config = NewTopic( + name=self.TOPIC_NAME, + num_partitions=1, + replication_factor=3, + ) + client.create_topic(topic=topic_config) + + @retry( + wait=wait_fixed(wait=5) + wait_random(0, 5), + stop=stop_after_attempt(5), + ) + async def stop(self) -> SimpleNamespace: + """Stop the continuous writes process and return max inserted ID.""" + if not self._is_stopped: + self._stop_process() + + result = SimpleNamespace() + + # messages count + consumed_messages = self.consumed_messages() + result.count = len(consumed_messages) + result.last_message = consumed_messages[-1] + + # last expected message stored on disk + try: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)): + with attempt: + with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f: + result.last_expected_message = int(f.read().rstrip()) + except RetryError: + result.last_expected_message = -1 + + return result + + def _create_process(self): + self._is_stopped = False + self._event = Event() + self._queue = Queue() + self._process = Process( + target=ContinuousWrites._run_async, + name="continuous_writes", + args=(self._event, self._queue, 0, True), + ) + + def _stop_process(self): + self._event.set() + self._process.join() + self._queue.close() + self._is_stopped = True + + def _client(self): + """Build a Kafka client.""" + relation_data = get_provider_data( + unit_name=f"{DUMMY_NAME}/0", + model_full_name=self._ops_test.model_full_name, + endpoint="kafka-client-admin", + ) + return KafkaClient( + servers=relation_data["endpoints"].split(","), + username=relation_data["username"], + password=relation_data["password"], + security_protocol="SASL_PLAINTEXT", + ) + + @staticmethod + async def _run(event: Event, data_queue: Queue, starting_number: int) -> None: # noqa: C901 + """Continuous writing.""" + initial_data = data_queue.get(True) + + def _client(): + """Build a Kafka client.""" + relation_data = get_provider_data( + unit_name=f"{DUMMY_NAME}/0", + model_full_name=initial_data.model_full_name, + endpoint="kafka-client-admin", + ) + return KafkaClient( + servers=relation_data["endpoints"].split(","), + username=relation_data["username"], + password=relation_data["password"], + security_protocol="SASL_PLAINTEXT", + ) + + write_value = starting_number + lost_messages = 0 + client = _client() + + while True: + if not data_queue.empty(): # currently evaluates to false as we don't make updates + data_queue.get(False) + client.close() + client = _client() + + try: + ContinuousWrites._produce_message(client, str(write_value)) + except (KafkaTimeoutError, ConnectionRefusedError): + client.close() + client = _client() 
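+                # count the failed write; the test later asserts that no messages were lost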
+ lost_messages += 1 + finally: + # process termination requested + if event.is_set(): + break + + write_value += 1 + + # write last expected written value on disk + with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "w") as f: + f.write(f"{str(write_value)},{str(lost_messages)}") + os.fsync(f) + + client.close() + + @staticmethod + def _produce_message(client: KafkaClient, write_value: str) -> None: + """Produce a single message.""" + client.produce_message(topic_name=ContinuousWrites.TOPIC_NAME, message_content=write_value) + + @staticmethod + def _run_async(event: Event, data_queue: Queue, starting_number: int): + """Run async code.""" + asyncio.run(ContinuousWrites._run(event, data_queue, starting_number)) diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 4163f991..67d21e39 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -11,7 +11,6 @@ from literals import SECURITY_PROTOCOL_PORTS from snap import KafkaSnap -DUMMY_NAME = "app" PROCESS = "kafka.Kafka" logger = logging.getLogger(__name__) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index 9ede7fc8..e12154a0 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -7,15 +7,16 @@ import time import pytest -from ha_helpers import ( - DUMMY_NAME, +from continuous_writes import ContinuousWrites +from pytest_operator.plugin import OpsTest +from tests.integration.ha.ha_helpers import ( get_topic_leader, get_topic_offsets, send_control_signal, ) -from pytest_operator.plugin import OpsTest from tests.integration.helpers import ( APP_NAME, + DUMMY_NAME, REL_NAME_ADMIN, ZK_NAME, check_logs, @@ -25,6 +26,22 @@ logger = logging.getLogger(__name__) +@pytest.fixture() +async def c_writes(ops_test: OpsTest): + """Creates instance of the ContinuousWrites.""" + app = APP_NAME + return ContinuousWrites(ops_test, app) + + +@pytest.fixture() +async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites): + """Starts continuous write operations and clears writes at the end of the test.""" + await c_writes.start() + yield + await c_writes.clear() + logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n") + + @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm): await asyncio.gather( @@ -97,14 +114,6 @@ async def test_kill_broker_with_topic_leader(ops_test: OpsTest): final_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") assert initial_leader_num != final_leader_num - produce_and_check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{APP_NAME}/0", - provider_unit_name=f"{DUMMY_NAME}/0", - topic="replicated-topic", - create_topic=False, - ) - async def test_multi_cluster_isolation(ops_test: OpsTest, kafka_charm): second_kafka_name = f"{APP_NAME}-two" diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 473beaaf..54af7ade 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -22,6 +22,7 @@ METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) APP_NAME = METADATA["name"] ZK_NAME = "zookeeper" +DUMMY_NAME = "app" REL_NAME_ADMIN = "kafka-client-admin" logger = logging.getLogger(__name__) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index a045274f..1f4d0c54 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -16,6 +16,7 @@ from .helpers import ( APP_NAME, + DUMMY_NAME, 
REL_NAME_ADMIN, ZK_NAME, check_socket, @@ -26,7 +27,6 @@ logger = logging.getLogger(__name__) -DUMMY_NAME = "app" SAME_ZK = f"{ZK_NAME}-same" SAME_KAFKA = f"{APP_NAME}-same" diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py index 70fdf718..5f3b45e0 100644 --- a/tests/integration/test_scaling.py +++ b/tests/integration/test_scaling.py @@ -5,10 +5,8 @@ import asyncio import logging import time -from pathlib import Path import pytest -import yaml from pytest_operator.plugin import OpsTest from literals import CHARM_KEY, ZK @@ -18,9 +16,6 @@ logger = logging.getLogger(__name__) -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -DUMMY_NAME_1 = "app" - @pytest.mark.abort_on_fail async def test_kafka_simple_scale_up(ops_test: OpsTest, kafka_charm): From 316ca00dfe741654ce15f35ac3cbb9419a0fa6a4 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 11 Sep 2023 10:53:02 +0200 Subject: [PATCH 15/21] redo kill broker test with continuous writes --- lib/charms/kafka/v0/client.py | 3 +- tests/integration/ha/__init__.py | 2 + tests/integration/ha/continuous_writes.py | 24 +++++++----- tests/integration/ha/ha_helpers.py | 4 +- tests/integration/ha/test_ha.py | 46 +++++++++++++++++------ tox.ini | 14 +------ 6 files changed, 57 insertions(+), 36 deletions(-) create mode 100644 tests/integration/ha/__init__.py diff --git a/lib/charms/kafka/v0/client.py b/lib/charms/kafka/v0/client.py index 39c732e4..28a7f0c3 100644 --- a/lib/charms/kafka/v0/client.py +++ b/lib/charms/kafka/v0/client.py @@ -126,6 +126,7 @@ def __init__( self.mtls = self.security_protocol == "SSL" self._subscription = None + self._consumer_group_prefix = None @cached_property def _admin_client(self) -> KafkaAdminClient: @@ -174,7 +175,7 @@ def _consumer_client(self) -> KafkaConsumer: ssl_certfile=self.certfile_path if self.ssl else None, ssl_keyfile=self.keyfile_path if self.mtls else None, api_version=KafkaClient.API_VERSION if self.mtls else None, - group_id=self._consumer_group_prefix or None, + group_id=self._consumer_group_prefix, enable_auto_commit=True, auto_offset_reset="earliest", consumer_timeout_ms=15000, diff --git a/tests/integration/ha/__init__.py b/tests/integration/ha/__init__.py new file mode 100644 index 00000000..db3bfe1a --- /dev/null +++ b/tests/integration/ha/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py index f2221607..89fd5336 100644 --- a/tests/integration/ha/continuous_writes.py +++ b/tests/integration/ha/continuous_writes.py @@ -20,7 +20,8 @@ wait_fixed, wait_random, ) -from tests.integration.helpers import DUMMY_NAME, get_provider_data + +from integration.helpers import DUMMY_NAME, get_provider_data logger = logging.getLogger(__name__) @@ -43,10 +44,10 @@ def __init__(self, ops_test: OpsTest, app: str): wait=wait_fixed(wait=5) + wait_random(0, 5), stop=stop_after_attempt(5), ) - async def start(self) -> None: + def start(self) -> None: """Run continuous writes in the background.""" if not self._is_stopped: - await self.clear() + self.clear() # create topic self._create_replicated_topic() @@ -60,7 +61,7 @@ async def start(self) -> None: # start writes self._process.start() - async def update(self): + def update(self): """Update cluster related conf. 
Useful in cases such as scaling, pwd change etc.""" self._queue.put(SimpleNamespace(model_full_name=self._ops_test.model_full_name)) @@ -68,10 +69,10 @@ async def update(self): wait=wait_fixed(wait=5) + wait_random(0, 5), stop=stop_after_attempt(5), ) - async def clear(self) -> None: + def clear(self) -> None: """Stop writes and delete the topic.""" if not self._is_stopped: - await self.stop() + self.stop() client = self._client() try: @@ -88,6 +89,7 @@ def consumed_messages(self) -> list: client = self._client() try: client.subscribe_to_topic(topic_name=self.TOPIC_NAME) + # FIXME: loading whole list of consumed messages into memory might not be the best idea return list(client.messages()) finally: client.close() @@ -106,7 +108,7 @@ def _create_replicated_topic(self): wait=wait_fixed(wait=5) + wait_random(0, 5), stop=stop_after_attempt(5), ) - async def stop(self) -> SimpleNamespace: + def stop(self) -> SimpleNamespace: """Stop the continuous writes process and return max inserted ID.""" if not self._is_stopped: self._stop_process() @@ -123,9 +125,11 @@ async def stop(self) -> SimpleNamespace: for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)): with attempt: with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f: - result.last_expected_message = int(f.read().rstrip()) + result.last_expected_message, result.lost_messages = ( + f.read().rstrip().split(",", maxsplit=2) + ) except RetryError: - result.last_expected_message = -1 + result.last_expected_message = result.lost_messages = -1 return result @@ -136,7 +140,7 @@ def _create_process(self): self._process = Process( target=ContinuousWrites._run_async, name="continuous_writes", - args=(self._event, self._queue, 0, True), + args=(self._event, self._queue, 0), ) def _stop_process(self): diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 67d21e39..f9ff09a3 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -6,8 +6,8 @@ from subprocess import PIPE, check_output from pytest_operator.plugin import OpsTest -from tests.integration.helpers import APP_NAME, get_address +from integration.helpers import APP_NAME, get_address from literals import SECURITY_PROTOCOL_PORTS from snap import KafkaSnap @@ -51,6 +51,7 @@ async def get_topic_offsets(ops_test: OpsTest, topic: str, unit_name: str) -> li + f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}" ) + # example of topic offset output: 'test-topic:0:10' result = check_output( f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.get-offsets --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'", stderr=PIPE, @@ -58,7 +59,6 @@ async def get_topic_offsets(ops_test: OpsTest, topic: str, unit_name: str) -> li universal_newlines=True, ) - # example of topic offset output: 'test-topic:0:10' return re.search(rf"{topic}:(\d+:\d+)", result)[1].split(":") diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index e12154a0..509dd216 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -36,9 +36,9 @@ async def c_writes(ops_test: OpsTest): @pytest.fixture() async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites): """Starts continuous write operations and clears writes at the end of the test.""" - await c_writes.start() + c_writes.start() yield - await c_writes.clear() + c_writes.clear() logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n") @@ 
-89,7 +89,7 @@ async def test_replicated_events(ops_test: OpsTest): kafka_unit_name=f"{APP_NAME}/1", topic="replicated-topic", ) - assert get_topic_offsets( + assert await get_topic_offsets( ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/1" ) == ["0", "15"] check_logs( @@ -97,22 +97,46 @@ async def test_replicated_events(ops_test: OpsTest): kafka_unit_name=f"{APP_NAME}/2", topic="replicated-topic", ) - assert get_topic_offsets( + assert await get_topic_offsets( ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/2" ) == ["0", "15"] -async def test_kill_broker_with_topic_leader(ops_test: OpsTest): - initial_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") - logger.info(f"Killing broker of leader for topic 'replicated-topic': {initial_leader_num}") +async def test_kill_broker_with_topic_leader( + ops_test: OpsTest, + c_writes: ContinuousWrites, + c_writes_runner: ContinuousWrites, +): + initial_leader_num = await get_topic_leader( + ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME + ) + initial_offsets = await get_topic_offsets( + ops_test=ops_test, + topic=ContinuousWrites.TOPIC_NAME, + unit_name=f"kafka/{initial_leader_num}", + ) + + logger.info( + f"Killing broker of leader for topic '{ContinuousWrites.TOPIC_NAME}': {initial_leader_num}" + ) await send_control_signal( ops_test=ops_test, unit_name=f"{APP_NAME}/{initial_leader_num}", kill_code="SIGKILL" ) # Give time for the service to restart - time.sleep(15) - # Check that is still possible to write to the same topic. - final_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") - assert initial_leader_num != final_leader_num + time.sleep(10) + + # Check that leader changed + next_leader_num = await get_topic_leader(ops_test=ops_test, topic="replicated-topic") + next_offsets = await get_topic_offsets( + ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME, unit_name=f"kafka/{next_leader_num}" + ) + + assert initial_leader_num != next_leader_num + assert int(next_offsets[-1]) > int(initial_offsets[-1]) + + res = c_writes.stop() + assert res.lost_messages == 0 + assert res.count - 1 == res.last_expected_message # NOTE: Count starts by index 0 async def test_multi_cluster_isolation(ops_test: OpsTest, kafka_charm): diff --git a/tox.ini b/tox.ini index 8d505e1f..80a4e1b7 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,7 @@ set_env = scaling: TEST_FILE=test_scaling.py password-rotation: TEST_FILE=test_password_rotation.py tls: TEST_FILE=test_tls.py - ha: TEST_FILE=test_ha.py + ha: TEST_FILE=ha/test_ha.py pass_env = PYTHONPATH @@ -85,7 +85,7 @@ commands = poetry install --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ -[testenv:integration-{charm,provider,scaling,password-rotation,tls}] +[testenv:integration-{charm,provider,scaling,password-rotation,tls,ha}] description = Run integration tests pass_env = {[testenv]pass_env} @@ -94,13 +94,3 @@ pass_env = commands = poetry install --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE} - -[testenv:integration-{ha}] -description = Run integration tests for high availability -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ha/{env:TEST_FILE} From d80831e8b27cdea073aada728a09b4afd9eee550 Mon Sep 17 
00:00:00 2001
From: Raul Zamora
Date: Mon, 11 Sep 2023 12:10:30 +0200
Subject: [PATCH 16/21] fix imports

---
 tests/integration/ha/test_ha.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py
index 509dd216..d3d60b1c 100644
--- a/tests/integration/ha/test_ha.py
+++ b/tests/integration/ha/test_ha.py
@@ -7,14 +7,14 @@
 import time
 
 import pytest
-from continuous_writes import ContinuousWrites
+from integration.ha.continuous_writes import ContinuousWrites
 from pytest_operator.plugin import OpsTest
-from tests.integration.ha.ha_helpers import (
+from integration.ha.ha_helpers import (
     get_topic_leader,
     get_topic_offsets,
     send_control_signal,
 )
-from tests.integration.helpers import (
+from integration.helpers import (
     APP_NAME,
     DUMMY_NAME,
     REL_NAME_ADMIN,
@@ -130,11 +130,10 @@ async def test_kill_broker_with_topic_leader(
     next_offsets = await get_topic_offsets(
         ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME, unit_name=f"kafka/{next_leader_num}"
     )
+    res = c_writes.stop()
 
     assert initial_leader_num != next_leader_num
     assert int(next_offsets[-1]) > int(initial_offsets[-1])
-
-    res = c_writes.stop()
     assert res.lost_messages == 0
     assert res.count - 1 == res.last_expected_message  # NOTE: Count starts by index 0

From 0e82c7e1f4dcc7939151779667ac5e8d20058876 Mon Sep 17 00:00:00 2001
From: Raul Zamora
Date: Mon, 11 Sep 2023 13:30:50 +0200
Subject: [PATCH 17/21] add delay to restart

---
 last_written_value                 |  1 +
 tests/integration/ha/ha_helpers.py | 42 ++++++++++++++++++++++++++++++
 tests/integration/ha/test_ha.py    | 12 +++++++++
 3 files changed, 55 insertions(+)
 create mode 100644 last_written_value

diff --git a/last_written_value b/last_written_value
new file mode 100644
index 00000000..7de346d2
--- /dev/null
+++ b/last_written_value
@@ -0,0 +1 @@
+0,0
\ No newline at end of file

diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py
index f9ff09a3..41fbae74 100644
--- a/tests/integration/ha/ha_helpers.py
+++ b/tests/integration/ha/ha_helpers.py
@@ -3,8 +3,10 @@
 # See LICENSE file for licensing details.
 import logging
 import re
+import subprocess
 from subprocess import PIPE, check_output
 
+from ops.model import Unit
 from pytest_operator.plugin import OpsTest
 
 from integration.helpers import APP_NAME, get_address
@@ -12,10 +14,19 @@
 from snap import KafkaSnap
 
 PROCESS = "kafka.Kafka"
+SERVICE_DEFAULT_PATH = "/etc/systemd/system/snap.charmed-kafka.daemon.service"
 
 logger = logging.getLogger(__name__)
 
 
+class ProcessError(Exception):
+    """Raised when a process fails."""
+
+
+class ProcessRunningError(Exception):
+    """Raised when a process is running when it is not expected to be."""
+
+
 async def get_topic_leader(ops_test: OpsTest, topic: str) -> int:
     """Get the broker with the topic leader.
@@ -76,3 +87,34 @@ async def send_control_signal(
         raise Exception(
             f"Expected kill command {kill_cmd} to succeed instead it failed: {return_code}"
         )
+
+
+async def patch_restart_delay(ops_test: OpsTest, unit_name: str, delay: int) -> None:
+    """Adds a restart delay in the Kafka service file.
+
+    When the Kafka service fails it will now wait for `delay` number of seconds.
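+
+    The delay is written as a RestartSec entry into the charmed-kafka snap's
+    systemd unit file (SERVICE_DEFAULT_PATH).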
+ """ + add_delay_cmd = ( + f"exec --unit {unit_name} -- " + f"sudo sed -i -e '/^[Service]/a RestartSec={delay}' " + f"{SERVICE_DEFAULT_PATH}" + ) + await ops_test.juju(*add_delay_cmd.split(), check=True) + + # reload the daemon for systemd to reflect changes + reload_cmd = f"exec --unit {unit_name} -- sudo systemctl daemon-reload" + await ops_test.juju(*reload_cmd.split(), check=True) + + +async def remove_restart_delay(ops_test: OpsTest, unit_name: str) -> None: + """Removes the restart delay from the service.""" + remove_delay_cmd = ( + f"exec --unit {unit_name} -- " + "sed -i -e '/^RestartSec=.*/d' " + f"{SERVICE_DEFAULT_PATH}" + ) + await ops_test.juju(*remove_delay_cmd.split(), check=True) + + # reload the daemon for systemd to reflect changes + reload_cmd = f"exec --unit {unit_name} -- sudo systemctl daemon-reload" + await ops_test.juju(*reload_cmd.split(), check=True) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index d3d60b1c..4692d1dd 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -12,6 +12,8 @@ from integration.ha.ha_helpers import ( get_topic_leader, get_topic_offsets, + patch_restart_delay, + remove_restart_delay, send_control_signal, ) from integration.helpers import ( @@ -42,6 +44,15 @@ async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites): logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n") +@pytest.fixture() +async def restart_delay(ops_test: OpsTest): + for unit in ops_test.model.applications[APP_NAME].units: + await patch_restart_delay(ops_test=ops_test, unit_name=unit.name, delay=5) + yield + for unit in ops_test.model.applications[APP_NAME].units: + await remove_restart_delay(ops_test=ops_test, unit_name=unit.name) + + @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm): await asyncio.gather( @@ -106,6 +117,7 @@ async def test_kill_broker_with_topic_leader( ops_test: OpsTest, c_writes: ContinuousWrites, c_writes_runner: ContinuousWrites, + restart_delay, ): initial_leader_num = await get_topic_leader( ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME From ca356d870a4862ee8c5fce93f2da0e4b061daa21 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 11 Sep 2023 13:33:43 +0200 Subject: [PATCH 18/21] fix lint --- tests/integration/ha/ha_helpers.py | 6 +----- tests/integration/ha/test_ha.py | 3 ++- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 41fbae74..1590e692 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -3,10 +3,8 @@ # See LICENSE file for licensing details. 
import logging import re -import subprocess from subprocess import PIPE, check_output -from ops.model import Unit from pytest_operator.plugin import OpsTest from integration.helpers import APP_NAME, get_address @@ -109,9 +107,7 @@ async def patch_restart_delay(ops_test: OpsTest, unit_name: str, delay: int) -> async def remove_restart_delay(ops_test: OpsTest, unit_name: str) -> None: """Removes the restart delay from the service.""" remove_delay_cmd = ( - f"exec --unit {unit_name} -- " - "sed -i -e '/^RestartSec=.*/d' " - f"{SERVICE_DEFAULT_PATH}" + f"exec --unit {unit_name} -- sed -i -e '/^RestartSec=.*/d' {SERVICE_DEFAULT_PATH}" ) await ops_test.juju(*remove_delay_cmd.split(), check=True) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index 4692d1dd..0dcbf38e 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -7,8 +7,9 @@ import time import pytest -from integration.ha.continuous_writes import ContinuousWrites from pytest_operator.plugin import OpsTest + +from integration.ha.continuous_writes import ContinuousWrites from integration.ha.ha_helpers import ( get_topic_leader, get_topic_offsets, From 413492e199334ddace4b51ce48bbb337d21615f5 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 11 Sep 2023 14:59:08 +0200 Subject: [PATCH 19/21] fix asserts --- tests/integration/ha/test_ha.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index 0dcbf38e..d46289f9 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -147,8 +147,8 @@ async def test_kill_broker_with_topic_leader( assert initial_leader_num != next_leader_num assert int(next_offsets[-1]) > int(initial_offsets[-1]) - assert res.lost_messages == 0 - assert res.count - 1 == res.last_expected_message # NOTE: Count starts by index 0 + assert res.lost_messages == "0" + assert res.count - 1 == int(res.last_expected_message) # NOTE: Count starts by index 0 async def test_multi_cluster_isolation(ops_test: OpsTest, kafka_charm): From b2271d4d23a3b45f35e7c2b7c1d1869e1955de53 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 11 Sep 2023 22:02:45 +0200 Subject: [PATCH 20/21] general fixes --- last_written_value | 2 +- lib/charms/kafka/v0/client.py | 10 ++++++++-- tests/integration/ha/continuous_writes.py | 18 +++++++++--------- tests/integration/ha/test_ha.py | 19 +++---------------- 4 files changed, 21 insertions(+), 28 deletions(-) diff --git a/last_written_value b/last_written_value index 7de346d2..885bfe9b 100644 --- a/last_written_value +++ b/last_written_value @@ -1 +1 @@ -0,0 \ No newline at end of file +20052,0 \ No newline at end of file diff --git a/lib/charms/kafka/v0/client.py b/lib/charms/kafka/v0/client.py index 28a7f0c3..0016a112 100644 --- a/lib/charms/kafka/v0/client.py +++ b/lib/charms/kafka/v0/client.py @@ -81,6 +81,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent): from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer from kafka.admin import NewTopic +from kafka.errors import KafkaError logger = logging.getLogger(__name__) logging.basicConfig(stream=sys.stdout, level=logging.INFO) @@ -248,8 +249,13 @@ def produce_message(self, topic_name: str, message_content: str) -> None: """ item_content = f"Message #{message_content}" future = self._producer_client.send(topic_name, str.encode(item_content)) - future.get(timeout=60) - logger.info(f"Message published to topic={topic_name}, message content: {item_content}") 
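+        # log and swallow producer errors so a single failed send does not crash the caller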
+ try: + future.get(timeout=60) + logger.info( + f"Message published to topic={topic_name}, message content: {item_content}" + ) + except KafkaError as e: + logger.error(f"Error producing message {message_content} to topic {topic_name}: {e}") def close(self) -> None: """Close the connection to the client.""" diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py index 89fd5336..f4f63399 100644 --- a/tests/integration/ha/continuous_writes.py +++ b/tests/integration/ha/continuous_writes.py @@ -80,17 +80,17 @@ def clear(self) -> None: finally: client.close() - @retry( - wait=wait_fixed(wait=5) + wait_random(0, 5), - stop=stop_after_attempt(5), - ) - def consumed_messages(self) -> list: + def consumed_messages(self) -> list | None: """Consume the messages in the topic.""" client = self._client() try: - client.subscribe_to_topic(topic_name=self.TOPIC_NAME) - # FIXME: loading whole list of consumed messages into memory might not be the best idea - return list(client.messages()) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)): + with attempt: + client.subscribe_to_topic(topic_name=self.TOPIC_NAME) + # FIXME: loading whole list of consumed messages into memory might not be the best idea + return list(client.messages()) + except RetryError: + return [] finally: client.close() @@ -194,7 +194,7 @@ def _client(): try: ContinuousWrites._produce_message(client, str(write_value)) - except (KafkaTimeoutError, ConnectionRefusedError): + except KafkaTimeoutError: client.close() client = _client() lost_messages += 1 diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index d46289f9..4b33ea53 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -95,22 +95,9 @@ async def test_replicated_events(ops_test: OpsTest): replication_factor=3, num_partitions=1, ) - # check logs in the two remaining units - check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{APP_NAME}/1", - topic="replicated-topic", - ) - assert await get_topic_offsets( - ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/1" - ) == ["0", "15"] - check_logs( - model_full_name=ops_test.model_full_name, - kafka_unit_name=f"{APP_NAME}/2", - topic="replicated-topic", - ) + # check offsets in the two remaining units assert await get_topic_offsets( - ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/2" + ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/0" ) == ["0", "15"] @@ -148,7 +135,7 @@ async def test_kill_broker_with_topic_leader( assert initial_leader_num != next_leader_num assert int(next_offsets[-1]) > int(initial_offsets[-1]) assert res.lost_messages == "0" - assert res.count - 1 == int(res.last_expected_message) # NOTE: Count starts by index 0 + assert res.count == int(res.last_expected_message) async def test_multi_cluster_isolation(ops_test: OpsTest, kafka_charm): From ffd448519a8dcf2a7d19132b31cefd7faee121a0 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 13 Sep 2023 16:07:09 +0200 Subject: [PATCH 21/21] add pf feedback --- .gitignore | 3 ++- tests/integration/ha/continuous_writes.py | 9 +++------ tests/integration/ha/ha_helpers.py | 4 ++-- tests/integration/ha/test_ha.py | 4 ++-- tests/integration/helpers.py | 3 ++- 5 files changed, 11 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 4b4412e7..f48d73e8 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ __pycache__/ *.py[cod] .vscode .idea 
-.python-version \ No newline at end of file +.python-version +last_written_value diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py index f4f63399..f75332e0 100644 --- a/tests/integration/ha/continuous_writes.py +++ b/tests/integration/ha/continuous_writes.py @@ -193,7 +193,9 @@ def _client(): client = _client() try: - ContinuousWrites._produce_message(client, str(write_value)) + client.produce_message( + topic_name=ContinuousWrites.TOPIC_NAME, message_content=str(write_value) + ) except KafkaTimeoutError: client.close() client = _client() @@ -212,11 +214,6 @@ def _client(): client.close() - @staticmethod - def _produce_message(client: KafkaClient, write_value: str) -> None: - """Produce a single message.""" - client.produce_message(topic_name=ContinuousWrites.TOPIC_NAME, message_content=write_value) - @staticmethod def _run_async(event: Event, data_queue: Queue, starting_number: int): """Run async code.""" diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py index 1590e692..c89b8a04 100644 --- a/tests/integration/ha/ha_helpers.py +++ b/tests/integration/ha/ha_helpers.py @@ -79,11 +79,11 @@ async def send_control_signal( await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000) kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}" - return_code, _, _ = await ops_test.juju(*kill_cmd.split()) + return_code, stdout, stderr = await ops_test.juju(*kill_cmd.split()) if return_code != 0: raise Exception( - f"Expected kill command {kill_cmd} to succeed instead it failed: {return_code}" + f"Expected kill command {kill_cmd} to succeed instead it failed: {return_code}, {stdout}, {stderr}" ) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index 4b33ea53..4a60804a 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -21,6 +21,7 @@ APP_NAME, DUMMY_NAME, REL_NAME_ADMIN, + TEST_DEFAULT_MESSAGES, ZK_NAME, check_logs, produce_and_check_logs, @@ -95,10 +96,9 @@ async def test_replicated_events(ops_test: OpsTest): replication_factor=3, num_partitions=1, ) - # check offsets in the two remaining units assert await get_topic_offsets( ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/0" - ) == ["0", "15"] + ) == ["0", str(TEST_DEFAULT_MESSAGES)] async def test_kill_broker_with_topic_leader( diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 54af7ade..bd447b4f 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -24,6 +24,7 @@ ZK_NAME = "zookeeper" DUMMY_NAME = "app" REL_NAME_ADMIN = "kafka-client-admin" +TEST_DEFAULT_MESSAGES = 15 logger = logging.getLogger(__name__) @@ -261,7 +262,7 @@ def produce_and_check_logs( replication_factor=replication_factor, ) client.create_topic(topic=topic_config) - for i in range(15): + for i in range(TEST_DEFAULT_MESSAGES): message = f"Message #{i}" client.produce_message(topic_name=topic, message_content=message)