Skip to content

Commit

Permalink
KRaft (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
zmraul authored Oct 23, 2024
1 parent d78ed3f commit 86f0445
Show file tree
Hide file tree
Showing 19 changed files with 816 additions and 123 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
- integration-upgrade
- integration-balancer-single
- integration-balancer-multi
- integration-kraft-single
- integration-kraft-multi
name: ${{ matrix.tox-environments }}
needs:
- lint
Expand Down
2 changes: 1 addition & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ options:
roles:
description: |
Comma separated list of the roles assigned to the nodes of this cluster.
This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control).
This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control), 'controller' (KRaft mode).
type: string
default: broker
compression_type:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ show_missing = true
minversion = "6.0"
log_cli_level = "INFO"
asyncio_mode = "auto"
markers = ["unstable", "broker", "balancer"]
markers = ["unstable", "broker", "balancer", "kraft"]

# Formatting tools configuration
[tool.black]
Expand Down
10 changes: 7 additions & 3 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ def _on_roles_changed(self, _):
This handler is in charge of stopping the workloads, since the sub-operators would not
be instantiated if roles are changed.
"""
if not self.state.runs_broker and self.broker.workload.active():
if (
not (self.state.runs_broker or self.state.runs_controller)
and self.broker.workload.active()
):
self.broker.workload.stop()

if (
Expand Down Expand Up @@ -179,8 +182,9 @@ def _on_collect_status(self, event: CollectStatusEvent):
if not self.broker.workload.active():
event.add_status(Status.BROKER_NOT_RUNNING.value.status)

if not self.state.zookeeper.broker_active():
event.add_status(Status.ZK_NOT_CONNECTED.value.status)
if not self.state.kraft_mode:
if not self.state.zookeeper.broker_active():
event.add_status(Status.ZK_NOT_CONNECTED.value.status)


if __name__ == "__main__":
Expand Down
160 changes: 116 additions & 44 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
ADMIN_USER,
BALANCER,
BROKER,
CONTROLLER,
CONTROLLER_PORT,
INTERNAL_USERS,
KRAFT_NODE_ID_OFFSET,
MIN_REPLICAS,
OAUTH_REL_NAME,
PEER,
Expand Down Expand Up @@ -84,7 +87,7 @@ class PeerClusterData(ProviderData, RequirerData):
"""Broker provider data model."""

SECRET_LABEL_MAP = SECRET_LABEL_MAP
SECRET_FIELDS = BALANCER.requested_secrets
SECRET_FIELDS = list(set(BALANCER.requested_secrets) | set(CONTROLLER.requested_secrets))


class ClusterState(Object):
Expand Down Expand Up @@ -136,46 +139,48 @@ def peer_cluster_relation(self) -> Relation | None:
@property
def peer_cluster_orchestrator(self) -> PeerCluster:
"""The state for the related `peer-cluster-orchestrator` application that this charm is requiring from."""
balancer_kwargs: dict[str, Any] = (
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
}
if self.runs_balancer
else {}
)
extra_kwargs: dict[str, Any] = {}

if self.runs_balancer:
extra_kwargs.update(
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
}
)

if self.runs_controller:
extra_kwargs.update(
{
"controller_quorum_uris": self.cluster.controller_quorum_uris,
}
)

return PeerCluster(
relation=self.peer_cluster_relation,
data_interface=PeerClusterData(self.model, PEER_CLUSTER_RELATION),
**balancer_kwargs,
**extra_kwargs,
)

@property
def peer_cluster(self) -> PeerCluster:
"""The state for the related `peer-cluster` application that this charm is providing to."""
return PeerCluster(
relation=self.peer_cluster_orchestrator_relation,
data_interface=PeerClusterOrchestratorData(
self.model, PEER_CLUSTER_ORCHESTRATOR_RELATION
),
)

@property
def balancer(self) -> PeerCluster:
"""The state for the `peer-cluster-orchestrator` related balancer application."""
balancer_kwargs: dict[str, Any] = (
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
}
if self.runs_balancer
else {}
)
extra_kwargs: dict[str, Any] = {}

if self.runs_broker: # must be providing, initialise with necessary broker data
if self.runs_controller or self.runs_balancer:
extra_kwargs.update(
{
"balancer_username": self.cluster.balancer_username,
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
"controller_quorum_uris": self.cluster.controller_quorum_uris,
}
)

# FIXME: `cluster_manager` check instead of running broker
# must be providing, initialise with necessary broker data
if self.runs_broker:
return PeerCluster(
relation=self.peer_cluster_orchestrator_relation, # if same app, this will be None and OK
data_interface=PeerClusterOrchestratorData(
Expand All @@ -184,12 +189,13 @@ def balancer(self) -> PeerCluster:
broker_username=ADMIN_USER,
broker_password=self.cluster.internal_user_credentials.get(ADMIN_USER, ""),
broker_uris=self.bootstrap_server,
cluster_uuid=self.cluster.cluster_uuid,
racks=self.racks,
broker_capacities=self.broker_capacities,
zk_username=self.zookeeper.username,
zk_password=self.zookeeper.password,
zk_uris=self.zookeeper.uris,
**balancer_kwargs, # in case of roles=broker,balancer on this app
**extra_kwargs, # in case of roles=broker,[balancer,controller] on this app
)

else: # must be roles=balancer only then, only load with necessary balancer data
Expand Down Expand Up @@ -345,7 +351,11 @@ def default_auth(self) -> AuthMap:
def enabled_auth(self) -> list[AuthMap]:
"""The currently enabled auth.protocols and their auth.mechanisms, based on related applications."""
enabled_auth = []
if self.client_relations or self.runs_balancer or self.peer_cluster_orchestrator_relation:
if (
self.client_relations
or self.runs_balancer
or BALANCER.value in self.peer_cluster_orchestrator.roles
):
enabled_auth.append(self.default_auth)
if self.oauth_relation:
enabled_auth.append(AuthMap(self.default_auth.protocol, "OAUTHBEARER"))
Expand Down Expand Up @@ -394,6 +404,21 @@ def bootstrap_server(self) -> str:
)
)

@property
def controller_quorum_uris(self) -> str:
    """Build the controller quorum URIs string used in KRaft mode.

    Returns:
        A comma-separated string with one `<node-id>@<host>:<port>` entry per
        item in `self.brokers`, using `CONTROLLER_PORT` as the port. An empty
        string when this application does not run the controller role.
    """
    if not self.runs_controller:
        return ""

    # FIXME: when the unit also runs the broker role, node.id is the unit-id shifted by
    # KRAFT_NODE_ID_OFFSET; a controller-only unit keeps node.id == unit-id.
    # This keeps a human readable mapping between units and node ids.
    id_shift = KRAFT_NODE_ID_OFFSET if self.runs_broker else 0
    quorum_entries = (
        f"{node.unit_id + id_shift}@{node.host}:{CONTROLLER_PORT}" for node in self.brokers
    )
    return ",".join(quorum_entries)

@property
def log_dirs(self) -> str:
"""Builds the necessary log.dirs based on mounted storage volumes.
Expand Down Expand Up @@ -446,7 +471,7 @@ def ready_to_start(self) -> Status: # noqa: C901
if not self.peer_relation:
return Status.NO_PEER_RELATION

