Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5686] Fix flaky CI #261

Merged
merged 10 commits into from
Oct 23, 2024
10 changes: 8 additions & 2 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
DataPeerUnitData,
)
from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager
from kazoo.client import AuthFailedError, NoNodeError
from kazoo.client import AuthFailedError, ConnectionLoss, NoNodeError
from kazoo.exceptions import NoAuthError
from lightkube.resources.core_v1 import Node, Pod
from ops.model import Application, Relation, Unit
Expand Down Expand Up @@ -714,7 +714,13 @@ def broker_active(self) -> bool:
zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password)
try:
brokers = zk.leader_znodes(path=path)
except (NoNodeError, AuthFailedError, QuorumLeaderNotFoundError, NoAuthError) as e:
except (
NoNodeError,
AuthFailedError,
QuorumLeaderNotFoundError,
ConnectionLoss,
NoAuthError,
) as e:
logger.debug(str(e))
brokers = set()

Expand Down
35 changes: 21 additions & 14 deletions tests/integration/test_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import pytest
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_attempt, wait_fixed

from literals import (
PEER_CLUSTER_ORCHESTRATOR_RELATION,
Expand Down Expand Up @@ -141,6 +142,7 @@ async def test_minimum_brokers_balancer_starts(self, ops_test: OpsTest):
status="active",
timeout=1800,
idle_period=60,
raise_on_error=False,
)

assert balancer_is_running(
Expand All @@ -157,10 +159,9 @@ async def test_balancer_monitor_state(self, ops_test: OpsTest):
assert balancer_is_ready(ops_test=ops_test, app_name=self.balancer_app)

@pytest.mark.abort_on_fail
@pytest.mark.skip
# @pytest.mark.skipif(
# deployment_strat == "single", reason="Testing full rebalance on large deployment"
# )
@pytest.mark.skipif(
deployment_strat == "single", reason="Testing full rebalance on large deployment"
)
async def test_add_unit_full_rebalance(self, ops_test: OpsTest):
await ops_test.model.applications[APP_NAME].add_units(
count=1 # up to 4, new unit won't have any partitions
Expand Down Expand Up @@ -279,14 +280,17 @@ async def test_balancer_prepare_unit_removal(self, ops_test: OpsTest):

assert balancer_is_ready(ops_test=ops_test, app_name=self.balancer_app)

rebalance_action = await leader_unit.run_action(
"rebalance",
mode="remove",
brokerid=new_broker_id,
dryrun=False,
)
response = await rebalance_action.wait()
assert not response.results.get("error", "")
for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(60), reraise=True):
with attempt:
rebalance_action = await leader_unit.run_action(
"rebalance",
mode="remove",
brokerid=new_broker_id,
dryrun=False,
)

response = await rebalance_action.wait()
assert not response.results.get("error", "")

post_rebalance_replica_counts = get_replica_count_by_broker_id(ops_test, self.balancer_app)

Expand Down Expand Up @@ -320,9 +324,12 @@ async def test_tls(self, ops_test: OpsTest):
await ops_test.model.add_relation(TLS_NAME, f"{BALANCER_APP}:{TLS_RELATION}")

await ops_test.model.wait_for_idle(
apps=list({APP_NAME, ZK_NAME, self.balancer_app}), idle_period=30, timeout=1800
apps=list({APP_NAME, ZK_NAME, self.balancer_app}),
status="active",
idle_period=30,
timeout=1800,
)
async with ops_test.fast_forward(fast_interval="20s"):
async with ops_test.fast_forward(fast_interval="30s"):
await asyncio.sleep(120) # ensure update-status adds broker-capacities if missed

# Assert that balancer is running and using certificates
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,5 +352,5 @@ async def test_deploy_with_existing_storage(ops_test: OpsTest):
add_unit_cmd = f"add-unit {APP_NAME} --model={ops_test.model.info.name} --attach-storage={data_storage_id}".split()
await ops_test.juju(*add_unit_cmd)
await ops_test.model.wait_for_idle(
apps=[APP_NAME], status="active", timeout=1000, idle_period=60
apps=[APP_NAME], status="active", timeout=2000, idle_period=30, raise_on_error=False
)
14 changes: 10 additions & 4 deletions tests/integration/test_password_rotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,27 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm):
ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
)
await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK_NAME].units) == 3)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME])
await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME], timeout=2000, idle_period=30, raise_on_error=False
)

assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK_NAME].status == "active"

# needed to open localhost ports
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
await ops_test.model.add_relation(APP_NAME, ZK_NAME)

await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME], status="active", idle_period=30, timeout=3600
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME], status="active", idle_period=30, timeout=3600
)


