diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7d75dc40..41431754 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -98,3 +98,46 @@ jobs: run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}' env: CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }} + + integration-test-ha: + strategy: + fail-fast: false + matrix: + tox-environments: + - integration-ha-ha + name: ${{ matrix.tox-environments }} + needs: + - lint + - unit-test + - build + - integration-test + runs-on: ubuntu-latest + timeout-minutes: 120 + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Setup operator environment + # TODO: Replace with custom image on self-hosted runner + uses: charmed-kubernetes/actions-operator@main + with: + provider: lxd + bootstrap-options: "--agent-version 2.9.38" + - name: Download packed charm(s) + uses: actions/download-artifact@v3 + with: + name: ${{ needs.build.outputs.artifact-name }} + - name: Select tests + id: select-tests + run: | + if [ "${{ github.event_name }}" == "schedule" ] + then + echo Running unstable and stable tests + echo "mark_expression=" >> $GITHUB_OUTPUT + else + echo Skipping unstable tests + echo "mark_expression=not unstable" >> $GITHUB_OUTPUT + fi + - name: Run integration tests + run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}' + env: + CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }} \ No newline at end of file diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py new file mode 100644 index 00000000..d8123d28 --- /dev/null +++ b/tests/integration/ha/helpers.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +import logging +from pathlib import Path +from subprocess import PIPE, check_output +from typing import Any, Dict + +import yaml +from charms.kafka.v0.client import KafkaClient +from kafka.admin import NewTopic + +from snap import KafkaSnap + +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +APP_NAME = METADATA["name"] +ZK_NAME = "zookeeper" +REL_NAME_ADMIN = "kafka-client-admin" + +logger = logging.getLogger(__name__) + + +def produce_and_check_logs( + model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str +) -> None: + """Produces messages from HN to chosen Kafka topic. + + Args: + model_full_name: the full name of the model + kafka_unit_name: the kafka unit to checks logs on + provider_unit_name: the app to grab credentials from + topic: the desired topic to produce to + + Raises: + KeyError: if missing relation data + AssertionError: if logs aren't found for desired topic + """ + relation_data = get_provider_data( + unit_name=provider_unit_name, + model_full_name=model_full_name, + endpoint="kafka-client-admin", + ) + topic = topic + username = relation_data.get("username", None) + password = relation_data.get("password", None) + servers = relation_data.get("endpoints", "").split(",") + security_protocol = "SASL_PLAINTEXT" + + if not (username and password and servers): + raise KeyError("missing relation data from app charm") + + client = KafkaClient( + servers=servers, + username=username, + password=password, + security_protocol=security_protocol, + ) + topic_config = NewTopic( + name=topic, + num_partitions=5, + replication_factor=1, + ) + + client.create_topic(topic=topic_config) + for i in range(15): + message = f"Message #{i}" + client.produce_message(topic_name=topic, message_content=message) + + check_logs(model_full_name, kafka_unit_name, topic) + + +def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None: + """Checks if messages for a topic have been produced. + + Args: + model_full_name: the full name of the model + kafka_unit_name: the kafka unit to checks logs on + topic: the desired topic to check + """ + logs = check_output( + f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ).splitlines() + + logger.debug(f"{logs=}") + + passed = False + for log in logs: + if topic and "index" in log: + passed = True + break + + assert passed, "logs not found" + + +def get_provider_data( + unit_name: str, model_full_name: str, endpoint: str = "kafka-client" +) -> Dict[str, str]: + result = show_unit(unit_name=unit_name, model_full_name=model_full_name) + relations_info = result[unit_name]["relation-info"] + logger.info(f"Relation info: {relations_info}") + provider_relation_data = {} + for info in relations_info: + if info["endpoint"] == endpoint: + logger.info(f"Relation data: {info}") + provider_relation_data["username"] = info["application-data"]["username"] + provider_relation_data["password"] = info["application-data"]["password"] + provider_relation_data["endpoints"] = info["application-data"]["endpoints"] + provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"] + provider_relation_data["tls"] = info["application-data"]["tls"] + if "consumer-group-prefix" in info["application-data"]: + provider_relation_data["consumer-group-prefix"] = info["application-data"][ + "consumer-group-prefix" + ] + provider_relation_data["topic"] = info["application-data"]["topic"] + return provider_relation_data + + +def show_unit(unit_name: str, model_full_name: str) -> Any: + result = check_output( + f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + + return yaml.safe_load(result) diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py new file mode 100644 index 00000000..94c69417 --- /dev/null +++ b/tests/integration/ha/test_ha.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging + +import pytest +from helpers import ( + APP_NAME, + REL_NAME_ADMIN, + ZK_NAME, + check_logs, + produce_and_check_logs, +) +from pytest_operator.plugin import OpsTest + +logger = logging.getLogger(__name__) + + +DUMMY_NAME = "app" + + +@pytest.mark.abort_on_fail +async def test_build_and_deploy(ops_test: OpsTest, kafka_charm): + await asyncio.gather( + ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"), + ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"), + ) + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30, timeout=3600) + assert ops_test.model.applications[APP_NAME].status == "blocked" + assert ops_test.model.applications[ZK_NAME].status == "active" + + await ops_test.model.add_relation(APP_NAME, ZK_NAME) + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30) + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[ZK_NAME].status == "active" + + +async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm): + second_kafka_name = f"{APP_NAME}-two" + second_zk_name = f"{ZK_NAME}-two" + + await asyncio.gather( + ops_test.model.deploy( + kafka_charm, application_name=second_kafka_name, num_units=1, series="jammy" + ), + ops_test.model.deploy( + ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy" + ), + ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"), + ) + + await ops_test.model.wait_for_idle( + apps=[second_kafka_name, second_zk_name, DUMMY_NAME], + idle_period=30, + timeout=3600, + ) + assert ops_test.model.applications[second_kafka_name].status == "blocked" + + await ops_test.model.add_relation(second_kafka_name, second_zk_name) + + # Relate "app" to the *first* cluster + await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}") + + await ops_test.model.wait_for_idle( + apps=[second_kafka_name, second_zk_name, DUMMY_NAME, APP_NAME], + idle_period=30, + ) + + produce_and_check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{APP_NAME}/0", + provider_unit_name=f"{DUMMY_NAME}/0", + topic="hot-topic", + ) + + # Check that logs are not found on the second cluster + with pytest.raises(AssertionError): + check_logs( + model_full_name=ops_test.model_full_name, + kafka_unit_name=f"{second_kafka_name}/0", + topic="hot-topic", + ) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index f92b8b39..45c232d9 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -299,6 +299,17 @@ def produce_and_check_logs( message = f"Message #{i}" client.produce_message(topic_name=topic, message_content=message) + check_logs(model_full_name, kafka_unit_name, topic) + + +def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None: + """Checks if messages for a topic have been produced. + + Args: + model_full_name: the full name of the model + kafka_unit_name: the kafka unit to checks logs on + topic: the desired topic to check + """ logs = check_output( f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'", stderr=PIPE, diff --git a/tox.ini b/tox.ini index 34aede6c..5342d8bd 100644 --- a/tox.ini +++ b/tox.ini @@ -21,6 +21,13 @@ set_env = PYTHONPATH = {tox_root}/lib:{[vars]src_path} PYTHONBREAKPOINT=ipdb.set_trace PY_COLORS=1 + charm: TEST_FILE=test_charm.py + provider: TEST_FILE=test_provider.py + scaling: TEST_FILE=test_scaling.py + password-rotation: TEST_FILE=test_password_rotation.py + tls: TEST_FILE=test_tls.py + ha: TEST_FILE=test_ha.py + pass_env = PYTHONPATH CHARM_BUILD_DIR @@ -78,62 +85,22 @@ commands = poetry install --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ -[testenv:integration-charm] -description = Run base integration tests -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_charm.py - -[testenv:integration-provider] -description = Run integration tests for provider -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_provider.py - -[testenv:integration-scaling] -description = Run scaling integration tests -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_scaling.py - -[testenv:integration-password-rotation] -description = Run password rotation integration tests -pass_env = - {[testenv]pass_env} - CI - CI_PACKED_CHARMS -commands = - poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_password_rotation.py - -[testenv:integration-tls] -description = Run TLS integration tests +[testenv:integration-{charm,provider,scaling,password-rotation,tls}] +description = Run integration tests pass_env = {[testenv]pass_env} CI CI_PACKED_CHARMS commands = poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_tls.py + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE} -[testenv:integration-upgrade] -description = Run in place upgrade integration tests +[testenv:integration-ha-{ha}] +description = Run integration tests for high availability pass_env = {[testenv]pass_env} CI CI_PACKED_CHARMS commands = poetry install --with integration - poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_upgrade.py + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ha/{env:TEST_FILE}