[DPE-3257] Fix network cut tests (#346)
* Fix network cut test

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix network cut test without IP change

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Update unit test

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix retrieval of units IPs

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Improve checks for readiness

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix IP retrieval

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix IP retrieval

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix IP retrieval

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

---------

Signed-off-by: Marcelo Henrique Neppel <[email protected]>
marceloneppel authored Feb 9, 2024
1 parent a87c52b commit 64b65b4
Showing 4 changed files with 140 additions and 48 deletions.
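The central idea in the diff below is a new test helper that asks the machine itself for its address (`hostname -I` run through `juju exec`) rather than trusting the address cached in Juju status, which can lag right after a network cut. A minimal standalone sketch of that idea, assuming a Juju 3.x client on the PATH; the function name and the direct `subprocess` call are illustrative, the commit itself goes through the `ops_test` fixture:

```python
import subprocess


def unit_ip_from_inside(unit_name: str) -> str:
    """Ask the unit for its own address instead of reading it from Juju status."""
    # `hostname -I` prints the unit's current addresses, space separated.
    output = subprocess.check_output(
        ["juju", "exec", "--unit", unit_name, "--", "hostname", "-I"],
        text=True,
    )
    # Mirror the helper added in this commit: return the first line of output.
    return output.splitlines()[0].strip()
```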
5 changes: 4 additions & 1 deletion src/charm.py
@@ -530,12 +530,14 @@ def _reconfigure_cluster(self, event: HookEvent):
and event.relation.data[event.unit].get("ip-to-remove") is not None
):
ip_to_remove = event.relation.data[event.unit].get("ip-to-remove")
logger.info("Removing %s from the cluster due to IP change", ip_to_remove)
try:
self._patroni.remove_raft_member(ip_to_remove)
except RemoveRaftMemberFailedError:
logger.debug("Deferring on_peer_relation_changed: failed to remove raft member")
return False
self._remove_from_members_ips(ip_to_remove)
if ip_to_remove in self.members_ips:
self._remove_from_members_ips(ip_to_remove)
self._add_members(event)
return True

@@ -818,6 +820,7 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:

# Remove departing units when the leader changes.
for ip in self._get_ips_to_remove():
logger.info("Removing %s from the cluster", ip)
self._remove_from_members_ips(ip)

self.update_config()
135 changes: 101 additions & 34 deletions tests/integration/ha_tests/helpers.py
@@ -1,5 +1,6 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
import os
import random
import subprocess
@@ -22,6 +23,8 @@

from ..helpers import APPLICATION_NAME, db_connect, get_unit_address, run_command_on_unit

logger = logging.getLogger(__name__)

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
PORT = 5432
APP_NAME = METADATA["name"]
@@ -74,13 +77,19 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
return True


async def are_writes_increasing(ops_test, down_unit: str = None) -> None:
async def are_writes_increasing(
ops_test, down_unit: str = None, use_ip_from_inside: bool = False
) -> None:
"""Verify new writes are continuing by counting the number of writes."""
writes, _ = await count_writes(ops_test, down_unit=down_unit)
writes, _ = await count_writes(
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
)
for member, count in writes.items():
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
with attempt:
more_writes, _ = await count_writes(ops_test, down_unit=down_unit)
more_writes, _ = await count_writes(
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
)
assert more_writes[member] > count, f"{member}: writes not continuing to DB"


@@ -161,33 +170,46 @@ async def change_wal_settings(
)


async def is_cluster_updated(ops_test: OpsTest, primary_name: str) -> None:
async def is_cluster_updated(
ops_test: OpsTest, primary_name: str, use_ip_from_inside: bool = False
) -> None:
# Verify that the old primary is now a replica.
logger.info("checking that the former primary is now a replica")
assert await is_replica(
ops_test, primary_name
ops_test, primary_name, use_ip_from_inside
), "there is more than one primary in the cluster."

# Verify that all units are part of the same cluster.
member_ips = await fetch_cluster_members(ops_test)
logger.info("checking that all units are part of the same cluster")
member_ips = await fetch_cluster_members(ops_test, use_ip_from_inside)
app = primary_name.split("/")[0]
ip_addresses = [
await get_unit_ip(ops_test, unit.name) for unit in ops_test.model.applications[app].units
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
for unit in ops_test.model.applications[app].units
]
assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."

# Verify that no writes to the database were missed after stopping the writes.
total_expected_writes = await check_writes(ops_test)
logger.info("checking that no writes to the database were missed after stopping the writes")
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)

# Verify that old primary is up-to-date.
logger.info("checking that the former primary is up to date with the cluster after restarting")
assert await is_secondary_up_to_date(
ops_test, primary_name, total_expected_writes
ops_test, primary_name, total_expected_writes, use_ip_from_inside
), "secondary not up to date with the cluster after restarting."


async def check_writes(ops_test) -> int:
async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int:
"""Gets the total writes from the test charm and compares to the writes from db."""
total_expected_writes = await stop_continuous_writes(ops_test)
actual_writes, max_number_written = await count_writes(ops_test)
actual_writes, max_number_written = await count_writes(
ops_test, use_ip_from_inside=use_ip_from_inside
)
for member, count in actual_writes.items():
assert (
count == max_number_written[member]
@@ -197,14 +219,20 @@ async def check_writes(ops_test) -> int:


async def count_writes(
ops_test: OpsTest, down_unit: str = None
ops_test: OpsTest, down_unit: str = None, use_ip_from_inside: bool = False
) -> Tuple[Dict[str, int], Dict[str, int]]:
"""Count the number of writes in the database."""
app = await app_name(ops_test)
password = await get_password(ops_test, app, down_unit)
for unit in ops_test.model.applications[app].units:
if unit.name != down_unit:
cluster = get_patroni_cluster(await get_unit_ip(ops_test, unit.name))
cluster = get_patroni_cluster(
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
)
break
down_ips = []
if down_unit:
@@ -263,16 +291,21 @@ def cut_network_from_unit_without_ip_change(machine_name: str) -> None:
subprocess.check_call(limit_set_command.split())


async def fetch_cluster_members(ops_test: OpsTest):
async def fetch_cluster_members(ops_test: OpsTest, use_ip_from_inside: bool = False):
"""Fetches the IPs listed by Patroni as cluster members.
Args:
ops_test: OpsTest instance.
use_ip_from_inside: whether to use the IP from inside the unit.
"""
app = await app_name(ops_test)
member_ips = {}
for unit in ops_test.model.applications[app].units:
unit_ip = await get_unit_ip(ops_test, unit.name)
unit_ip = await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
cluster_info = requests.get(f"http://{unit_ip}:8008/cluster")
if len(member_ips) > 0:
# If the list of members IPs was already fetched, also compare the
@@ -304,6 +337,16 @@ async def get_controller_machine(ops_test: OpsTest) -> str:
][0]


async def get_ip_from_inside_the_unit(ops_test: OpsTest, unit_name: str) -> str:
command = f"exec --unit {unit_name} -- hostname -I"
return_code, stdout, stderr = await ops_test.juju(*command.split())
if return_code != 0:
raise ProcessError(
"Expected command %s to succeed instead it failed: %s %s", command, return_code, stderr
)
return stdout.splitlines()[0].strip()


async def get_patroni_setting(ops_test: OpsTest, setting: str) -> Optional[int]:
"""Get the value of one of the integer Patroni settings.
@@ -388,20 +431,28 @@ async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str:


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
async def is_connection_possible(
ops_test: OpsTest, unit_name: str, use_ip_from_inside: bool = False
) -> bool:
"""Test a connection to a PostgreSQL server."""
app = unit_name.split("/")[0]
password = await get_password(ops_test, app, unit_name)
address = await get_unit_ip(ops_test, unit_name)
address = await (
get_ip_from_inside_the_unit(ops_test, unit_name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit_name)
)
try:
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT 1;")
success = cursor.fetchone()[0] == 1
connection.close()
return success
except psycopg2.Error:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT 1;")
success = cursor.fetchone()[0] == 1
connection.close()
return success
except (psycopg2.Error, RetryError):
# Error raised when the connection is not possible.
return False

@@ -420,9 +471,13 @@ def is_machine_reachable_from(origin_machine: str, target_machine: str) -> bool:
return False


async def is_replica(ops_test: OpsTest, unit_name: str) -> bool:
async def is_replica(ops_test: OpsTest, unit_name: str, use_ip_from_inside: bool = False) -> bool:
"""Returns whether the unit is a replica in the cluster."""
unit_ip = await get_unit_ip(ops_test, unit_name)
unit_ip = await (
get_ip_from_inside_the_unit(ops_test, unit_name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit_name)
)
member_name = unit_name.replace("/", "-")

try:
@@ -532,9 +587,13 @@ async def send_signal_to_process(
)


async def is_postgresql_ready(ops_test, unit_name: str) -> bool:
async def is_postgresql_ready(ops_test, unit_name: str, use_ip_from_inside: bool = False) -> bool:
"""Verifies a PostgreSQL instance is running and available."""
unit_ip = get_unit_address(ops_test, unit_name)
unit_ip = (
(await get_ip_from_inside_the_unit(ops_test, unit_name))
if use_ip_from_inside
else get_unit_address(ops_test, unit_name)
)
try:
for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3)):
with attempt:
@@ -571,15 +630,21 @@ def restore_network_for_unit_without_ip_change(machine_name: str) -> None:
subprocess.check_call(limit_set_command.split())


async def is_secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_writes: int) -> bool:
async def is_secondary_up_to_date(
ops_test: OpsTest, unit_name: str, expected_writes: int, use_ip_from_inside: bool = False
) -> bool:
"""Checks if secondary is up-to-date with the cluster.
Retries over the period of one minute to give secondary adequate time to copy over data.
"""
app = await app_name(ops_test)
password = await get_password(ops_test, app)
host = [
await get_unit_ip(ops_test, unit.name)
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
for unit in ops_test.model.applications[app].units
if unit.name == unit_name
][0]
@@ -679,15 +744,17 @@ async def update_restart_condition(ops_test: OpsTest, unit, condition: str):


@retry(stop=stop_after_attempt(20), wait=wait_fixed(30))
async def wait_network_restore(ops_test: OpsTest, hostname: str, old_ip: str) -> None:
async def wait_network_restore(ops_test: OpsTest, unit_name: str, old_ip: str) -> None:
"""Wait until network is restored.
Args:
ops_test: pytest plugin helper
hostname: The name of the instance
unit_name: name of the unit
old_ip: old registered IP address
"""
if await instance_ip(ops_test, hostname) == old_ip:
# Retrieve the unit IP from inside the unit because it may not be updated in the
# Juju status too quickly.
if (await get_ip_from_inside_the_unit(ops_test, unit_name)) == old_ip:
raise Exception


20 changes: 13 additions & 7 deletions tests/integration/ha_tests/test_self_healing.py
@@ -383,7 +383,6 @@ async def test_forceful_restart_without_data_and_transaction_logs(


@pytest.mark.group(1)
@pytest.mark.unstable
async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_timeout):
"""Completely cut and restore network."""
# Locate primary unit.
@@ -456,19 +455,22 @@ async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_t

# Wait until the LXD unit has its IP updated.
logger.info("waiting for IP address to be updated on Juju unit")
await wait_network_restore(ops_test, primary_hostname, primary_ip)
await wait_network_restore(ops_test, primary_name, primary_ip)

# Verify that the database service got restarted and is ready in the old primary.
logger.info(f"waiting for the database service to be ready on {primary_name}")
assert await is_postgresql_ready(ops_test, primary_name, use_ip_from_inside=True)

# Verify that connection is possible.
logger.info("checking whether the connectivity to the database is working")
assert await is_connection_possible(
ops_test, primary_name
ops_test, primary_name, use_ip_from_inside=True
), "Connection is not possible after network restore"

await is_cluster_updated(ops_test, primary_name)
await is_cluster_updated(ops_test, primary_name, use_ip_from_inside=True)


@pytest.mark.group(1)
@pytest.mark.unstable
async def test_network_cut_without_ip_change(
ops_test: OpsTest, continuous_writes, primary_start_timeout
):
@@ -516,7 +518,7 @@ async def test_network_cut_without_ip_change(

async with ops_test.fast_forward():
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, primary_name)
await are_writes_increasing(ops_test, primary_name, use_ip_from_inside=True)

logger.info("checking whether a new primary was elected")
# Verify that a new primary gets elected (ie old primary is secondary).
@@ -533,10 +535,14 @@
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active")

# Verify that the database service got restarted and is ready in the old primary.
logger.info(f"waiting for the database service to be ready on {primary_name}")
assert await is_postgresql_ready(ops_test, primary_name)

# Verify that connection is possible.
logger.info("checking whether the connectivity to the database is working")
assert await is_connection_possible(
ops_test, primary_name
), "Connection is not possible after network restore"

await is_cluster_updated(ops_test, primary_name)
await is_cluster_updated(ops_test, primary_name, use_ip_from_inside=True)
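
The reworked `is_connection_possible` helper above wraps each probe in tenacity's `Retrying` loop so that a unit that is briefly unreachable while its network settles does not fail the test outright. A self-contained sketch of that pattern; the host, port, user, and database names are placeholders rather than values taken from the charm:

```python
import psycopg2
from tenacity import Retrying, RetryError, stop_after_delay, wait_fixed


def can_connect(host: str, password: str) -> bool:
    """Probe PostgreSQL for up to 60 seconds (every 3 seconds) before giving up."""
    try:
        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
            with attempt:
                connection = psycopg2.connect(
                    host=host,
                    port=5432,
                    user="operator",
                    password=password,
                    dbname="postgres",
                    connect_timeout=10,
                )
                try:
                    with connection.cursor() as cursor:
                        cursor.execute("SELECT 1;")
                        return cursor.fetchone()[0] == 1
                finally:
                    connection.close()
    except (psycopg2.Error, RetryError):
        # Every attempt within the time budget failed.
        return False
```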