Skip to content

Commit

Permalink
[DPE-4257] Async replication UX Improvements (#481)
Browse files Browse the repository at this point in the history
* Syncing the UX with MySQL

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix failover and set-secret behaviour

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Improve statuses

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix app status set

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix model switch

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix config integration test

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

* Fix backups integration test

Signed-off-by: Marcelo Henrique Neppel <[email protected]>

---------

Signed-off-by: Marcelo Henrique Neppel <[email protected]>
  • Loading branch information
marceloneppel authored Jun 18, 2024
1 parent 882f06b commit 1a1c2d4
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 158 deletions.
11 changes: 9 additions & 2 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ create-backup:
Differential backup is a copy only of changed data since the last full backup.
Incremental backup is a copy only of changed data since the last backup (any type).
Possible values - full, differential, incremental.
create-replication:
description: Set up asynchronous replication between two clusters.
params:
name:
type: string
description: The name of the replication (defaults to 'default').
default: default
get-primary:
description: Get the unit which is the primary/leader in the replication.
get-password:
Expand All @@ -25,10 +32,10 @@ list-backups:
description: Lists backups in s3 storage.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
promote-to-primary:
description: Promotes the cluster of choice to a primary cluster. Must be run against the leader unit.
params:
force-promotion:
force:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
Expand Down
8 changes: 4 additions & 4 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
replication-offer:
interface: postgresql_async
limit: 1
optional: true
database:
Expand All @@ -41,8 +41,8 @@ provides:
limit: 1

requires:
async-replica:
interface: async_replication
replication:
interface: postgresql_async
limit: 1
optional: true
certificates:
Expand Down
50 changes: 39 additions & 11 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@
USER,
USER_PASSWORD_KEY,
)
from relations.async_replication import PostgreSQLAsyncReplication
from relations.async_replication import (
REPLICATION_CONSUMER_RELATION,
REPLICATION_OFFER_RELATION,
PostgreSQLAsyncReplication,
)
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model
Expand Down Expand Up @@ -1222,15 +1226,42 @@ def _on_set_password(self, event: ActionEvent) -> None:
)
return

# Update the password in the PostgreSQL instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
replication_offer_relation = self.model.get_relation(REPLICATION_OFFER_RELATION)
if (
replication_offer_relation is not None
and not self.async_replication.is_primary_cluster()
):
# Update the password in the other cluster PostgreSQL primary instance.
other_cluster_endpoints = self.async_replication.get_all_primary_cluster_endpoints()
other_cluster_primary = self._patroni.get_primary(
alternative_endpoints=other_cluster_endpoints
)
other_cluster_primary_ip = [
replication_offer_relation.data[unit].get("private-address")
for unit in replication_offer_relation.units
if unit.name.replace("/", "-") == other_cluster_primary
][0]
try:
self.postgresql.update_user_password(
username, password, database_host=other_cluster_primary_ip
)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return
elif self.model.get_relation(REPLICATION_CONSUMER_RELATION) is not None:
event.fail(
"Failed changing the password: Not all members healthy or finished initial sync."
"Failed changing the password: This action can be ran only in the cluster from the offer side."
)
return
else:
# Update the password in this cluster PostgreSQL primary instance.
try:
self.postgresql.update_user_password(username, password)
except PostgreSQLUpdateUserPasswordError as e:
logger.exception(e)
event.fail("Failed changing the password.")
return

# Update the password in the secret store.
self.set_secret(APP_SCOPE, f"{username}-password", password)
Expand All @@ -1239,9 +1270,6 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_update_status(self, _) -> None:
Expand Down Expand Up @@ -1357,7 +1385,7 @@ def _set_primary_status_message(self) -> None:
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
self.unit.status = ActiveStatus("Standby")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
Expand Down
13 changes: 10 additions & 3 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,19 +230,20 @@ def get_member_status(self, member_name: str) -> str:
return member["state"]
return ""

def get_primary(self, unit_name_pattern=False) -> str:
def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str:
"""Get primary instance.
Args:
unit_name_pattern: whether to convert pod name to unit name
alternative_endpoints: list of alternative endpoints to check for the primary.
Returns:
primary pod or unit name.
"""
# Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)):
with attempt:
url = self._get_alternative_patroni_url(attempt)
url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
cluster_status = requests.get(
f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
verify=self.verify,
Expand Down Expand Up @@ -301,12 +302,18 @@ def get_sync_standby_names(self) -> List[str]:
sync_standbys.append("/".join(member["name"].rsplit("-", 1)))
return sync_standbys

def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
def _get_alternative_patroni_url(
self, attempt: AttemptManager, alternative_endpoints: List[str] = None
) -> str:
"""Get an alternative REST API URL from another member each time.
When the Patroni process is not running in the current unit it's needed
to use a URL from another cluster member REST API to do some operations.
"""
if alternative_endpoints is not None:
return self._patroni_url.replace(
self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1]
)
attempt_number = attempt.retry_state.attempt_number
if attempt_number > 1:
url = self._patroni_url
Expand Down
Loading

0 comments on commit 1a1c2d4

Please sign in to comment.