Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5553] feat: Don't restart server on keystore/truststore updates #272

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,25 @@ def bootstrap_server(self) -> str:
)
)

@property
def bootstrap_server_internal(self) -> str:
imanenami marked this conversation as resolved.
Show resolved Hide resolved
"""Comma-delimited string of `bootstrap-server` command flag for internal access.

Returns:
List of `bootstrap-server` servers
"""
if not self.peer_relation:
return ""

return ",".join(
sorted(
[
f"{broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.default_auth].internal}"
for broker in self.brokers
]
)
)

@property
def controller_quorum_uris(self) -> str:
"""The current controller quorum uris when running KRaft mode."""
Expand Down
10 changes: 6 additions & 4 deletions src/events/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ def _trusted_relation_created(self, event: EventBase) -> None:
event.defer()
return

# Create a "mtls" flag so a new listener (CLIENT_SSL) is created
self.charm.state.cluster.update({"mtls": "enabled"})
if not self.charm.state.cluster.mtls_enabled:
# Create a "mtls" flag so a new listener (CLIENT_SSL) is created
self.charm.state.cluster.update({"mtls": "enabled"})
self.charm.on.config_changed.emit()

def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None:
"""Generate a CSR so the tls-certificates operator works as expected."""
Expand Down Expand Up @@ -208,8 +210,8 @@ def _trusted_relation_changed(self, event: RelationChangedEvent) -> None:
)
self.charm.broker.tls_manager.import_cert(alias=f"{alias}", filename=filename)

# ensuring new config gets applied
self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit()
# Live reload the truststore
self.charm.broker.tls_manager.reload_truststore()
imanenami marked this conversation as resolved.
Show resolved Hide resolved