async def test_password_rotation(ops_test: OpsTest):
"""Check that password stored on ZK has changed after a password rotation."""
initial_sync_user = get_user(
username="sync",
model_full_name=ops_test.model_full_name,
)

Expand All @@ -54,6 +59,7 @@ async def test_password_rotation(ops_test: OpsTest):
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], status="active", idle_period=30)

new_sync_user = get_user(
username="sync",
model_full_name=ops_test.model_full_name,
)

Expand Down
12 changes: 8 additions & 4 deletions tests/integration/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ async def test_deploy_charms_relate_active(
await ops_test.model.add_relation(APP_NAME, ZK)
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME_1}:{REL_NAME_CONSUMER}")

# async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, DUMMY_NAME_1, ZK], idle_period=30, status="active", timeout=1000
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK, DUMMY_NAME_1],
idle_period=30,
status="active",
timeout=2000,
raise_on_error=False,
)

usernames.update(get_client_usernames(ops_test))

Expand Down
66 changes: 30 additions & 36 deletions tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from pytest_operator.plugin import OpsTest

from literals import (
CHARM_KEY,
REL_NAME,
SECURITY_PROTOCOL_PORTS,
TLS_RELATION,
Expand All @@ -20,6 +19,7 @@
)

from .helpers import (
APP_NAME,
REL_NAME_ADMIN,
check_tls,
extract_ca,
Expand All @@ -34,8 +34,6 @@

logger = logging.getLogger(__name__)

pytestmark = pytest.mark.broker

TLS_NAME = "self-signed-certificates"
CERTS_NAME = "tls-certificates-operator"
MTLS_NAME = "mtls"
Expand All @@ -54,19 +52,17 @@ async def test_deploy_tls(ops_test: OpsTest, kafka_charm):
ops_test.model.deploy(ZK, channel="edge", series="jammy", application_name=ZK),
ops_test.model.deploy(
kafka_charm,
application_name=CHARM_KEY,
application_name=APP_NAME,
series="jammy",
config={
"ssl_principal_mapping_rules": "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,DEFAULT"
},
),
)
await ops_test.model.block_until(lambda: len(ops_test.model.applications[ZK].units) == 1)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, ZK, TLS_NAME], idle_period=15, timeout=1800
)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK, TLS_NAME], idle_period=15, timeout=1800)

assert ops_test.model.applications[CHARM_KEY].status == "blocked"
assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK].status == "active"
assert ops_test.model.applications[TLS_NAME].status == "active"

