tests: Migrate unit tests to scenario V7
Batalex committed Oct 14, 2024
1 parent 168291c commit 89bc50f
Showing 14 changed files with 1,514 additions and 1,206 deletions.
22 changes: 13 additions & 9 deletions src/events/balancer.py
@@ -61,6 +61,9 @@ def __init__(self, charm) -> None:
             config=self.charm.config,
         )
 
+        # Before fast exit to avoid silently ignoring the action
+        self.framework.observe(getattr(self.charm.on, "rebalance_action"), self.rebalance)
+
         # Fast exit after workload instantiation, but before any event observer
         if BALANCER.value not in self.charm.config.roles or not self.charm.unit.is_leader():
             return
@@ -82,8 +85,6 @@ def __init__(self, charm) -> None:
         self.framework.observe(self.charm.on.update_status, self._on_config_changed)
         self.framework.observe(self.charm.on.config_changed, self._on_config_changed)
 
-        self.framework.observe(getattr(self.charm.on, "rebalance_action"), self.rebalance)
-
     def _on_install(self, event: InstallEvent) -> None:
         """Handler for `install` event."""
         if not self.workload.container_can_connect:
@@ -208,33 +209,36 @@ def rebalance(self, event: ActionEvent) -> None:
         available_brokers = [int(broker.split("/")[1]) for broker in brokers]
 
         failure_conditions = [
-            (not self.charm.unit.is_leader(), "Action must be ran on the application leader"),
             (
-                not self.balancer_manager.cruise_control.monitoring,
+                lambda: not self.charm.unit.is_leader(),
+                "Action must be ran on the application leader",
+            ),
+            (
+                lambda: not self.balancer_manager.cruise_control.monitoring,
                 "CruiseControl balancer service is not yet ready",
             ),
             (
-                self.balancer_manager.cruise_control.executing,
+                lambda: self.balancer_manager.cruise_control.executing,
                 "CruiseControl balancer service is currently executing a task, please try again later",
             ),
             (
-                not self.balancer_manager.cruise_control.ready,
+                lambda: not self.balancer_manager.cruise_control.ready,
                 "CruiseControl balancer service has not yet collected enough data to provide a partition reallocation proposal",
             ),
             (
-                event.params["mode"] in (MODE_ADD, MODE_REMOVE)
+                lambda: event.params["mode"] in (MODE_ADD, MODE_REMOVE)
                 and event.params.get("brokerid", None) is None,
                 "'add' and 'remove' rebalance action require passing the 'brokerid' parameter",
             ),
             (
-                event.params["mode"] in (MODE_ADD, MODE_REMOVE)
+                lambda: event.params["mode"] in (MODE_ADD, MODE_REMOVE)
                 and event.params.get("brokerid") not in available_brokers,
                 "invalid brokerid",
             ),
         ]
 
         for check, msg in failure_conditions:
-            if check:
+            if check():
                 logging.error(msg)
                 event.set_results({"error": msg})
                 event.fail(msg)
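The migrated conditions are wrapped in lambdas so evaluation is deferred to the `check()` call, rather than happening eagerly while the list is built — which is also why the `rebalance_action` observer can now be registered before the leader/role fast exit: a non-leader unit fails the action with a clear message instead of silently ignoring it. A minimal sketch of the lazy failure-condition pattern; `params`, the checks, and the messages are illustrative stand-ins, not code from this commit:

# Sketch of the lazy failure-condition pattern used above (hypothetical names).
def run_action(params: dict) -> str:
    failure_conditions = [
        # Zero-argument callables: nothing is evaluated until check() below.
        (lambda: not params.get("leader", False), "must run on the leader"),
        (
            lambda: params.get("mode") in ("add", "remove")
            and params.get("brokerid") is None,
            "'add' and 'remove' require 'brokerid'",
        ),
    ]
    for check, msg in failure_conditions:
        if check():  # evaluated lazily, in declaration order
            return f"failed: {msg}"
    return "ok"

print(run_action({"leader": True, "mode": "add"}))  # failed: 'add' and 'remove' require 'brokerid'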
2 changes: 0 additions & 2 deletions src/events/tls.py
@@ -26,7 +26,6 @@
     RelationJoinedEvent,
 )
 from ops.framework import Object
-from ops.model import ActiveStatus
 
 from literals import TLS_RELATION, TRUSTED_CA_RELATION, TRUSTED_CERTIFICATE_RELATION, Status
 
@@ -138,7 +137,6 @@ def _trusted_relation_created(self, event: EventBase) -> None:
 
         # Create a "mtls" flag so a new listener (CLIENT_SSL) is created
         self.charm.state.cluster.update({"mtls": "enabled"})
-        self.charm.app.status = ActiveStatus()
 
     def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None:
         """Generate a CSR so the tls-certificates operator works as expected."""
126 changes: 87 additions & 39 deletions src/managers/k8s.py
@@ -4,8 +4,11 @@
 
 """Manager for handling Kafka Kubernetes resources for a single Kafka pod."""
 
+import json
 import logging
-from functools import cached_property
+import math
+import time
+from functools import cache
 
 from lightkube.core.client import Client
 from lightkube.core.exceptions import ApiError
@@ -15,13 +18,12 @@
 
 from literals import SECURITY_PROTOCOL_PORTS, AuthMap, AuthMechanism
 
-logger = logging.getLogger(__name__)
-
 # default logging from lightkube httpx requests is very noisy
-logging.getLogger("lightkube").disabled = True
-logging.getLogger("lightkube.core.client").disabled = True
-logging.getLogger("httpx").disabled = True
-logging.getLogger("httpcore").disabled = True
+logging.getLogger("lightkube").setLevel(logging.CRITICAL)
+logging.getLogger("httpx").setLevel(logging.CRITICAL)
+logging.getLogger("httpcore").setLevel(logging.CRITICAL)
+
+logger = logging.getLogger(__name__)
 
 
 class K8sManager:
@@ -42,54 +44,57 @@ def __init__(
             "SSL": "ssl",
         }
 
-    @cached_property
+    def __eq__(self, other: object) -> bool:
+        """__eq__ dunder.
+
+        Needed to get a cache hit on calls to the same method from different
+        instances of K8sManager, as `self` is passed to methods.
+        """
+        return isinstance(other, K8sManager) and self.__dict__ == other.__dict__
+
+    def __hash__(self) -> int:
+        """__hash__ dunder.
+
+        K8sManager needs to be hashable so that `self` can be passed to the 'dict-like' cache.
+        """
+        return hash(json.dumps(self.__dict__, sort_keys=True))
+
+    @property
     def client(self) -> Client:
         """The Lightkube client."""
         return Client(  # pyright: ignore[reportArgumentType]
             field_manager=self.pod_name,
             namespace=self.namespace,
         )
 
+    @staticmethod
+    def get_ttl_hash(seconds=60 * 2) -> int:
+        """Gets a unique time hash for the cache, expiring after 2 minutes.
+
+        When 2m has passed, a new value will be created, ensuring a cache miss
+        and a re-loading of that K8s API call.
+        """
+        return math.floor(time.time() / seconds)
+
     # --- GETTERS ---
 
     def get_pod(self, pod_name: str = "") -> Pod:
         """Gets the Pod via the K8s API."""
-        # Allows us to get pods from other peer units
-        pod_name = pod_name or self.pod_name
-
-        return self.client.get(
-            res=Pod,
-            name=self.pod_name,
-        )
+        return self._get_pod(pod_name, self.get_ttl_hash())
 
-    def get_node(self, pod: Pod) -> Node:
+    def get_node(self, pod_name: str) -> Node:
         """Gets the Node the Pod is running on via the K8s API."""
-        if not pod.spec or not pod.spec.nodeName:
-            raise Exception("Could not find podSpec or nodeName")
-
-        return self.client.get(
-            Node,
-            name=pod.spec.nodeName,
-        )
-
-    def get_node_ip(self, node: Node) -> str:
-        """Gets the IP Address of the Node via the K8s API."""
-        # all these redundant checks are because Lightkube's typing is awful
-        if not node.status or not node.status.addresses:
-            raise Exception(f"No status found for {node}")
+        return self._get_node(pod_name, self.get_ttl_hash())
 
-        for addresses in node.status.addresses:
-            if addresses.type in ["ExternalIP", "InternalIP", "Hostname"]:
-                return addresses.address
-
-        return ""
+    def get_node_ip(self, pod_name: str) -> str:
+        """Gets the IP Address of the Node of a given Pod via the K8s API."""
+        return self._get_node_ip(pod_name, self.get_ttl_hash())
 
     def get_service(self, service_name: str) -> Service | None:
         """Gets the Service via the K8s API."""
-        return self.client.get(
-            res=Service,
-            name=service_name,
-        )
+        return self._get_service(service_name, self.get_ttl_hash())
 
     # SERVICE BUILDERS
 
     def get_node_port(
         self,
@@ -139,7 +144,7 @@ def get_bootstrap_nodeport(self, auth_map: AuthMap) -> int:
 
     def build_bootstrap_services(self) -> Service:
         """Builds a ClusterIP service for initial client connection."""
-        pod = self.get_pod(pod_name=self.pod_name)
+        pod = self.get_pod(self.pod_name)
         if not pod.metadata:
             raise Exception(f"Could not find metadata for {pod}")
 
@@ -231,3 +236,46 @@ def apply_service(self, service: Service) -> None:
                 return
             else:
                 raise
+
+    # PRIVATE METHODS
+
+    @cache
+    def _get_pod(self, pod_name: str = "", *_) -> Pod:
+        # Allows us to get pods from other peer units
+        pod_name = pod_name or self.pod_name
+
+        return self.client.get(
+            res=Pod,
+            name=pod_name,
+        )
+
+    @cache
+    def _get_node(self, pod_name: str, *_) -> Node:
+        pod = self.get_pod(pod_name)
+        if not pod.spec or not pod.spec.nodeName:
+            raise Exception("Could not find podSpec or nodeName")
+
+        return self.client.get(
+            Node,
+            name=pod.spec.nodeName,
+        )
+
+    @cache
+    def _get_node_ip(self, pod_name: str, *_) -> str:
+        # all these redundant checks are because Lightkube's typing is awful
+        node = self.get_node(pod_name)
+        if not node.status or not node.status.addresses:
+            raise Exception(f"No status found for {node}")
+
+        for addresses in node.status.addresses:
+            if addresses.type in ["ExternalIP", "InternalIP", "Hostname"]:
+                return addresses.address
+
+        return ""
+
+    @cache
+    def _get_service(self, service_name: str, *_) -> Service | None:
+        return self.client.get(
+            res=Service,
+            name=service_name,
+        )
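The refactor above swaps `@cached_property` for `functools.cache` on private `_get_*` methods that take an extra `get_ttl_hash()` argument: the hash is the current time floored to a 2-minute bucket, so the cache key rolls over and stale K8s API results expire on their own, while `__eq__`/`__hash__` make `self` a stable cache key across manager instances. A self-contained sketch of the same TTL trick — the `fetch` helper and its printout are hypothetical, not the charm's API:

import math
import time
from functools import cache

def get_ttl_hash(seconds: int = 120) -> int:
    # Same value for every call within one `seconds`-wide window, so cached
    # entries silently expire when the window rolls over.
    return math.floor(time.time() / seconds)

@cache
def _fetch(key: str, ttl_hash: int) -> str:
    # ttl_hash is unused in the body; it only participates in the cache key.
    print(f"expensive lookup for {key}")
    return key.upper()

def fetch(key: str) -> str:
    return _fetch(key, get_ttl_hash())

fetch("pod-0")  # prints "expensive lookup for pod-0"
fetch("pod-0")  # cache hit: no print, until the 2-minute window rolls over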
12 changes: 12 additions & 0 deletions tests/unit/conftest.py
@@ -124,3 +124,15 @@ def patched_node_ip():
             yield patched_node_ip
     else:
         yield
+
+
+@pytest.fixture(autouse=True)
+def patched_node_port():
+    if SUBSTRATE == "k8s":
+        with patch(
+            "managers.k8s.K8sManager.get_listener_nodeport",
+            return_value=20000,
+        ) as patched_node_port:
+            yield patched_node_port
+    else:
+        yield
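For the test side of this migration, a rough sketch of the Scenario 7 (ops-scenario) style these unit tests move to — `ctx.on.<event>()` replaces the string event names of earlier releases. The charm class, import path, and assertion below are illustrative, not taken from this commit:

# Illustrative Scenario 7 style test; `KafkaCharm` and the asserted status
# are placeholders, not code from this repository.
from ops import ActiveStatus
from scenario import Context, State

from charm import KafkaCharm  # hypothetical import path

def test_config_changed_leader():
    ctx = Context(KafkaCharm)
    # Scenario 7 event API: ctx.on.<event>() instead of "config-changed".
    state_out = ctx.run(ctx.on.config_changed(), State(leader=True))
    assert isinstance(state_out.unit_status, ActiveStatus)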