diff --git a/lib/charms/observability_libs/v1/kubernetes_service_patch.py b/lib/charms/observability_libs/v1/kubernetes_service_patch.py new file mode 100644 index 0000000..2cce729 --- /dev/null +++ b/lib/charms/observability_libs/v1/kubernetes_service_patch.py @@ -0,0 +1,341 @@ +# Copyright 2021 Canonical Ltd. +# See LICENSE file for licensing details. + +"""# KubernetesServicePatch Library. + +This library is designed to enable developers to more simply patch the Kubernetes Service created +by Juju during the deployment of a sidecar charm. When sidecar charms are deployed, Juju creates a +service named after the application in the namespace (named after the Juju model). This service by +default contains a "placeholder" port, which is 65536/TCP. + +When modifying the default set of resources managed by Juju, one must consider the lifecycle of the +charm. In this case, any modifications to the default service (created during deployment), will be +overwritten during a charm upgrade. + +When initialised, this library binds a handler to the parent charm's `install` and `upgrade_charm` +events which applies the patch to the cluster. This should ensure that the service ports are +correct throughout the charm's life. + +The constructor simply takes a reference to the parent charm, and a list of +[`lightkube`](https://github.com/gtsystem/lightkube) ServicePorts that each define a port for the +service. For information regarding the `lightkube` `ServicePort` model, please visit the +`lightkube` [docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#serviceport). + +Optionally, a name of the service (in case service name needs to be patched as well), labels, +selectors, and annotations can be provided as keyword arguments. + +## Getting Started + +To get started using the library, you just need to fetch the library using `charmcraft`. **Note +that you also need to add `lightkube` and `lightkube-models` to your charm's `requirements.txt`.** + +```shell +cd some-charm +charmcraft fetch-lib charms.observability_libs.v1.kubernetes_service_patch +cat << EOF >> requirements.txt +lightkube +lightkube-models +EOF +``` + +Then, to initialise the library: + +For `ClusterIP` services: + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(443, name=f"{self.app.name}") + self.service_patcher = KubernetesServicePatch(self, [port]) + # ... +``` + +For `LoadBalancer`/`NodePort` services: + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(443, name=f"{self.app.name}", targetPort=443, nodePort=30666) + self.service_patcher = KubernetesServicePatch( + self, [port], "LoadBalancer" + ) + # ... +``` + +Port protocols can also be specified. Valid protocols are `"TCP"`, `"UDP"`, and `"SCTP"` + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + tcp = ServicePort(443, name=f"{self.app.name}-tcp", protocol="TCP") + udp = ServicePort(443, name=f"{self.app.name}-udp", protocol="UDP") + sctp = ServicePort(443, name=f"{self.app.name}-sctp", protocol="SCTP") + self.service_patcher = KubernetesServicePatch(self, [tcp, udp, sctp]) + # ... +``` + +Bound with custom events by providing `refresh_event` argument: +For example, you would like to have a configurable port in your charm and want to apply +service patch every time charm config is changed. + +```python +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(int(self.config["charm-config-port"]), name=f"{self.app.name}") + self.service_patcher = KubernetesServicePatch( + self, + [port], + refresh_event=self.on.config_changed + ) + # ... +``` + +Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library +does not try to make any API calls, or open any files during testing that are unlikely to be +present, and could break your tests. The easiest way to do this is during your test `setUp`: + +```python +# ... + +@patch("charm.KubernetesServicePatch", lambda x, y: None) +def setUp(self, *unused): + self.harness = Harness(SomeCharm) + # ... +``` +""" + +import logging +from types import MethodType +from typing import List, Literal, Optional, Union + +from lightkube import ApiError, Client # pyright: ignore +from lightkube.core import exceptions +from lightkube.models.core_v1 import ServicePort, ServiceSpec +from lightkube.models.meta_v1 import ObjectMeta +from lightkube.resources.core_v1 import Service +from lightkube.types import PatchType +from ops.charm import CharmBase +from ops.framework import BoundEvent, Object + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "0042f86d0a874435adef581806cddbbb" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 9 + +ServiceType = Literal["ClusterIP", "LoadBalancer"] + + +class KubernetesServicePatch(Object): + """A utility for patching the Kubernetes service set up by Juju.""" + + def __init__( + self, + charm: CharmBase, + ports: List[ServicePort], + service_name: Optional[str] = None, + service_type: ServiceType = "ClusterIP", + additional_labels: Optional[dict] = None, + additional_selectors: Optional[dict] = None, + additional_annotations: Optional[dict] = None, + *, + refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None, + ): + """Constructor for KubernetesServicePatch. + + Args: + charm: the charm that is instantiating the library. + ports: a list of ServicePorts + service_name: allows setting custom name to the patched service. If none given, + application name will be used. + service_type: desired type of K8s service. Default value is in line with ServiceSpec's + default value. + additional_labels: Labels to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_selectors: Selectors to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_annotations: Annotations to be added to the kubernetes service. + refresh_event: an optional bound event or list of bound events which + will be observed to re-apply the patch (e.g. on port change). + The `install` and `upgrade-charm` events would be observed regardless. + """ + super().__init__(charm, "kubernetes-service-patch") + self.charm = charm + self.service_name = service_name if service_name else self._app + self.service = self._service_object( + ports, + service_name, + service_type, + additional_labels, + additional_selectors, + additional_annotations, + ) + + # Make mypy type checking happy that self._patch is a method + assert isinstance(self._patch, MethodType) + # Ensure this patch is applied during the 'install' and 'upgrade-charm' events + self.framework.observe(charm.on.install, self._patch) + self.framework.observe(charm.on.upgrade_charm, self._patch) + self.framework.observe(charm.on.update_status, self._patch) + + # apply user defined events + if refresh_event: + if not isinstance(refresh_event, list): + refresh_event = [refresh_event] + + for evt in refresh_event: + self.framework.observe(evt, self._patch) + + def _service_object( + self, + ports: List[ServicePort], + service_name: Optional[str] = None, + service_type: ServiceType = "ClusterIP", + additional_labels: Optional[dict] = None, + additional_selectors: Optional[dict] = None, + additional_annotations: Optional[dict] = None, + ) -> Service: + """Creates a valid Service representation. + + Args: + ports: a list of ServicePorts + service_name: allows setting custom name to the patched service. If none given, + application name will be used. + service_type: desired type of K8s service. Default value is in line with ServiceSpec's + default value. + additional_labels: Labels to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_selectors: Selectors to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_annotations: Annotations to be added to the kubernetes service. + + Returns: + Service: A valid representation of a Kubernetes Service with the correct ports. + """ + if not service_name: + service_name = self._app + labels = {"app.kubernetes.io/name": self._app} + if additional_labels: + labels.update(additional_labels) + selector = {"app.kubernetes.io/name": self._app} + if additional_selectors: + selector.update(additional_selectors) + return Service( + apiVersion="v1", + kind="Service", + metadata=ObjectMeta( + namespace=self._namespace, + name=service_name, + labels=labels, + annotations=additional_annotations, # type: ignore[arg-type] + ), + spec=ServiceSpec( + selector=selector, + ports=ports, + type=service_type, + ), + ) + + def _patch(self, _) -> None: + """Patch the Kubernetes service created by Juju to map the correct port. + + Raises: + PatchFailed: if patching fails due to lack of permissions, or otherwise. + """ + try: + client = Client() # pyright: ignore + except exceptions.ConfigError as e: + logger.warning("Error creating k8s client: %s", e) + return + + try: + if self._is_patched(client): + return + if self.service_name != self._app: + self._delete_and_create_service(client) + client.patch(Service, self.service_name, self.service, patch_type=PatchType.MERGE) + except ApiError as e: + if e.status.code == 403: + logger.error("Kubernetes service patch failed: `juju trust` this application.") + else: + logger.error("Kubernetes service patch failed: %s", str(e)) + else: + logger.info("Kubernetes service '%s' patched successfully", self._app) + + def _delete_and_create_service(self, client: Client): + service = client.get(Service, self._app, namespace=self._namespace) + service.metadata.name = self.service_name # type: ignore[attr-defined] + service.metadata.resourceVersion = service.metadata.uid = None # type: ignore[attr-defined] # noqa: E501 + client.delete(Service, self._app, namespace=self._namespace) + client.create(service) + + def is_patched(self) -> bool: + """Reports if the service patch has been applied. + + Returns: + bool: A boolean indicating if the service patch has been applied. + """ + client = Client() # pyright: ignore + return self._is_patched(client) + + def _is_patched(self, client: Client) -> bool: + # Get the relevant service from the cluster + try: + service = client.get(Service, name=self.service_name, namespace=self._namespace) + except ApiError as e: + if e.status.code == 404 and self.service_name != self._app: + return False + logger.error("Kubernetes service get failed: %s", str(e)) + raise + + # Construct a list of expected ports, should the patch be applied + expected_ports = [(p.port, p.targetPort) for p in self.service.spec.ports] # type: ignore[attr-defined] + # Construct a list in the same manner, using the fetched service + fetched_ports = [ + (p.port, p.targetPort) for p in service.spec.ports # type: ignore[attr-defined] + ] # noqa: E501 + return expected_ports == fetched_ports + + @property + def _app(self) -> str: + """Name of the current Juju application. + + Returns: + str: A string containing the name of the current Juju application. + """ + return self.charm.app.name + + @property + def _namespace(self) -> str: + """The Kubernetes namespace we're running in. + + Returns: + str: A string containing the name of the current Kubernetes namespace. + """ + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: + return f.read().strip() diff --git a/src/charm.py b/src/charm.py index 8a40643..463ef3a 100755 --- a/src/charm.py +++ b/src/charm.py @@ -9,9 +9,11 @@ from charmed_kubeflow_chisme.kubernetes import KubernetesResourceHandler from charmed_kubeflow_chisme.lightkube.batch import delete_many from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider from lightkube import ApiError from lightkube.generic_resource import load_in_cluster_generic_resources +from lightkube.models.core_v1 import ServicePort from ops.charm import CharmBase from ops.main import main from ops.model import ActiveStatus, MaintenanceStatus, WaitingStatus @@ -36,6 +38,12 @@ def __init__(self, *args): super().__init__(*args) self.logger = logging.getLogger(__name__) + metrics_port = ServicePort(int(METRICS_PORT), name="metrics-port") + self.service_patcher = KubernetesServicePatch( + self, + [metrics_port], + service_name=f"{self.model.app.name}", + ) self.prometheus_provider = MetricsEndpointProvider( charm=self, diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 4fde947..af0b12e 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -191,6 +191,12 @@ async def test_prometheus_grafana_integration(ops_test: OpsTest): logger.info(f"Response status is {response_status}") assert response_status == "success" + # Assert the unit is available by checking the query result + # The data is presented as a list [1707357912.349, '1'], where the + # first value is a timestamp and the second value is the state of the unit + # 1 means available, 0 means unavailable + assert response["data"]["result"][0]["value"][1] == "1" + response_metric = response["data"]["result"][0]["metric"] assert response_metric["juju_application"] == APP_NAME assert response_metric["juju_model"] == ops_test.model_name diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 4f28b69..69db39a 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -53,6 +53,7 @@ class TestCharm: @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_not_leader( self, _: MagicMock, # k8s_resource_handler @@ -66,6 +67,7 @@ def test_not_leader( @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_no_relation( self, _: MagicMock, # k8s_resource_handler @@ -89,6 +91,7 @@ def test_no_relation( @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_pebble_layer( self, _: MagicMock, # k8s_resource_handler @@ -111,6 +114,7 @@ def test_pebble_layer( @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) def test_apply_k8s_resources_success( self, k8s_resource_handler: MagicMock, @@ -124,6 +128,7 @@ def test_apply_k8s_resources_success( k8s_resource_handler.apply.assert_called() assert isinstance(harness.charm.model.unit.status, MaintenanceStatus) + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.ApiError", _FakeApiError) @@ -146,6 +151,7 @@ def test_blocked_on_appierror_on_k8s_resource_handler( ), ) + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.ApiError", _FakeApiError) @@ -171,6 +177,7 @@ def test_blocked_on_appierror_on_crd_resource_handler( ), ) + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.delete_many") @@ -187,6 +194,7 @@ def test_on_remove_success( crd_resource_handler.assert_has_calls([call.render_manifests()]) delete_many.assert_called() + @patch("charm.KubernetesServicePatch", lambda *_, **__: None) @patch("charm.TrainingOperatorCharm.k8s_resource_handler") @patch("charm.TrainingOperatorCharm.crd_resource_handler") @patch("charm.delete_many")