Expand All @@ -86,13 +82,13 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
"""
# Relate Zookeeper[TLS] to Kafka[Non-TLS]
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.add_relation(ZK, CHARM_KEY)
await ops_test.model.add_relation(ZK, APP_NAME)
await ops_test.model.wait_for_idle(
apps=[ZK], idle_period=15, timeout=1000, status="active"
)

# Unit is on 'blocked' but whole app is on 'waiting'
assert ops_test.model.applications[CHARM_KEY].status == "blocked"
assert ops_test.model.applications[APP_NAME].status == "blocked"

# Set a custom private key, by running set-tls-private-key action with no parameters,
# as this will generate a random one
Expand All @@ -102,19 +98,19 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
# Extract the key
private_key = extract_private_key(
ops_test=ops_test,
unit_name=f"{CHARM_KEY}/{num_unit}",
unit_name=f"{APP_NAME}/{num_unit}",
)

# ensuring at least a few update-status
await ops_test.model.add_relation(f"{CHARM_KEY}:{TLS_RELATION}", TLS_NAME)
await ops_test.model.add_relation(f"{APP_NAME}:{TLS_RELATION}", TLS_NAME)
async with ops_test.fast_forward(fast_interval="20s"):
await asyncio.sleep(60)

await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, ZK, TLS_NAME], idle_period=30, timeout=1200, status="active"
apps=[APP_NAME, ZK, TLS_NAME], idle_period=30, timeout=1200, status="active"
)

kafka_address = await get_address(ops_test=ops_test, app_name=CHARM_KEY)
kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME)

assert not check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
Expand All @@ -123,15 +119,15 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
await asyncio.gather(
ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
)
await ops_test.model.wait_for_idle(apps=[CHARM_KEY, DUMMY_NAME], timeout=1000, idle_period=30)
await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME], timeout=1000, idle_period=30)

# ensuring at least a few update-status
await ops_test.model.add_relation(CHARM_KEY, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
async with ops_test.fast_forward(fast_interval="20s"):
await asyncio.sleep(60)

await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, DUMMY_NAME], idle_period=30, status="active"
apps=[APP_NAME, DUMMY_NAME], idle_period=30, status="active"
)

assert check_tls(
Expand All @@ -150,7 +146,7 @@ async def test_kafka_tls(ops_test: OpsTest, app_charm):
# Extract the key
private_key_2 = extract_private_key(
ops_test=ops_test,
unit_name=f"{CHARM_KEY}/{num_unit}",
unit_name=f"{APP_NAME}/{num_unit}",
)

assert private_key != private_key_2
Expand Down Expand Up @@ -181,16 +177,16 @@ async def test_mtls(ops_test: OpsTest):
)
await ops_test.model.wait_for_idle(apps=[MTLS_NAME], timeout=1000, idle_period=15)
await ops_test.model.add_relation(
f"{CHARM_KEY}:{TRUSTED_CERTIFICATE_RELATION}", f"{MTLS_NAME}:{TLS_RELATION}"
f"{APP_NAME}:{TRUSTED_CERTIFICATE_RELATION}", f"{MTLS_NAME}:{TLS_RELATION}"
)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, MTLS_NAME], idle_period=60, timeout=2000, status="active"
apps=[APP_NAME, MTLS_NAME], idle_period=60, timeout=2000, status="active"
)

# getting kafka ca and address
broker_ca = extract_ca(ops_test=ops_test, unit_name=f"{CHARM_KEY}/0")
broker_ca = extract_ca(ops_test=ops_test, unit_name=f"{APP_NAME}/0")

address = await get_address(ops_test, app_name=CHARM_KEY)
address = await get_address(ops_test, app_name=APP_NAME)
ssl_port = SECURITY_PROTOCOL_PORTS["SSL", "SSL"].client
sasl_port = SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
ssl_bootstrap_server = f"{address}:{ssl_port}"
Expand Down Expand Up @@ -235,7 +231,7 @@ async def test_mtls(ops_test: OpsTest):
async def test_mtls_broken(ops_test: OpsTest):
await ops_test.model.remove_application(MTLS_NAME, block_until_done=True)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY],
apps=[APP_NAME],
status="active",
idle_period=30,
timeout=2000,
Expand All @@ -245,20 +241,18 @@ async def test_mtls_broken(ops_test: OpsTest):
@pytest.mark.abort_on_fail
async def test_kafka_tls_scaling(ops_test: OpsTest):
"""Scale the application while using TLS to check that new units will configure correctly."""
await ops_test.model.applications[CHARM_KEY].add_units(count=2)
await ops_test.model.applications[APP_NAME].add_units(count=2)
await ops_test.model.block_until(
lambda: len(ops_test.model.applications[CHARM_KEY].units) == 3, timeout=1000
lambda: len(ops_test.model.applications[APP_NAME].units) == 3, timeout=1000
)

# Wait for model to settle
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY],
status="active",
idle_period=40,
timeout=1000,
apps=[APP_NAME], status="active", idle_period=40, timeout=1000, raise_on_error=False
)

kafka_zk_relation_data = get_kafka_zk_relation_data(
unit_name=f"{CHARM_KEY}/2",
unit_name=f"{APP_NAME}/2",
ops_test=ops_test,
owner=ZK,
)
Expand All @@ -268,16 +262,16 @@ async def test_kafka_tls_scaling(ops_test: OpsTest):
assert f"{chroot}/brokers/ids/1" in active_brokers
assert f"{chroot}/brokers/ids/2" in active_brokers

kafka_address = await get_address(ops_test=ops_test, app_name=CHARM_KEY, unit_num=2)
kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=2)
assert check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
)

# remove relation and check connection again
await ops_test.model.applications[CHARM_KEY].remove_relation(
f"{CHARM_KEY}:{REL_NAME}", f"{DUMMY_NAME}:{REL_NAME_ADMIN}"
await ops_test.model.applications[APP_NAME].remove_relation(
f"{APP_NAME}:{REL_NAME}", f"{DUMMY_NAME}:{REL_NAME_ADMIN}"
)
await ops_test.model.wait_for_idle(apps=[CHARM_KEY])
await ops_test.model.wait_for_idle(apps=[APP_NAME])
assert not check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
)
Expand All @@ -286,10 +280,10 @@ async def test_kafka_tls_scaling(ops_test: OpsTest):
async def test_tls_removed(ops_test: OpsTest):
await ops_test.model.remove_application(TLS_NAME, block_until_done=True)
await ops_test.model.wait_for_idle(
apps=[CHARM_KEY, ZK], timeout=3600, idle_period=30, status="active"
apps=[APP_NAME, ZK], timeout=3600, idle_period=30, status="active"
)

kafka_address = await get_address(ops_test=ops_test, app_name=CHARM_KEY)
kafka_address = await get_address(ops_test=ops_test, app_name=APP_NAME)
assert not check_tls(
ip=kafka_address, port=SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
)
Loading