diff --git a/src/core/cluster.py b/src/core/cluster.py index c66b2202..f10ee7dd 100644 --- a/src/core/cluster.py +++ b/src/core/cluster.py @@ -4,6 +4,7 @@ """Objects representing the state of KafkaCharm.""" +import logging import os from functools import cached_property from ipaddress import IPv4Address, IPv6Address @@ -58,6 +59,8 @@ if TYPE_CHECKING: from charm import KafkaCharm +logger = logging.getLogger(__name__) + custom_secret_groups = SECRET_GROUPS setattr(custom_secret_groups, "BROKER", "broker") setattr(custom_secret_groups, "BALANCER", "balancer") @@ -393,7 +396,12 @@ def bootstrap_server(self) -> str: return "" if self.config.expose_external: # implicitly checks for k8s in structured_config - return self.bootstrap_servers_external + # service might not be created yet by the broker + try: + return self.bootstrap_servers_external + except LightKubeApiError as e: + logger.debug(e) + return "" return ",".join( sorted( @@ -413,7 +421,7 @@ def controller_quorum_uris(self) -> str: node_offset = KRAFT_NODE_ID_OFFSET if self.runs_broker else 0 return ",".join( [ - f"{broker.unit_id + node_offset}@{broker.host}:{CONTROLLER_PORT}" + f"{broker.unit_id + node_offset}@{broker.internal_address}:{CONTROLLER_PORT}" for broker in self.brokers ] ) diff --git a/src/core/models.py b/src/core/models.py index 9aae4e4e..bd6eacf1 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -503,14 +503,6 @@ def internal_address(self) -> str: return addr - @property - def host(self) -> str: - """Return the hostname of a unit.""" - if self.substrate == "vm": - return self.internal_address - else: - return self.node_ip or self.internal_address - # --- TLS --- @property diff --git a/src/core/structured_config.py b/src/core/structured_config.py index 9261b57d..cd794996 100644 --- a/src/core/structured_config.py +++ b/src/core/structured_config.py @@ -239,7 +239,10 @@ def expose_external_validator(cls, value: str) -> str | None: if SUBSTRATE == "vm": return - if value == "none": + if value not in ["false", "nodeport"]: + raise ValueError("Value not one of 'false' or 'nodeport'") + + if value == "false": return return value diff --git a/src/events/balancer.py b/src/events/balancer.py index 2974a653..d46c8832 100644 --- a/src/events/balancer.py +++ b/src/events/balancer.py @@ -111,7 +111,7 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: payload = { "balancer-username": BALANCER_WEBSERVER_USER, "balancer-password": self.charm.workload.generate_password(), - "balancer-uris": f"{self.charm.state.unit_broker.host}:{BALANCER_WEBSERVER_PORT}", + "balancer-uris": f"{self.charm.state.unit_broker.internal_address}:{BALANCER_WEBSERVER_PORT}", } # Update relation data intra & extra cluster (if it exists) self.charm.state.cluster.update(payload) diff --git a/src/events/broker.py b/src/events/broker.py index cd579978..ebd5de77 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -367,9 +367,6 @@ def _on_storage_attached(self, event: StorageAttachedEvent) -> None: # set status only for running services, not on startup # FIXME re-add this self.workload.exec(["chmod", "-R", "750", f"{self.workload.paths.data_path}"]) - self.workload.exec( - ["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"] - ) self.workload.exec( [ "bash", @@ -378,6 +375,13 @@ def _on_storage_attached(self, event: StorageAttachedEvent) -> None: ] ) + # all mounted data dirs should have correct ownership + self.workload.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"]) + + # run this regardless of role, needed for cloud storages + ceph + for storage in self.charm.state.log_dirs.split(","): + self.workload.exec(["rm", "-rf", f"{storage}/lost+found"]) + # checks first whether the broker is active before warning if self.workload.active(): # new dirs won't be used until topic partitions are assigned to it @@ -434,8 +438,9 @@ def _init_kraft_mode(self) -> None: # cluster-uuid is only created on the broker (`cluster-manager` in large deployments) if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker: uuid = self.workload.run_bin_command( - bin_keyword="storage", bin_args=["random-uuid", "2>", "/dev/null"] + bin_keyword="storage", bin_args=["random-uuid"] ).strip() + self.charm.state.cluster.update({"cluster-uuid": uuid}) self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) diff --git a/src/events/tls.py b/src/events/tls.py index 12253a9f..9970b5e3 100644 --- a/src/events/tls.py +++ b/src/events/tls.py @@ -152,7 +152,9 @@ def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None: relation_id=event.relation.id, ) subject = ( - os.uname()[1] if self.charm.substrate == "k8s" else self.charm.state.unit_broker.host + os.uname()[1] + if self.charm.substrate == "k8s" + else self.charm.state.unit_broker.internal_address ) sans = self.charm.broker.tls_manager.build_sans() csr = ( diff --git a/src/events/zookeeper.py b/src/events/zookeeper.py index efe92d76..0e6b2a93 100644 --- a/src/events/zookeeper.py +++ b/src/events/zookeeper.py @@ -90,8 +90,12 @@ def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None: try: internal_user_credentials = self._create_internal_credentials() - except (KeyError, RuntimeError, subprocess.CalledProcessError, ExecError) as e: - logger.warning(str(e)) + except (KeyError, RuntimeError) as e: + logger.warning(e) + event.defer() + return + except (subprocess.CalledProcessError, ExecError) as e: + logger.warning(f"{e.stdout}, {e.stderr}") event.defer() return diff --git a/src/managers/config.py b/src/managers/config.py index b60fe197..63f86c2d 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -538,7 +538,7 @@ def external_listeners(self) -> list[Listener]: Listener( auth_map=auth, scope="EXTERNAL", - host=self.state.unit_broker.host, + host=self.state.unit_broker.node_ip, # default in case service not created yet during cluster init # will resolve during config-changed node_port=node_port, diff --git a/src/managers/tls.py b/src/managers/tls.py index d6e3fbc5..fd937a1c 100644 --- a/src/managers/tls.py +++ b/src/managers/tls.py @@ -142,7 +142,7 @@ def build_sans(self) -> Sans: if self.substrate == "vm": return { "sans_ip": [ - self.state.unit_broker.host, + self.state.unit_broker.internal_address, ], "sans_dns": [self.state.unit_broker.unit.name, socket.getfqdn()] + self._build_extra_sans(), diff --git a/tests/unit/test_kraft.py b/tests/unit/test_kraft.py index 08f2f78b..7c6b6911 100644 --- a/tests/unit/test_kraft.py +++ b/tests/unit/test_kraft.py @@ -91,7 +91,6 @@ def test_ready_to_start_no_peer_cluster(charm_configuration, base_state: State): def test_ready_to_start_missing_data_as_controller(charm_configuration, base_state: State): # Given charm_configuration["options"]["roles"]["default"] = "controller" - charm_configuration["options"]["expose_external"]["default"] = "none" ctx = Context( KafkaCharm, meta=METADATA, @@ -112,7 +111,6 @@ def test_ready_to_start_missing_data_as_controller(charm_configuration, base_sta def test_ready_to_start_missing_data_as_broker(charm_configuration, base_state: State): # Given charm_configuration["options"]["roles"]["default"] = "broker" - charm_configuration["options"]["expose_external"]["default"] = "none" ctx = Context( KafkaCharm, meta=METADATA, @@ -136,7 +134,6 @@ def test_ready_to_start_missing_data_as_broker(charm_configuration, base_state: def test_ready_to_start(charm_configuration, base_state: State): # Given charm_configuration["options"]["roles"]["default"] = "broker,controller" - charm_configuration["options"]["expose_external"]["default"] = "none" ctx = Context( KafkaCharm, meta=METADATA, @@ -153,6 +150,7 @@ def test_ready_to_start(charm_configuration, base_state: State): ) as patched_run_bin_command, patch("health.KafkaHealth.machine_configured", return_value=True), patch("workload.KafkaWorkload.start"), + patch("workload.KafkaWorkload.active", return_value=True), patch("charms.operator_libs_linux.v1.snap.SnapCache"), ): state_out = ctx.run(ctx.on.start(), state_in)