def _trusted_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Handle relation broken for a trusted certificate/ca relation."""
Expand Down
17 changes: 17 additions & 0 deletions src/managers/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,20 @@ def remove_stores(self) -> None:
except (subprocess.CalledProcessError, ExecError) as e:
logger.error(e.stdout)
raise e

def reload_truststore(self) -> None:
"""Reloads the truststore using `kafka-configs` utility without restarting the broker."""
bin_args = [
f"--command-config {self.workload.paths.client_properties}",
f"--bootstrap-server {self.state.bootstrap_server_internal}",
"--entity-type brokers",
f"--entity-name {self.state.unit_broker.unit_id}",
"--alter",
f"--add-config listener.name.CLIENT_SSL_SSL.ssl.truststore.location={self.workload.paths.truststore}",
]

logger.info("Reloading truststore")
self.workload.run_bin_command(
bin_keyword="configs",
bin_args=bin_args,
)
46 changes: 46 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ async def set_mtls_client_acls(ops_test: OpsTest, bootstrap_server: str) -> str:
return result


async def create_test_topic(ops_test: OpsTest, bootstrap_server: str) -> str:
"""Creates `test` topic and adds ACLs for principal `User:*`."""
_ = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'sudo charmed-kafka.topics --bootstrap-server {bootstrap_server} --command-config {PATHS['kafka']['CONF']}/client.properties -create -topic test'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'sudo charmed-kafka.acls --bootstrap-server {bootstrap_server} --add --allow-principal=User:* --operation READ --operation WRITE --operation CREATE --topic test --command-config {PATHS['kafka']['CONF']}/client.properties'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

return result


imanenami marked this conversation as resolved.
Show resolved Hide resolved
def count_lines_with(model_full_name: str | None, unit: str, file: str, pattern: str) -> int:
result = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {unit} sudo -i 'grep \"{pattern}\" {file} | wc -l'",
Expand Down Expand Up @@ -330,6 +349,33 @@ def get_secret_by_label(ops_test: OpsTest, label: str, owner: str) -> dict[str,
return secret_data[secret_id]["content"]["Data"]


def search_secrets(ops_test: OpsTest, owner: str, search_key: str) -> str:
secrets_meta_raw = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju list-secrets --format json",
stderr=PIPE,
shell=True,
universal_newlines=True,
).strip()
secrets_meta = json.loads(secrets_meta_raw)

for secret_id in secrets_meta:
if owner and not secrets_meta[secret_id]["owner"] == owner:
continue

secrets_data_raw = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju show-secret --format json --reveal {secret_id}",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

secret_data = json.loads(secrets_data_raw)
if search_key in secret_data[secret_id]["content"]["Data"]:
return secret_data[secret_id]["content"]["Data"][search_key]

return ""


def show_unit(ops_test: OpsTest, unit_name: str) -> Any:
result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju show-unit {unit_name}",
Expand Down
108 changes: 108 additions & 0 deletions tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

import asyncio
import base64
import json
import logging
import os
import tempfile

import kafka
import pytest
from charms.tls_certificates_interface.v1.tls_certificates import generate_private_key
from pytest_operator.plugin import OpsTest
Expand All @@ -22,11 +26,13 @@
APP_NAME,
REL_NAME_ADMIN,
check_tls,
create_test_topic,
extract_ca,
extract_private_key,
get_active_brokers,
get_address,
get_kafka_zk_relation_data,
search_secrets,
set_mtls_client_acls,
set_tls_private_key,
)
Expand All @@ -37,6 +43,7 @@
TLS_NAME = "self-signed-certificates"
CERTS_NAME = "tls-certificates-operator"
MTLS_NAME = "mtls"
TLS_REQUIRER = "tls-certificates-requirer"


@pytest.mark.abort_on_fail
Expand Down Expand Up @@ -227,6 +234,107 @@ async def test_mtls(ops_test: OpsTest):
assert max_offset == str(num_messages)


@pytest.mark.abort_on_fail
async def test_truststore_live_reload(ops_test: OpsTest):
imanenami marked this conversation as resolved.
Show resolved Hide resolved
"""Tests truststore live reload functionality using kafka-python client."""
requirer = "other-req/0"
test_msg = {"test": 123456}

await ops_test.model.deploy(
TLS_NAME, channel="stable", application_name="other-ca", revision=155
)
await ops_test.model.deploy(
TLS_REQUIRER, channel="stable", application_name="other-req", revision=102
)

await ops_test.model.add_relation("other-ca", "other-req")

await ops_test.model.wait_for_idle(
apps=["other-ca", "other-req"], idle_period=60, timeout=2000, status="active"
)

# retrieve required certificates and private key from secrets
local_store = {
"private_key": search_secrets(ops_test=ops_test, owner=requirer, search_key="private-key"),
"cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="certificate"),
"ca_cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="ca-certificate"),
"broker_ca": search_secrets(
ops_test=ops_test, owner=f"{APP_NAME}/0", search_key="ca-cert"
),
}

certs_operator_config = {
"generate-self-signed-certificates": "false",
"certificate": base64.b64encode(local_store["cert"].encode("utf-8")).decode("utf-8"),
"ca-certificate": base64.b64encode(local_store["ca_cert"].encode("utf-8")).decode("utf-8"),
}

await ops_test.model.deploy(
CERTS_NAME,
channel="stable",
series="jammy",
application_name="other-op",
config=certs_operator_config,
)

await ops_test.model.wait_for_idle(
apps=["other-op"], idle_period=60, timeout=2000, status="active"
)

# We don't expect a broker restart here because of truststore live reload
await ops_test.model.add_relation(f"{APP_NAME}:{TRUSTED_CERTIFICATE_RELATION}", "other-op")

await ops_test.model.wait_for_idle(
apps=["other-op", APP_NAME], idle_period=60, timeout=2000, status="active"
)

address = await get_address(ops_test, app_name=APP_NAME, unit_num=0)
ssl_port = SECURITY_PROTOCOL_PORTS["SSL", "SSL"].client
ssl_bootstrap_server = f"{address}:{ssl_port}"
sasl_port = SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
sasl_bootstrap_server = f"{address}:{sasl_port}"

# create `test` topic and set ACLs
await create_test_topic(ops_test, bootstrap_server=sasl_bootstrap_server)

# quickly test the producer and consumer side authentication & authorization
tmp_dir = tempfile.TemporaryDirectory()
tmp_paths = {}
for key, content in local_store.items():
tmp_paths[key] = os.path.join(tmp_dir.name, key)
with open(tmp_paths[key], "w", encoding="utf-8") as f:
f.write(content)

client_config = {
"bootstrap_servers": ssl_bootstrap_server,
"security_protocol": "SSL",
"api_version": (0, 10),
"ssl_cafile": tmp_paths["broker_ca"],
"ssl_certfile": tmp_paths["cert"],
"ssl_keyfile": tmp_paths["private_key"],
"ssl_check_hostname": False,
}

producer = kafka.KafkaProducer(
**client_config,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("test", test_msg)

consumer = kafka.KafkaConsumer("test", **client_config, auto_offset_reset="earliest")

msg = next(consumer)

assert json.loads(msg.value) == test_msg

# cleanup
await ops_test.model.remove_application("other-ca", block_until_done=True)
await ops_test.model.remove_application("other-op", block_until_done=True)
await ops_test.model.remove_application("other-req", block_until_done=True)
tmp_dir.cleanup()


@pytest.mark.abort_on_fail
async def test_mtls_broken(ops_test: OpsTest):
await ops_test.model.remove_application(MTLS_NAME, block_until_done=True)
Expand Down