Skip to content

Commit

Permalink
[DPE-5553] feat: Don't restart server on keystore/truststore updates …
Browse files Browse the repository at this point in the history
…Kafka
  • Loading branch information
Iman Enami committed Nov 12, 2024
1 parent 808c702 commit afa5abc
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 4 deletions.
19 changes: 19 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,25 @@ def bootstrap_server(self) -> str:
)
)

@property
def bootstrap_server_internal(self) -> str:
    """Comma-delimited string of `bootstrap-server` command flag for internal access.

    Each entry is `<broker internal address>:<internal port>`, where the port is
    looked up in `SECURITY_PROTOCOL_PORTS` for the cluster's default auth. Entries
    are sorted so the resulting string is stable across calls.

    Returns:
        The joined `host:port` entries, or an empty string if there is no peer relation.
    """
    if not self.peer_relation:
        return ""

    endpoints = [
        f"{broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.default_auth].internal}"
        for broker in self.brokers
    ]
    endpoints.sort()

    return ",".join(endpoints)

@property
def controller_quorum_uris(self) -> str:
"""The current controller quorum uris when running KRaft mode."""
Expand Down
11 changes: 7 additions & 4 deletions src/events/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,11 @@ def _trusted_relation_created(self, event: EventBase) -> None:
event.defer()
return

# Create a "mtls" flag so a new listener (CLIENT_SSL) is created
self.charm.state.cluster.update({"mtls": "enabled"})
if not self.charm.state.cluster.mtls_enabled:
# Create a "mtls" flag so a new listener (CLIENT_SSL) is created
self.charm.state.cluster.update({"mtls": "enabled"})
self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit()

self.charm.app.status = ActiveStatus()

def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None:
Expand Down Expand Up @@ -208,8 +211,8 @@ def _trusted_relation_changed(self, event: RelationChangedEvent) -> None:
)
self.charm.broker.tls_manager.import_cert(alias=f"{alias}", filename=filename)

# ensuring new config gets applied
self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit()
# Live reload the truststore
self.charm.broker.tls_manager.reload_truststore()

