Skip to content

Commit

Permalink
feat(bff): update bff service filtering to match model registry servi…
Browse files Browse the repository at this point in the history
…ces by component: model-registry label (kubeflow#633)

Signed-off-by: Eder Ignatowicz <[email protected]>
  • Loading branch information
ederign authored Dec 13, 2024
1 parent 2fb4d26 commit 8dd89af
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 71 deletions.
35 changes: 35 additions & 0 deletions clients/ui/bff/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,38 @@ curl -i -H "kubeflow-userid: [email protected]" "http://localhost:4000/api/v1/mod
# Get with a page size of 5, order by last update time in descending order.
curl -i -H "kubeflow-userid: [email protected]" "http://localhost:4000/api/v1/model_registry/model-registry/registered_models?pageSize=5&orderBy=LAST_UPDATE_TIME&sortOrder=DESC"
```


### FAQ

#### 1. How do we filter model registry services from other Kubernetes services?

We filter Model Registry services by using the Kubernetes label `component: model-registry. This label helps distinguish Model Registry services from other services in the cluster.

For example, in our service manifest, the `component label is defined as follows:
```yaml
# ...
labels:
# ...
component: model-registry
#...
```
You can view the complete Model Registry service manifest [here](https://github.com/kubeflow/model-registry/blob/main/manifests/kustomize/base/model-registry-service.yaml#L10).

#### 2. What is the structure of the mock Kubernetes environment?

The mock Kubernetes environment is activated when the environment variable `MOCK_K8S_CLIENT` is set to `true`. It is based on `env-test` and is designed to simulate a realistic Kubernetes setup for testing. The mock has the following characteristics:

- **Namespaces**:
- `kubeflow`
- `dora-namespace`

- **Users**:
- `[email protected]` (has `cluster-admin` privileges)
- `[email protected]` (restricted to the `dora-namespace`)

- **Services (Model Registries)**:
- `model-registry`: resides in the `kubeflow` namespace with the label `component: model-registry`.
- `model-registry-dora`: resides in the `dora-namespace` namespace with the label `component: model-registry`.
- `model-registry-bella`: resides in the `kubeflow` namespace with the label `component: model-registry`.
- `non-model-registry`: resides in the `kubeflow` namespace *without* the label `component: model-registry`.
124 changes: 58 additions & 66 deletions clients/ui/bff/internal/integrations/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"log/slog"
Expand All @@ -18,7 +19,7 @@ import (
"time"
)

const ComponentName = "model-registry-server"
const ComponentLabelValue = "model-registry"

type KubernetesClientInterface interface {
GetServiceNames() ([]string, error)
Expand Down Expand Up @@ -150,95 +151,84 @@ func (kc *KubernetesClient) BearerToken() (string, error) {
}

func (kc *KubernetesClient) GetServiceNames() ([]string, error) {
//TODO (ederign) we should consider and rethinking listing all services on cluster
// what if we have thousand of those?
// we should consider label filtering for instance

serviceList := &corev1.ServiceList{}
//TODO (ederign) review the context timeout
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()

err := kc.ControllerRuntimeClient.List(ctx, serviceList, &client.ListOptions{})
services, err := kc.GetServiceDetails()
if err != nil {
return nil, fmt.Errorf("failed to list services: %w", err)
}

var serviceNames []string
for _, service := range serviceList.Items {
if value, ok := service.Spec.Selector["component"]; ok && value == ComponentName {
serviceNames = append(serviceNames, service.Name)
}
return nil, err
}

if len(serviceNames) == 0 {
return nil, fmt.Errorf("no services found with component: %s", ComponentName)
names := make([]string, 0, len(services))
for _, svc := range services {
names = append(names, svc.Name)
}

return serviceNames, nil
return names, nil
}

func (kc *KubernetesClient) GetServiceDetails() ([]ServiceDetails, error) {
//TODO (ederign) review the context timeout
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel() // Ensure the context is canceled to free up resources
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

serviceList := &corev1.ServiceList{}

err := kc.ControllerRuntimeClient.List(ctx, serviceList, &client.ListOptions{})
labelSelector := labels.SelectorFromSet(labels.Set{
"component": ComponentLabelValue,
})

err := kc.ControllerRuntimeClient.List(ctx, serviceList, &client.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, fmt.Errorf("failed to list services: %w", err)
}

var services []ServiceDetails

for _, service := range serviceList.Items {
if svcComponent, exists := service.Spec.Selector["component"]; exists && svcComponent == ComponentName {
var httpPort int32
hasHTTPPort := false
for _, port := range service.Spec.Ports {
if port.Name == "http-api" {
httpPort = port.Port
hasHTTPPort = true
break
}
}
if !hasHTTPPort {
kc.Logger.Error("service missing HTTP port", "serviceName", service.Name)
continue
}

if service.Spec.ClusterIP == "" {
kc.Logger.Error("service missing valid ClusterIP", "serviceName", service.Name)
continue
var httpPort int32
hasHTTPPort := false
for _, port := range service.Spec.Ports {
if port.Name == "http-api" {
httpPort = port.Port
hasHTTPPort = true
break
}
}
if !hasHTTPPort {
kc.Logger.Error("service missing HTTP port", "serviceName", service.Name)
continue
}

displayName := ""
description := ""
if service.Spec.ClusterIP == "" {
kc.Logger.Error("service missing valid ClusterIP", "serviceName", service.Name)
continue
}

if service.Annotations != nil {
displayName = service.Annotations["displayName"]
description = service.Annotations["description"]
}
displayName := ""
description := ""

if displayName == "" {
kc.Logger.Warn("service missing displayName annotation", "serviceName", service.Name)
}
if service.Annotations != nil {
displayName = service.Annotations["displayName"]
description = service.Annotations["description"]
}

if description == "" {
kc.Logger.Warn("service missing description annotation", "serviceName", service.Name)
}
if displayName == "" {
kc.Logger.Warn("service missing displayName annotation", "serviceName", service.Name)
}

serviceDetails := ServiceDetails{
Name: service.Name,
DisplayName: displayName,
Description: description,
ClusterIP: service.Spec.ClusterIP,
HTTPPort: httpPort,
}
if description == "" {
kc.Logger.Warn("service missing description annotation", "serviceName", service.Name)
}

services = append(services, serviceDetails)
serviceDetails := ServiceDetails{
Name: service.Name,
DisplayName: displayName,
Description: description,
ClusterIP: service.Spec.ClusterIP,
HTTPPort: httpPort,
}

services = append(services, serviceDetails)

}

return services, nil
Expand All @@ -260,6 +250,8 @@ func (kc *KubernetesClient) GetServiceDetailsByName(serviceName string) (Service
}

func (kc *KubernetesClient) PerformSAR(user string) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
verbs := []string{"get", "list"}
resource := "services"

Expand All @@ -275,7 +267,7 @@ func (kc *KubernetesClient) PerformSAR(user string) (bool, error) {
}

// Perform the SAR using the native KubernetesNativeClient client
response, err := kc.KubernetesNativeClient.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{})
response, err := kc.KubernetesNativeClient.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, fmt.Errorf("failed to create SubjectAccessReview for verb %q on resource %q: %w", verb, resource, err)
}
Expand All @@ -291,7 +283,7 @@ func (kc *KubernetesClient) PerformSAR(user string) (bool, error) {

func (kc *KubernetesClient) IsClusterAdmin(user string) (bool, error) {
//using a context here, because checking ClusterRoleBindings could be expensive in large clusters
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

clusterRoleBindings := &rbacv1.ClusterRoleBindingList{}
Expand Down
21 changes: 16 additions & 5 deletions clients/ui/bff/internal/mocks/k8s_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func getProjectRoot() (string, error) {
}

func setupMock(mockK8sClient client.Client, ctx context.Context) error {

err := createNamespace(mockK8sClient, ctx, "kubeflow")
if err != nil {
return err
Expand All @@ -127,15 +128,19 @@ func setupMock(mockK8sClient client.Client, ctx context.Context) error {
return err
}

err = createService(mockK8sClient, ctx, "model-registry", "kubeflow", "Model Registry", "Model Registry Description", "10.0.0.10")
err = createService(mockK8sClient, ctx, "model-registry", "kubeflow", "Model Registry", "Model Registry Description", "10.0.0.10", "model-registry")
if err != nil {
return err
}
err = createService(mockK8sClient, ctx, "model-registry-dora", "dora-namespace", "Model Registry Dora", "Model Registry Dora description", "10.0.0.11", "model-registry")
if err != nil {
return err
}
err = createService(mockK8sClient, ctx, "model-registry-dora", "dora-namespace", "Model Registry Dora", "Model Registry Dora description", "10.0.0.11")
err = createService(mockK8sClient, ctx, "model-registry-bella", "kubeflow", "Model Registry Bella", "Model Registry Bella description", "10.0.0.12", "model-registry")
if err != nil {
return err
}
err = createService(mockK8sClient, ctx, "model-registry-bella", "kubeflow", "Model Registry Bella", "Model Registry Bella description", "10.0.0.12")
err = createService(mockK8sClient, ctx, "non-model-registry", "kubeflow", "Not a Model Registry", "Not a Model Registry Bella description", "10.0.0.13", "")
if err != nil {
return err
}
Expand Down Expand Up @@ -183,7 +188,7 @@ func (m *KubernetesClientMock) BearerToken() (string, error) {
return "FAKE BEARER TOKEN", nil
}

func createService(k8sClient client.Client, ctx context.Context, name string, namespace string, displayName string, description string, clusterIP string) error {
func createService(k8sClient client.Client, ctx context.Context, name string, namespace string, displayName string, description string, clusterIP string, componentLabel string) error {

annotations := map[string]string{}

Expand All @@ -195,15 +200,21 @@ func createService(k8sClient client.Client, ctx context.Context, name string, na
annotations["description"] = description
}

labels := map[string]string{}
if componentLabel != "" {
labels["component"] = componentLabel
}

service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: annotations,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"component": k8s.ComponentName,
"component": k8s.ComponentLabelValue,
},
Type: corev1.ServiceTypeClusterIP,
ClusterIP: clusterIP,
Expand Down

0 comments on commit 8dd89af

Please sign in to comment.