Skip to content

Commit

Permalink
[DPE-5553] feat: Don't restart server on keystore/truststore updates (#…
Browse files Browse the repository at this point in the history
…272)

Co-authored-by: Iman Enami <[email protected]>
  • Loading branch information
imanenami and Iman Enami authored Nov 25, 2024
1 parent 4571d08 commit 650bbe3
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 4 deletions.
19 changes: 19 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,25 @@ def bootstrap_server(self) -> str:
)
)

@property
def bootstrap_server_internal(self) -> str:
"""Comma-delimited string of `bootstrap-server` command flag for internal access.
Returns:
List of `bootstrap-server` servers
"""
if not self.peer_relation:
return ""

return ",".join(
sorted(
[
f"{broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.default_auth].internal}"
for broker in self.brokers
]
)
)

@property
def controller_quorum_uris(self) -> str:
"""The current controller quorum uris when running KRaft mode."""
Expand Down
10 changes: 6 additions & 4 deletions src/events/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ def _trusted_relation_created(self, event: EventBase) -> None:
event.defer()
return

# Create a "mtls" flag so a new listener (CLIENT_SSL) is created
self.charm.state.cluster.update({"mtls": "enabled"})
if not self.charm.state.cluster.mtls_enabled:
# Create a "mtls" flag so a new listener (CLIENT_SSL) is created
self.charm.state.cluster.update({"mtls": "enabled"})
self.charm.on.config_changed.emit()

def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None:
"""Generate a CSR so the tls-certificates operator works as expected."""
Expand Down Expand Up @@ -208,8 +210,8 @@ def _trusted_relation_changed(self, event: RelationChangedEvent) -> None:
)
self.charm.broker.tls_manager.import_cert(alias=f"{alias}", filename=filename)

# ensuring new config gets applied
self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit()
# Live reload the truststore
self.charm.broker.tls_manager.reload_truststore()