for status in [self._broker_status, self._balancer_status]:
for status in [self._broker_status, self._balancer_status, self._controller_status]:
if status != Status.ACTIVE:
return status

Expand All @@ -461,29 +486,40 @@ def _balancer_status(self) -> Status:
if not self.peer_cluster_relation and not self.runs_broker:
return Status.NO_PEER_CLUSTER_RELATION

if not self.balancer.broker_connected:
if not self.peer_cluster.broker_connected:
return Status.NO_BROKER_DATA

if len(self.balancer.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS:
if len(self.peer_cluster.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS:
return Status.NOT_ENOUGH_BROKERS

return Status.ACTIVE

@property
def _broker_status(self) -> Status:
def _broker_status(self) -> Status: # noqa: C901
"""Checks for role=broker specific readiness."""
if not self.runs_broker:
return Status.ACTIVE

if not self.zookeeper:
return Status.ZK_NOT_RELATED
# Neither ZooKeeper or KRaft are active
if self.kraft_mode is None:
return Status.MISSING_MODE

if self.kraft_mode:
if not self.peer_cluster.controller_quorum_uris: # FIXME: peer_cluster or cluster?
return Status.NO_QUORUM_URIS
if not self.cluster.cluster_uuid:
return Status.NO_CLUSTER_UUID

if not self.zookeeper.zookeeper_connected:
return Status.ZK_NO_DATA
if self.kraft_mode == False: # noqa: E712
if not self.zookeeper:
return Status.ZK_NOT_RELATED

# TLS must be enabled for Kafka and ZK or disabled for both
if self.cluster.tls_enabled ^ self.zookeeper.tls:
return Status.ZK_TLS_MISMATCH
if not self.zookeeper.zookeeper_connected:
return Status.ZK_NO_DATA

# TLS must be enabled for Kafka and ZK or disabled for both
if self.cluster.tls_enabled ^ self.zookeeper.tls:
return Status.ZK_TLS_MISMATCH

if self.cluster.tls_enabled and not self.unit_broker.certificate:
return Status.NO_CERT
Expand All @@ -493,6 +529,37 @@ def _broker_status(self) -> Status:

return Status.ACTIVE

@property
def _controller_status(self) -> Status:
    """Evaluate the readiness checks that only apply to role=controller.

    Returns:
        `Status.ACTIVE` when the controller role is not enabled or every check passes;
        `Status.NO_PEER_CLUSTER_RELATION` when there is neither a peer-cluster relation
        nor a broker role on this application;
        `Status.NO_BROKER_DATA` when `peer_cluster.broker_connected_kraft_mode` is falsy.
    """
    if self.runs_controller:
        # A controller needs broker data from somewhere: either the peer-cluster
        # relation, or the broker role enabled on this same application.
        if not (self.peer_cluster_relation or self.runs_broker):
            return Status.NO_PEER_CLUSTER_RELATION

        if not self.peer_cluster.broker_connected_kraft_mode:
            return Status.NO_BROKER_DATA

    return Status.ACTIVE

@property
def kraft_mode(self) -> bool | None:
    """Report whether the deployment is running in KRaft mode.

    Returns:
        True if the controller role is present (KRaft mode), False if a ZooKeeper
        relation exists instead, None when neither is the case yet.
    """
    # NOTE: self.roles covers the colocated case, peer_cluster.roles the multi-app case
    known_roles = self.roles + self.peer_cluster.roles
    if CONTROLLER.value in known_roles:
        return True

    # FIXME: raise instead of returning None. None is falsy, so `not kraft_mode`
    # cannot tell ZooKeeper mode apart from the undefined state.
    # NOTE: without a controller role or a ZooKeeper relation, we don't know yet
    # how the charm is being deployed
    return False if self.zookeeper_relation else None

@property
def runs_balancer(self) -> bool:
"""Is the charm enabling the balancer?"""
Expand All @@ -502,3 +569,8 @@ def runs_balancer(self) -> bool:
def runs_broker(self) -> bool:
"""Is the charm enabling the broker(s)?"""
return BROKER.value in self.roles

@property
def runs_controller(self) -> bool:
    """Whether the controller role is among the roles enabled on this charm."""
    enabled_roles = self.roles
    return CONTROLLER.value in enabled_roles
Loading

0 comments on commit 86f0445

Please sign in to comment.