Skip to content

Commit

Permalink
Apply code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Iman Enami committed Nov 14, 2024
1 parent afa5abc commit a40a2ab
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 38 deletions.
22 changes: 13 additions & 9 deletions src/managers/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,17 @@ def remove_stores(self) -> None:

def reload_truststore(self) -> None:
"""Reloads the truststore using `kafka-configs` utility without restarting the broker."""
config_tool = "charmed-kafka.configs"
cmd = f"{config_tool} --command-config {self.workload.paths.client_properties} --bootstrap-server {self.state.bootstrap_server_internal} --entity-type brokers --entity-name {self.state.unit_broker.unit_id} --alter --add-config listener.name.CLIENT_SSL_SSL.ssl.keystore.location={self.workload.paths.truststore}"
bin_args = [
f"--command-config {self.workload.paths.client_properties}",
f"--bootstrap-server {self.state.bootstrap_server_internal}",
"--entity-type brokers",
f"--entity-name {self.state.unit_broker.unit_id}",
"--alter",
f"--add-config listener.name.CLIENT_SSL_SSL.ssl.truststore.location={self.workload.paths.truststore}",
]

logger.info(f"Reloading truststore: {cmd}")
try:
self.workload.exec(cmd, working_dir=self.workload.paths.conf_path)
except (subprocess.CalledProcessError, ExecError) as e:
# in case this reruns and fails
logger.error(e.stdout)
raise e
logger.info("Reloading truststore")
self.workload.run_bin_command(
bin_keyword="configs",
bin_args=bin_args,
)
69 changes: 40 additions & 29 deletions tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import base64
import json
import logging
import os
import tempfile

import kafka
import pytest
from charms.tls_certificates_interface.v1.tls_certificates import generate_private_key
from pytest_operator.plugin import OpsTest
Expand Down Expand Up @@ -256,7 +257,9 @@ async def test_truststore_live_reload(ops_test: OpsTest):
"private_key": search_secrets(ops_test=ops_test, owner=requirer, search_key="private-key"),
"cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="certificate"),
"ca_cert": search_secrets(ops_test=ops_test, owner=requirer, search_key="ca-certificate"),
"broker_ca": extract_ca(ops_test=ops_test, unit_name=f"{APP_NAME}/0"),
"broker_ca": search_secrets(
ops_test=ops_test, owner=f"{APP_NAME}/0", search_key="ca-cert"
),
}

certs_operator_config = {
Expand Down Expand Up @@ -284,11 +287,7 @@ async def test_truststore_live_reload(ops_test: OpsTest):
apps=["other-op", APP_NAME], idle_period=60, timeout=2000, status="active"
)

for key_, content in local_store.items():
with open(f"test_{key_}", "w", encoding="utf-8") as f:
f.write(content)

address = await get_address(ops_test, app_name=APP_NAME)
address = await get_address(ops_test, app_name=APP_NAME, unit_num=0)
ssl_port = SECURITY_PROTOCOL_PORTS["SSL", "SSL"].client
ssl_bootstrap_server = f"{address}:{ssl_port}"
sasl_port = SECURITY_PROTOCOL_PORTS["SASL_SSL", "SCRAM-SHA-512"].client
Expand All @@ -298,37 +297,49 @@ async def test_truststore_live_reload(ops_test: OpsTest):
await create_test_topic(ops_test, bootstrap_server=sasl_bootstrap_server)

# quickly test the producer and consumer side authentication & authorization
client_config = {
"bootstrap_servers": ssl_bootstrap_server,
"security_protocol": "SSL",
"api_version": (0, 10),
"ssl_cafile": "test_broker_ca",
"ssl_certfile": "test_cert",
"ssl_keyfile": "test_private_key",
"ssl_check_hostname": False,
}

import kafka

producer = kafka.KafkaProducer(
**client_config,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
with (
tempfile.NamedTemporaryFile("w", encoding="utf-8") as cert,
tempfile.NamedTemporaryFile("w", encoding="utf-8") as private_key,
tempfile.NamedTemporaryFile("w", encoding="utf-8") as broker_ca,
tempfile.NamedTemporaryFile("w", encoding="utf-8") as ca_cert,
):
temp_store = {
"cert": cert,
"private_key": private_key,
"broker_ca": broker_ca,
"ca_cert": ca_cert,
}
for key, content in local_store.items():
file_handle = temp_store[key]
file_handle.write(content)

client_config = {
"bootstrap_servers": ssl_bootstrap_server,
"security_protocol": "SSL",
"api_version": (0, 10),
"ssl_cafile": broker_ca.name,
"ssl_certfile": cert.name,
"ssl_keyfile": private_key.name,
"ssl_check_hostname": False,
}

producer = kafka.KafkaProducer(
**client_config,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("test", test_msg)
producer.send("test", test_msg)

consumer = kafka.KafkaConsumer("test", **client_config, auto_offset_reset="earliest")
consumer = kafka.KafkaConsumer("test", **client_config, auto_offset_reset="earliest")

msg = next(consumer)
msg = next(consumer)

assert json.loads(msg.value) == test_msg
assert json.loads(msg.value) == test_msg

# cleanup
await ops_test.model.remove_application("other-ca", block_until_done=True)
await ops_test.model.remove_application("other-op", block_until_done=True)
await ops_test.model.remove_application("other-req", block_until_done=True)
for key_ in local_store:
os.remove(f"test_{key_}")


@pytest.mark.abort_on_fail
Expand Down

0 comments on commit a40a2ab

Please sign in to comment.