def _trusted_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Handle relation broken for a trusted certificate/ca relation."""
Expand Down
17 changes: 17 additions & 0 deletions src/managers/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,20 @@ def remove_stores(self) -> None:
except (subprocess.CalledProcessError, ExecError) as e:
logger.error(e.stdout)
raise e

def reload_truststore(self) -> None:
"""Reloads the truststore using `kafka-configs` utility without restarting the broker."""
bin_args = [
f"--command-config {self.workload.paths.client_properties}",
f"--bootstrap-server {self.state.bootstrap_server_internal}",
"--entity-type brokers",
f"--entity-name {self.state.unit_broker.unit_id}",
"--alter",
f"--add-config listener.name.CLIENT_SSL_SSL.ssl.truststore.location={self.workload.paths.truststore}",
]

logger.info("Reloading truststore")
self.workload.run_bin_command(
bin_keyword="configs",
bin_args=bin_args,
)
46 changes: 46 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ async def set_mtls_client_acls(ops_test: OpsTest, bootstrap_server: str) -> str:
return result


async def create_test_topic(ops_test: OpsTest, bootstrap_server: str) -> str:
"""Creates `test` topic and adds ACLs for principal `User:*`."""
_ = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'sudo charmed-kafka.topics --bootstrap-server {bootstrap_server} --command-config {PATHS['kafka']['CONF']}/client.properties -create -topic test'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'sudo charmed-kafka.acls --bootstrap-server {bootstrap_server} --add --allow-principal=User:* --operation READ --operation WRITE --operation CREATE --topic test --command-config {PATHS['kafka']['CONF']}/client.properties'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

return result


def count_lines_with(model_full_name: str | None, unit: str, file: str, pattern: str) -> int:
result = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {unit} sudo -i 'grep \"{pattern}\" {file} | wc -l'",
Expand Down Expand Up @@ -330,6 +349,33 @@ def get_secret_by_label(ops_test: OpsTest, label: str, owner: str) -> dict[str,
return secret_data[secret_id]["content"]["Data"]


def search_secrets(ops_test: OpsTest, owner: str, search_key: str) -> str:
secrets_meta_raw = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju list-secrets --format json",
stderr=PIPE,
shell=True,
universal_newlines=True,
).strip()
secrets_meta = json.loads(secrets_meta_raw)

for secret_id in secrets_meta:
if owner and not secrets_meta[secret_id]["owner"] == owner:
continue

secrets_data_raw = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju show-secret --format json --reveal {secret_id}",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

secret_data = json.loads(secrets_data_raw)
if search_key in secret_data[secret_id]["content"]["Data"]:
return secret_data[secret_id]["content"]["Data"][search_key]

return ""


def show_unit(ops_test: OpsTest, unit_name: str) -> Any:
result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju show-unit {unit_name}",
Expand Down
108 changes: 108 additions & 0 deletions tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

import asyncio
import base64
import json
import logging
import os
import tempfile

import kafka
import pytest
from charms.tls_certificates_interface.v1.tls_certificates import generate_private_key
from pytest_operator.plugin import OpsTest
Expand All @@ -22,11 +26,13 @@
APP_NAME,
REL_NAME_ADMIN,
check_tls,
create_test_topic,
extract_ca,
extract_private_key,
get_active_brokers,
get_address,
get_kafka_zk_relation_data,
search_secrets,
set_mtls_client_acls,
set_tls_private_key,
)
Expand All @@ -37,6 +43,7 @@
TLS_NAME = "self-signed-certificates"
CERTS_NAME = "tls-certificates-operator"
MTLS_NAME = "mtls"
TLS_REQUIRER = "tls-certificates-requirer"


@pytest.mark.abort_on_fail
Expand Down Expand Up @@ -227,6 +234,107 @@ async def test_mtls(ops_test: OpsTest):
assert max_offset == str(num_messages)


@pytest.mark.abort_on_fail
async def test_truststore_live_reload(ops_test: OpsTest):
"""Tests truststore live reload functionality using kafka-python client."""
requirer = "other-req/0"
test_msg = {"test": 123456}

await ops_test.model.deploy(
TLS_NAME, channel="stable", application_name="other-ca", revision=155
)
await ops_test.model.deploy(
TLS_REQUIRER, channel="stable", application_name="other-req", revision=102
)

await ops_test.model.add_relation("other-ca", "other-req")

await ops_test.model.wait_for_idle(
apps=["other-ca", "other-req"], idle_period=60, timeout=2000, status="active"
)

# retrieve required certificates and private key from secrets
local_store = {
"private_key": search_secrets(ops_test=ops_test, owner=requirer, search_key="private-key"),
"cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="certificate"),
"ca_cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="ca-certificate"),
"broker_ca": search_secrets(
ops_test=ops_test, owner=f"{APP_NAME}/0", search_key="ca-cert"
),
}

certs_operator_config = {
"generate-self-signed-certificates": "false",
"certificate": base64.b64encode(local_store["cert"].encode("utf-8")).decode("utf-8"),
"ca-certificate": base64.b64encode(local_store["ca_cert"].encode("utf-8")).decode("utf-8"),
}

await ops_test.model.deploy(
CERTS_NAME,
channel="stable",
series="jammy",
application_name="other-op",
config=certs_operator_config,
)

await ops_test.model.wait_for_idle(
apps=["other-op"], idle_period=60, timeout=2000, status="active"
)

# We don't expect a broker restart here because of truststore live reload
await ops_test.model.add_relation(f"{APP_NAME}:{TRUSTED_CERTIFICATE_RELATION}", "other-op")

await ops_test.model.wait_for_idle(
apps=["other-op", APP_NAME], idle_period=60, timeout=2000, status="active"
)

address = await get_address(ops_test, app_name=APP_NAME, unit_num=0)
ssl_port = SECURITY_PROTOCOL_PORTS["SSL", "SSL"].client
ssl_bootstrap_server = f"{address}:{ssl_port}"
sasl_port = SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
sasl_bootstrap_server = f"{address}:{sasl_port}"

# create `test` topic and set ACLs
await create_test_topic(ops_test, bootstrap_server=sasl_bootstrap_server)

# quickly test the producer and consumer side authentication & authorization
tmp_dir = tempfile.TemporaryDirectory()
tmp_paths = {}
for key, content in local_store.items():
tmp_paths[key] = os.path.join(tmp_dir.name, key)
with open(tmp_paths[key], "w", encoding="utf-8") as f:
f.write(content)

client_config = {
"bootstrap_servers": ssl_bootstrap_server,
"security_protocol": "SSL",
"api_version": (0, 10),
"ssl_cafile": tmp_paths["broker_ca"],
"ssl_certfile": tmp_paths["cert"],
"ssl_keyfile": tmp_paths["private_key"],
"ssl_check_hostname": False,
}

producer = kafka.KafkaProducer(
**client_config,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("test", test_msg)

consumer = kafka.KafkaConsumer("test", **client_config, auto_offset_reset="earliest")

msg = next(consumer)

assert json.loads(msg.value) == test_msg

# cleanup
await ops_test.model.remove_application("other-ca", block_until_done=True)
await ops_test.model.remove_application("other-op", block_until_done=True)
await ops_test.model.remove_application("other-req", block_until_done=True)
tmp_dir.cleanup()


@pytest.mark.abort_on_fail
async def test_mtls_broken(ops_test: OpsTest):
await ops_test.model.remove_application(MTLS_NAME, block_until_done=True)
Expand Down

0 comments on commit 650bbe3

Please sign in to comment.