From 8dd89afb4b1d5c1cbec301a9f685e1a7c002f43f Mon Sep 17 00:00:00 2001 From: Eder Ignatowicz Date: Fri, 13 Dec 2024 09:29:07 -0500 Subject: [PATCH] feat(bff): update bff service filtering to match model registry services by component: model-registry label (#633) Signed-off-by: Eder Ignatowicz --- clients/ui/bff/README.md | 35 ++++++ clients/ui/bff/internal/integrations/k8s.go | 124 +++++++++----------- clients/ui/bff/internal/mocks/k8s_mock.go | 21 +++- 3 files changed, 109 insertions(+), 71 deletions(-) diff --git a/clients/ui/bff/README.md b/clients/ui/bff/README.md index aa92da5b..fa2ad679 100644 --- a/clients/ui/bff/README.md +++ b/clients/ui/bff/README.md @@ -216,3 +216,38 @@ curl -i -H "kubeflow-userid: user@example.com" "http://localhost:4000/api/v1/mod # Get with a page size of 5, order by last update time in descending order. curl -i -H "kubeflow-userid: user@example.com" "http://localhost:4000/api/v1/model_registry/model-registry/registered_models?pageSize=5&orderBy=LAST_UPDATE_TIME&sortOrder=DESC" ``` + + +### FAQ + +#### 1. How do we filter model registry services from other Kubernetes services? + +We filter Model Registry services by using the Kubernetes label `component: model-registry. This label helps distinguish Model Registry services from other services in the cluster. + +For example, in our service manifest, the `component label is defined as follows: +```yaml +# ... +labels: + # ... + component: model-registry +#... +``` +You can view the complete Model Registry service manifest [here](https://github.com/kubeflow/model-registry/blob/main/manifests/kustomize/base/model-registry-service.yaml#L10). + +#### 2. What is the structure of the mock Kubernetes environment? + +The mock Kubernetes environment is activated when the environment variable `MOCK_K8S_CLIENT` is set to `true`. It is based on `env-test` and is designed to simulate a realistic Kubernetes setup for testing. The mock has the following characteristics: + +- **Namespaces**: + - `kubeflow` + - `dora-namespace` + +- **Users**: + - `user@example.com` (has `cluster-admin` privileges) + - `doraNonAdmin@example.com` (restricted to the `dora-namespace`) + +- **Services (Model Registries)**: + - `model-registry`: resides in the `kubeflow` namespace with the label `component: model-registry`. + - `model-registry-dora`: resides in the `dora-namespace` namespace with the label `component: model-registry`. + - `model-registry-bella`: resides in the `kubeflow` namespace with the label `component: model-registry`. + - `non-model-registry`: resides in the `kubeflow` namespace *without* the label `component: model-registry`. \ No newline at end of file diff --git a/clients/ui/bff/internal/integrations/k8s.go b/clients/ui/bff/internal/integrations/k8s.go index 5e3943be..8b6c7170 100644 --- a/clients/ui/bff/internal/integrations/k8s.go +++ b/clients/ui/bff/internal/integrations/k8s.go @@ -8,6 +8,7 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "log/slog" @@ -18,7 +19,7 @@ import ( "time" ) -const ComponentName = "model-registry-server" +const ComponentLabelValue = "model-registry" type KubernetesClientInterface interface { GetServiceNames() ([]string, error) @@ -150,42 +151,32 @@ func (kc *KubernetesClient) BearerToken() (string, error) { } func (kc *KubernetesClient) GetServiceNames() ([]string, error) { - //TODO (ederign) we should consider and rethinking listing all services on cluster - // what if we have thousand of those? - // we should consider label filtering for instance - - serviceList := &corev1.ServiceList{} - //TODO (ederign) review the context timeout - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) - defer cancel() - - err := kc.ControllerRuntimeClient.List(ctx, serviceList, &client.ListOptions{}) + services, err := kc.GetServiceDetails() if err != nil { - return nil, fmt.Errorf("failed to list services: %w", err) - } - - var serviceNames []string - for _, service := range serviceList.Items { - if value, ok := service.Spec.Selector["component"]; ok && value == ComponentName { - serviceNames = append(serviceNames, service.Name) - } + return nil, err } - if len(serviceNames) == 0 { - return nil, fmt.Errorf("no services found with component: %s", ComponentName) + names := make([]string, 0, len(services)) + for _, svc := range services { + names = append(names, svc.Name) } - return serviceNames, nil + return names, nil } func (kc *KubernetesClient) GetServiceDetails() ([]ServiceDetails, error) { - //TODO (ederign) review the context timeout - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) - defer cancel() // Ensure the context is canceled to free up resources + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() serviceList := &corev1.ServiceList{} - err := kc.ControllerRuntimeClient.List(ctx, serviceList, &client.ListOptions{}) + labelSelector := labels.SelectorFromSet(labels.Set{ + "component": ComponentLabelValue, + }) + + err := kc.ControllerRuntimeClient.List(ctx, serviceList, &client.ListOptions{ + LabelSelector: labelSelector, + }) if err != nil { return nil, fmt.Errorf("failed to list services: %w", err) } @@ -193,52 +184,51 @@ func (kc *KubernetesClient) GetServiceDetails() ([]ServiceDetails, error) { var services []ServiceDetails for _, service := range serviceList.Items { - if svcComponent, exists := service.Spec.Selector["component"]; exists && svcComponent == ComponentName { - var httpPort int32 - hasHTTPPort := false - for _, port := range service.Spec.Ports { - if port.Name == "http-api" { - httpPort = port.Port - hasHTTPPort = true - break - } - } - if !hasHTTPPort { - kc.Logger.Error("service missing HTTP port", "serviceName", service.Name) - continue - } - - if service.Spec.ClusterIP == "" { - kc.Logger.Error("service missing valid ClusterIP", "serviceName", service.Name) - continue + var httpPort int32 + hasHTTPPort := false + for _, port := range service.Spec.Ports { + if port.Name == "http-api" { + httpPort = port.Port + hasHTTPPort = true + break } + } + if !hasHTTPPort { + kc.Logger.Error("service missing HTTP port", "serviceName", service.Name) + continue + } - displayName := "" - description := "" + if service.Spec.ClusterIP == "" { + kc.Logger.Error("service missing valid ClusterIP", "serviceName", service.Name) + continue + } - if service.Annotations != nil { - displayName = service.Annotations["displayName"] - description = service.Annotations["description"] - } + displayName := "" + description := "" - if displayName == "" { - kc.Logger.Warn("service missing displayName annotation", "serviceName", service.Name) - } + if service.Annotations != nil { + displayName = service.Annotations["displayName"] + description = service.Annotations["description"] + } - if description == "" { - kc.Logger.Warn("service missing description annotation", "serviceName", service.Name) - } + if displayName == "" { + kc.Logger.Warn("service missing displayName annotation", "serviceName", service.Name) + } - serviceDetails := ServiceDetails{ - Name: service.Name, - DisplayName: displayName, - Description: description, - ClusterIP: service.Spec.ClusterIP, - HTTPPort: httpPort, - } + if description == "" { + kc.Logger.Warn("service missing description annotation", "serviceName", service.Name) + } - services = append(services, serviceDetails) + serviceDetails := ServiceDetails{ + Name: service.Name, + DisplayName: displayName, + Description: description, + ClusterIP: service.Spec.ClusterIP, + HTTPPort: httpPort, } + + services = append(services, serviceDetails) + } return services, nil @@ -260,6 +250,8 @@ func (kc *KubernetesClient) GetServiceDetailsByName(serviceName string) (Service } func (kc *KubernetesClient) PerformSAR(user string) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() verbs := []string{"get", "list"} resource := "services" @@ -275,7 +267,7 @@ func (kc *KubernetesClient) PerformSAR(user string) (bool, error) { } // Perform the SAR using the native KubernetesNativeClient client - response, err := kc.KubernetesNativeClient.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{}) + response, err := kc.KubernetesNativeClient.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{}) if err != nil { return false, fmt.Errorf("failed to create SubjectAccessReview for verb %q on resource %q: %w", verb, resource, err) } @@ -291,7 +283,7 @@ func (kc *KubernetesClient) PerformSAR(user string) (bool, error) { func (kc *KubernetesClient) IsClusterAdmin(user string) (bool, error) { //using a context here, because checking ClusterRoleBindings could be expensive in large clusters - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() clusterRoleBindings := &rbacv1.ClusterRoleBindingList{} diff --git a/clients/ui/bff/internal/mocks/k8s_mock.go b/clients/ui/bff/internal/mocks/k8s_mock.go index 7d2e86e8..9fcc8a56 100644 --- a/clients/ui/bff/internal/mocks/k8s_mock.go +++ b/clients/ui/bff/internal/mocks/k8s_mock.go @@ -117,6 +117,7 @@ func getProjectRoot() (string, error) { } func setupMock(mockK8sClient client.Client, ctx context.Context) error { + err := createNamespace(mockK8sClient, ctx, "kubeflow") if err != nil { return err @@ -127,15 +128,19 @@ func setupMock(mockK8sClient client.Client, ctx context.Context) error { return err } - err = createService(mockK8sClient, ctx, "model-registry", "kubeflow", "Model Registry", "Model Registry Description", "10.0.0.10") + err = createService(mockK8sClient, ctx, "model-registry", "kubeflow", "Model Registry", "Model Registry Description", "10.0.0.10", "model-registry") + if err != nil { + return err + } + err = createService(mockK8sClient, ctx, "model-registry-dora", "dora-namespace", "Model Registry Dora", "Model Registry Dora description", "10.0.0.11", "model-registry") if err != nil { return err } - err = createService(mockK8sClient, ctx, "model-registry-dora", "dora-namespace", "Model Registry Dora", "Model Registry Dora description", "10.0.0.11") + err = createService(mockK8sClient, ctx, "model-registry-bella", "kubeflow", "Model Registry Bella", "Model Registry Bella description", "10.0.0.12", "model-registry") if err != nil { return err } - err = createService(mockK8sClient, ctx, "model-registry-bella", "kubeflow", "Model Registry Bella", "Model Registry Bella description", "10.0.0.12") + err = createService(mockK8sClient, ctx, "non-model-registry", "kubeflow", "Not a Model Registry", "Not a Model Registry Bella description", "10.0.0.13", "") if err != nil { return err } @@ -183,7 +188,7 @@ func (m *KubernetesClientMock) BearerToken() (string, error) { return "FAKE BEARER TOKEN", nil } -func createService(k8sClient client.Client, ctx context.Context, name string, namespace string, displayName string, description string, clusterIP string) error { +func createService(k8sClient client.Client, ctx context.Context, name string, namespace string, displayName string, description string, clusterIP string, componentLabel string) error { annotations := map[string]string{} @@ -195,15 +200,21 @@ func createService(k8sClient client.Client, ctx context.Context, name string, na annotations["description"] = description } + labels := map[string]string{} + if componentLabel != "" { + labels["component"] = componentLabel + } + service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Annotations: annotations, + Labels: labels, }, Spec: corev1.ServiceSpec{ Selector: map[string]string{ - "component": k8s.ComponentName, + "component": k8s.ComponentLabelValue, }, Type: corev1.ServiceTypeClusterIP, ClusterIP: clusterIP,