def _trusted_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Handle relation broken for a trusted certificate/ca relation."""
Expand Down
13 changes: 13 additions & 0 deletions src/managers/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,16 @@ def remove_stores(self) -> None:
except (subprocess.CalledProcessError, ExecError) as e:
logger.error(e.stdout)
raise e

def reload_truststore(self) -> None:
    """Reloads the truststore using `kafka-configs` utility without restarting the broker.

    Re-applies `ssl.truststore.location` for the `CLIENT_SSL_SSL` listener as a
    per-broker dynamic config on this unit's broker, using the internal bootstrap
    servers and the client properties file for the admin connection.

    Raises:
        subprocess.CalledProcessError: if the `kafka-configs` command fails.
        ExecError: if the `kafka-configs` command fails when run through the workload.
    """
    config_tool = "charmed-kafka.configs"
    bin_args = [
        f"--command-config {self.workload.paths.client_properties}",
        f"--bootstrap-server {self.state.bootstrap_server_internal}",
        "--entity-type brokers",
        f"--entity-name {self.state.unit_broker.unit_id}",
        "--alter",
        # The truststore is reloaded through the *truststore* location key; pointing
        # `ssl.keystore.location` at the truststore file would swap the listener's
        # keystore instead of refreshing its trusted certificates.
        f"--add-config listener.name.CLIENT_SSL_SSL.ssl.truststore.location={self.workload.paths.truststore}",
    ]
    cmd = " ".join([config_tool, *bin_args])

    logger.info(f"Reloading truststore: {cmd}")
    try:
        self.workload.exec(cmd, working_dir=self.workload.paths.conf_path)
    except (subprocess.CalledProcessError, ExecError) as e:
        # surface the tool's output before propagating the failure to the caller
        logger.error(e.stdout)
        raise
46 changes: 46 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ async def set_mtls_client_acls(ops_test: OpsTest, bootstrap_server: str) -> str:
return result


async def create_test_topic(ops_test: OpsTest, bootstrap_server: str) -> str:
    """Creates `test` topic and adds ACLs for principal `User:*`.

    Both commands are run on unit `kafka/0` through `juju ssh`.

    Args:
        ops_test: test fixture providing the full name of the Juju model.
        bootstrap_server: value passed to the `--bootstrap-server` flag of both commands.

    Returns:
        The output of the ACL command (the topic creation output is discarded).
    """
    run_opts = {"stderr": PIPE, "shell": True, "universal_newlines": True}

    # Step 1: create the topic
    check_output(
        f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'sudo charmed-kafka.topics --bootstrap-server {bootstrap_server} --command-config {PATHS['kafka']['CONF']}/client.properties -create -topic test'",
        **run_opts,
    )

    # Step 2: allow READ/WRITE/CREATE on the topic for every user
    acls_output = check_output(
        f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'sudo charmed-kafka.acls --bootstrap-server {bootstrap_server} --add --allow-principal=User:* --operation READ --operation WRITE --operation CREATE --topic test --command-config {PATHS['kafka']['CONF']}/client.properties'",
        **run_opts,
    )

    return acls_output


def count_lines_with(model_full_name: str | None, unit: str, file: str, pattern: str) -> int:
result = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {unit} sudo -i 'grep \"{pattern}\" {file} | wc -l'",
Expand Down Expand Up @@ -330,6 +349,33 @@ def get_secret_by_label(ops_test: OpsTest, label: str, owner: str) -> dict[str,
return secret_data[secret_id]["content"]["Data"]


def search_secrets(ops_test: OpsTest, owner: str, search_key: str) -> str:
    """Return the value stored under `search_key` in the first matching Juju secret.

    Lists the model's secrets, then reveals them one by one until a secret whose
    data contains `search_key` is found.

    Args:
        ops_test: test fixture providing the full name of the Juju model.
        owner: only secrets with this owner are inspected; a falsy value disables the filter.
        search_key: key to look for in the revealed secret data.

    Returns:
        The value found under `search_key`, or an empty string if no secret has that key.
    """
    model = ops_test.model_full_name

    listing_raw = check_output(
        f"JUJU_MODEL={model} juju list-secrets --format json",
        stderr=PIPE,
        shell=True,
        universal_newlines=True,
    ).strip()
    secrets_meta = json.loads(listing_raw)

    for secret_id in secrets_meta:
        # skip secrets belonging to someone else when an owner filter is given
        if owner and secrets_meta[secret_id]["owner"] != owner:
            continue

        revealed_raw = check_output(
            f"JUJU_MODEL={ops_test.model_full_name} juju show-secret --format json --reveal {secret_id}",
            stderr=PIPE,
            shell=True,
            universal_newlines=True,
        )

        revealed = json.loads(revealed_raw)[secret_id]["content"]["Data"]
        if search_key in revealed:
            return revealed[search_key]

    return ""


def show_unit(ops_test: OpsTest, unit_name: str) -> Any:
result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju show-unit {unit_name}",
Expand Down
104 changes: 104 additions & 0 deletions tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import asyncio
import base64
import json
import logging
import os

import pytest
from charms.tls_certificates_interface.v1.tls_certificates import generate_private_key
Expand All @@ -22,11 +24,13 @@
APP_NAME,
REL_NAME_ADMIN,
check_tls,
create_test_topic,
extract_ca,
extract_private_key,
get_active_brokers,
get_address,
get_kafka_zk_relation_data,
search_secrets,
set_mtls_client_acls,
set_tls_private_key,
)
Expand All @@ -37,6 +41,7 @@
# Charm / application identifiers used by the tests in this module: the first
# three are passed to `ops_test.model.deploy`, and MTLS_NAME is the application
# removed in `test_mtls_broken`.
TLS_NAME = "self-signed-certificates"
CERTS_NAME = "tls-certificates-operator"
MTLS_NAME = "mtls"
TLS_REQUIRER = "tls-certificates-requirer"


@pytest.mark.abort_on_fail
Expand Down Expand Up @@ -227,6 +232,105 @@ async def test_mtls(ops_test: OpsTest):
assert max_offset == str(num_messages)


@pytest.mark.abort_on_fail
async def test_truststore_live_reload(ops_test: OpsTest):
    """Tests truststore live reload functionality using kafka-python client.

    Steps:
        1. Deploy a second CA (`other-ca`) and a certificate requirer (`other-req`),
           then read the requirer's key, certificate and CA certificate from its secrets.
        2. Deploy `other-op` configured with that certificate/CA and relate it to the
           broker's trusted-certificate endpoint.
        3. Write the client credentials to local files, create the `test` topic with
           ACLs, and produce/consume one message over the SSL listener using them.

    The local credential files (including the private key) and the Kafka clients are
    always cleaned up, even when a step fails.
    """
    requirer = "other-req/0"
    test_msg = {"test": 123456}

    await ops_test.model.deploy(
        TLS_NAME, channel="stable", series="jammy", application_name="other-ca"
    )
    await ops_test.model.deploy(
        TLS_REQUIRER, channel="stable", series="jammy", application_name="other-req"
    )

    await ops_test.model.add_relation("other-ca", "other-req")

    await ops_test.model.wait_for_idle(
        apps=["other-ca", "other-req"], idle_period=60, timeout=2000, status="active"
    )

    # retrieve required certificates and private key from secrets
    local_store = {
        "private_key": search_secrets(ops_test=ops_test, owner=requirer, search_key="private-key"),
        "cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="certificate"),
        "ca_cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="ca-certificate"),
        "broker_ca": extract_ca(ops_test=ops_test, unit_name=f"{APP_NAME}/0"),
    }

    certs_operator_config = {
        "generate-self-signed-certificates": "false",
        "certificate": base64.b64encode(local_store["cert"].encode("utf-8")).decode("utf-8"),
        "ca-certificate": base64.b64encode(local_store["ca_cert"].encode("utf-8")).decode("utf-8"),
    }

    await ops_test.model.deploy(
        CERTS_NAME,
        channel="stable",
        series="jammy",
        application_name="other-op",
        config=certs_operator_config,
    )

    await ops_test.model.wait_for_idle(
        apps=["other-op"], idle_period=60, timeout=2000, status="active"
    )

    # We don't expect a broker restart here because of truststore live reload
    await ops_test.model.add_relation(f"{APP_NAME}:{TRUSTED_CERTIFICATE_RELATION}", "other-op")

    await ops_test.model.wait_for_idle(
        apps=["other-op", APP_NAME], idle_period=60, timeout=2000, status="active"
    )

    local_files = [f"test_{key_}" for key_ in local_store]
    producer = None
    consumer = None

    try:
        for key_, content in local_store.items():
            with open(f"test_{key_}", "w", encoding="utf-8") as f:
                f.write(content)

        address = await get_address(ops_test, app_name=APP_NAME)
        ssl_port = SECURITY_PROTOCOL_PORTS["SSL", "SSL"].client
        ssl_bootstrap_server = f"{address}:{ssl_port}"
        sasl_port = SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
        sasl_bootstrap_server = f"{address}:{sasl_port}"

        # create `test` topic and set ACLs
        await create_test_topic(ops_test, bootstrap_server=sasl_bootstrap_server)

        # quickly test the producer and consumer side authentication & authorization
        client_config = {
            "bootstrap_servers": ssl_bootstrap_server,
            "security_protocol": "SSL",
            "api_version": (0, 10),
            "ssl_cafile": "test_broker_ca",
            "ssl_certfile": "test_cert",
            "ssl_keyfile": "test_private_key",
            "ssl_check_hostname": False,
        }

        import kafka

        producer = kafka.KafkaProducer(
            **client_config,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )

        producer.send("test", test_msg)
        # `send` only queues the record; make sure it is on the wire before consuming
        producer.flush()

        consumer = kafka.KafkaConsumer("test", **client_config, auto_offset_reset="earliest")

        msg = next(consumer)

        assert json.loads(msg.value) == test_msg
    finally:
        try:
            # close clients first: they reference the credential files removed below
            for client in (consumer, producer):
                if client is not None:
                    client.close()
        finally:
            # never leave the private key / certificates behind in the working directory
            for path in local_files:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass

    # cleanup
    await ops_test.model.remove_application("other-ca", block_until_done=True)
    await ops_test.model.remove_application("other-op", block_until_done=True)
    await ops_test.model.remove_application("other-req", block_until_done=True)


@pytest.mark.abort_on_fail
async def test_mtls_broken(ops_test: OpsTest):
await ops_test.model.remove_application(MTLS_NAME, block_until_done=True)
Expand Down

0 comments on commit afa5abc

Please sign in to comment.