Skip to content

Commit

Permalink
Add call to reconcile VirtualService
Browse files Browse the repository at this point in the history
Signed-off-by: Matheus Cruz <[email protected]>
  • Loading branch information
mcruzdev committed Sep 14, 2024
1 parent bc4e445 commit e235316
Show file tree
Hide file tree
Showing 7 changed files with 16,795 additions and 32 deletions.
2 changes: 1 addition & 1 deletion workspaces/controller/internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var _ = BeforeSuite(func() {

By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases"), filepath.Join("..", "..", "test", "crd")},
ErrorIfCRDPathMissing: true,

// The BinaryAssetsDirectory is only required if you want to run the tests directly without call the makefile target test.
Expand Down
46 changes: 18 additions & 28 deletions workspaces/controller/internal/controller/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"github.com/kubeflow/notebooks/workspaces/controller/internal/istio"
"reflect"
"strings"

Expand Down Expand Up @@ -53,8 +54,6 @@ const (
workspaceSelectorLabel = "statefulset"

// lengths for resource names
generateNameSuffixLength = 6
maxServiceNameLength = 63
maxStatefulSetNameLength = 52 // https://github.com/kubernetes/kubernetes/issues/64023

// state message formats for Workspace status
Expand Down Expand Up @@ -346,6 +345,19 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// TODO: reconcile the Istio VirtualService to expose the Workspace
// and implement the `spec.podTemplate.httpProxy` options
//
virtualService, err := istio.GenerateIstioVirtualService(workspace, workspaceKind, currentImageConfig, serviceName, log)
if err != nil {
log.Error(err, "unable to generate Istio Virtual Service")
}
log.Info(fmt.Sprintf("VirtualService %s", virtualService))

if err := ctrl.SetControllerReference(workspace, virtualService, r.Scheme); err != nil {
return ctrl.Result{}, err
}

if err := istio.ReconcileVirtualService(ctx, r.Client, virtualService.GetName(), virtualService.GetNamespace(), virtualService, log); err != nil {
return ctrl.Result{}, err
}

// fetch Pod
// NOTE: the first StatefulSet Pod is always called "{statefulSetName}-0"
Expand Down Expand Up @@ -555,25 +567,10 @@ func getPodConfig(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefl
}
}

// generateNamePrefix generates a name prefix for a Workspace
// the format is "ws-{WORKSPACE_NAME}-" the workspace name is truncated to fit within the max length
func generateNamePrefix(workspaceName string, maxLength int) string {
namePrefix := fmt.Sprintf("ws-%s", workspaceName)
maxLength = maxLength - generateNameSuffixLength // subtract 6 for the `metadata.generateName` suffix
maxLength = maxLength - 1 // subtract 1 for the trailing "-"
if len(namePrefix) > maxLength {
namePrefix = namePrefix[:min(len(namePrefix), maxLength)]
}
if namePrefix[len(namePrefix)-1] != '-' {
namePrefix = namePrefix + "-"
}
return namePrefix
}

// generateStatefulSet generates a StatefulSet for a Workspace
func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefloworgv1beta1.WorkspaceKind, imageConfigSpec kubefloworgv1beta1.ImageConfigSpec, podConfigSpec kubefloworgv1beta1.PodConfigSpec) (*appsv1.StatefulSet, error) {
// generate name prefix
namePrefix := generateNamePrefix(workspace.Name, maxStatefulSetNameLength)
namePrefix := helper.GenerateNamePrefix(workspace.Name, maxStatefulSetNameLength)

// generate replica count
replicas := int32(1)
Expand All @@ -594,14 +591,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
// define go string template functions
// NOTE: these are used in places like the `extraEnv` values
containerPortsIdMap := make(map[string]kubefloworgv1beta1.ImagePort)
httpPathPrefixFunc := func(portId string) string {
port, ok := containerPortsIdMap[portId]
if ok {
return fmt.Sprintf("/workspace/%s/%s/%s/", workspace.Namespace, workspace.Name, port.Id)
} else {
return ""
}
}
httpPathPrefixFunc := helper.GenerateHttpPathPrefixFunc(workspace, containerPortsIdMap)

// generate container ports
containerPorts := make([]corev1.ContainerPort, len(imageConfigSpec.Ports))
Expand All @@ -627,7 +617,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
env := env.DeepCopy() // copy to avoid modifying the original
if env.Value != "" {
rawValue := env.Value
outValue, err := helper.RenderExtraEnvValueTemplate(rawValue, httpPathPrefixFunc)
outValue, err := helper.RenderValueUsingFunc(rawValue, httpPathPrefixFunc)
if err != nil {
return nil, fmt.Errorf("failed to render extraEnv %q: %w", env.Name, err)
}
Expand Down Expand Up @@ -806,7 +796,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
// generateService generates a Service for a Workspace
func generateService(workspace *kubefloworgv1beta1.Workspace, imageConfigSpec kubefloworgv1beta1.ImageConfigSpec) (*corev1.Service, error) {
// generate name prefix
namePrefix := generateNamePrefix(workspace.Name, maxServiceNameLength)
namePrefix := helper.GenerateNamePrefix(workspace.Name, helper.MaxServiceNameLength)

// generate service ports
servicePorts := make([]corev1.ServicePort, len(imageConfigSpec.Ports))
Expand Down
68 changes: 68 additions & 0 deletions workspaces/controller/internal/helper/helper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package helper

import (
"fmt"
"os"
"reflect"

kubefloworgv1beta1 "github.com/kubeflow/notebooks/workspaces/controller/api/v1beta1"
Expand All @@ -10,6 +12,11 @@ import (
corev1 "k8s.io/api/core/v1"
)

const (
GenerateNameSuffixLength = 6
MaxServiceNameLength = 63
)

// CopyStatefulSetFields updates a target StatefulSet with the fields from a desired StatefulSet, returning true if an update is required.
func CopyStatefulSetFields(desired *appsv1.StatefulSet, target *appsv1.StatefulSet) bool {
requireUpdate := false
Expand Down Expand Up @@ -166,3 +173,64 @@ func NormalizePodConfigSpec(spec kubefloworgv1beta1.PodConfigSpec) error {

return nil
}

// GenerateNamePrefix generates a name prefix for a Workspace
// the format is "ws-{WORKSPACE_NAME}-" the workspace name is truncated to fit within the max length
func GenerateNamePrefix(workspaceName string, maxLength int) string {
namePrefix := fmt.Sprintf("ws-%s", workspaceName)
maxLength = maxLength - GenerateNameSuffixLength // subtract 6 for the `metadata.generateName` suffix
maxLength = maxLength - 1 // subtract 1 for the trailing "-"
if len(namePrefix) > maxLength {
namePrefix = namePrefix[:min(len(namePrefix), maxLength)]
}
if namePrefix[len(namePrefix)-1] != '-' {
namePrefix = namePrefix + "-"
}
return namePrefix
}

func RemoveTrailingDash(s string) string {
if len(s) > 0 && s[len(s)-1] == '-' {
return s[:len(s)-1]
}
return s
}

func GetEnvOrDefault(name, defaultValue string) string {
if lookupEnv, exists := os.LookupEnv(name); exists {
return lookupEnv
} else {
return defaultValue
}
}

func GenerateHttpPathPrefixFunc(workspace *kubefloworgv1beta1.Workspace, containerPortsIdMap map[string]kubefloworgv1beta1.ImagePort) func(portId string) string {
return func(portId string) string {
port, ok := containerPortsIdMap[portId]
if ok {
return fmt.Sprintf("/workspace/%s/%s/%s/", workspace.Namespace, workspace.Name, port.Id)
} else {
return ""
}
}
}

func GenerateContainerPortsIdMap(imageConfig *kubefloworgv1beta1.ImageConfigValue) (map[string]kubefloworgv1beta1.ImagePort, error) {
containerPortsIdMap := make(map[string]kubefloworgv1beta1.ImagePort)

containerPorts := make([]corev1.ContainerPort, len(imageConfig.Spec.Ports))
seenPorts := make(map[int32]bool)
for i, port := range imageConfig.Spec.Ports {
if seenPorts[port.Port] {
return nil, fmt.Errorf("duplicate port number %d in imageConfig", port.Port)
}
containerPorts[i] = corev1.ContainerPort{
Name: fmt.Sprintf("http-%d", port.Port),
ContainerPort: port.Port,
Protocol: corev1.ProtocolTCP,
}
seenPorts[port.Port] = true
containerPortsIdMap[port.Id] = port
}
return containerPortsIdMap, nil
}
25 changes: 23 additions & 2 deletions workspaces/controller/internal/helper/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"text/template"
)

// RenderExtraEnvValueTemplate renders a single WorkspaceKind `spec.podTemplate.extraEnv[].value` string template
func RenderExtraEnvValueTemplate(rawValue string, httpPathPrefixFunc func(string) string) (string, error) {
// RenderValueUsingFunc renders a single WorkspaceKind `spec.podTemplate.extraEnv[].value` string template
func RenderValueUsingFunc(rawValue string, httpPathPrefixFunc func(string) string) (string, error) {

// Parse the raw value as a template
tmpl, err := template.New("value").
Expand All @@ -28,3 +28,24 @@ func RenderExtraEnvValueTemplate(rawValue string, httpPathPrefixFunc func(string

return buf.String(), nil
}

func TemplateHeaders(requestHeaders map[string]string, httpPathPrefixFunc func(portId string) string) map[string]string {

if len(requestHeaders) == 0 {
return make(map[string]string, 0)
}

headers := make(map[string]string, len(requestHeaders))
for _, header := range requestHeaders {
value := headers[header]
if value != "" {
out, err := RenderValueUsingFunc(header, httpPathPrefixFunc)
if err != nil {
return make(map[string]string, 0)
}
value = out
}
headers[header] = value
}
return headers
}
159 changes: 159 additions & 0 deletions workspaces/controller/internal/istio/istio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package istio

import (
"context"
"fmt"
"github.com/go-logr/logr"
kubefloworgv1beta1 "github.com/kubeflow/notebooks/workspaces/controller/api/v1beta1"
"github.com/kubeflow/notebooks/workspaces/controller/internal/helper"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
ApiVersionIstio = "networking.istio.io/v1"
VirtualServiceKind = "VirtualService"

EnvIstioHost = "ISTIO_HOST"
EnvIstioGateway = "ISTIO_GATEWAY"
ClusterDomain = "CLUSTER_DOMAIN"
)

func GenerateIstioVirtualService(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefloworgv1beta1.WorkspaceKind, imageConfig *kubefloworgv1beta1.ImageConfigValue, serviceName string, _ logr.Logger) (*unstructured.Unstructured, error) {

virtualService := &unstructured.Unstructured{}
virtualService.SetAPIVersion(ApiVersionIstio)
virtualService.SetKind(VirtualServiceKind)

prefix := helper.GenerateNamePrefix(workspace.Name, helper.MaxServiceNameLength)
virtualService.SetName(helper.RemoveTrailingDash(prefix))
virtualService.SetNamespace(workspace.Namespace)

// .spec.gateways
istioGateway := helper.GetEnvOrDefault(EnvIstioGateway, "kubeflow/kubeflow-gateway")
if err := unstructured.SetNestedStringSlice(virtualService.Object, []string{istioGateway},
"spec", "gateways"); err != nil {
return nil, fmt.Errorf("set .spec.gateways error: %v", err)
}

istioHost := helper.GetEnvOrDefault(EnvIstioHost, "*")
if err := unstructured.SetNestedStringSlice(virtualService.Object, []string{istioHost},
"spec", "gateways"); err != nil {
return nil, fmt.Errorf("set .spec.hosts error: %v", err)
}

var prefixes []string
for _, imagePort := range imageConfig.Spec.Ports {
prefix := fmt.Sprintf("/workspace/%s/%s/%s", workspace.Namespace, workspace.Name, imagePort.Id)
prefixes = append(prefixes, prefix)
}

var httpRoutes []interface{}

host := fmt.Sprintf("%s.%s.svc.%s", serviceName, workspace.Namespace, helper.GetEnvOrDefault(ClusterDomain, "cluster.local"))

// generate container ports
// TODO: It can be better
containerPortsIdMap, err := helper.GenerateContainerPortsIdMap(imageConfig)
if errContainerPorts := unstructured.SetNestedStringSlice(virtualService.Object, []string{istioHost},
"spec", "gateways"); err != nil {
return nil, fmt.Errorf("set .spec.hosts error: %v", errContainerPorts)
}
httpPathPrefixFunc := helper.GenerateHttpPathPrefixFunc(workspace, containerPortsIdMap)

for _, imagePort := range imageConfig.Spec.Ports {

httpRoute := map[string]interface{}{
"match": []map[string]interface{}{
{
"uri": map[string]interface{}{
"prefix": fmt.Sprintf("/workspace/%s/%s/%s", workspace.Namespace, workspace.Name, imagePort.Id),
},
},
},
"route": []map[string]interface{}{
{
"destination": map[string]interface{}{
"host": host,
"port": map[string]interface{}{
"number": imagePort.Port,
},
},
},
},
}

if *workspaceKind.Spec.PodTemplate.HTTPProxy.RemovePathPrefix {
httpRoute["rewrite"] = map[string]interface{}{"uri": "/"}
}

// templating.spec.http[].math.headers
setHeaders := helper.TemplateHeaders(workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Set, httpPathPrefixFunc)
addHeaders := helper.TemplateHeaders(workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Add, httpPathPrefixFunc)

removeHeaders := make([]string, len(workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Remove))
for i, header := range workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Remove {
if header != "" {
out, err := helper.RenderValueUsingFunc(header, httpPathPrefixFunc)
if err != nil {
return nil, fmt.Errorf("failed to render header %q: %w", header, err)
}
header = out
}
removeHeaders[i] = header
}

httpRoute["headers"] = map[string]interface{}{
"request": map[string]interface{}{
"add": setHeaders,
"set": addHeaders,
"remove": removeHeaders,
},
}

httpRoutes = append(httpRoutes, httpRoute)
}

virtualService.Object["spec"] = map[string]interface{}{
"gateways": []string{
istioGateway,
},
"hosts": []string{
istioHost,
},
"http": httpRoutes,
}

return virtualService, nil
}

func ReconcileVirtualService(ctx context.Context, r client.Client, virtualServiceName, namespace string, virtualService *unstructured.Unstructured, log logr.Logger) error {
foundVirtualService := &unstructured.Unstructured{}
foundVirtualService.SetAPIVersion(ApiVersionIstio)
foundVirtualService.SetKind(VirtualServiceKind)
justCreated := false
if err := r.Get(ctx, types.NamespacedName{Name: virtualServiceName, Namespace: namespace}, foundVirtualService); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Creating virtual service", "namespace", namespace, "name", virtualServiceName)
if err := r.Create(ctx, virtualService); err != nil {
log.Error(err, "unable to create virtual service")
return err
}
justCreated = true
} else {
log.Error(err, "error getting virtual service")
return err
}
}
if !justCreated {
log.Info("Updating virtual service", "namespace", namespace, "name", virtualServiceName)
if err := r.Update(ctx, foundVirtualService); err != nil {
log.Error(err, "unable to update virtual service")
return err
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func validateExtraEnv(workspaceKind *kubefloworgv1beta1.WorkspaceKind) []*field.
for _, env := range workspaceKind.Spec.PodTemplate.ExtraEnv {
if env.Value != "" {
rawValue := env.Value
_, err := helper.RenderExtraEnvValueTemplate(rawValue, httpPathPrefixFunc)
_, err := helper.RenderValueUsingFunc(rawValue, httpPathPrefixFunc)
if err != nil {
extraEnvPath := field.NewPath("spec", "podTemplate", "extraEnv").Key(env.Name).Child("value")
errs = append(errs, field.Invalid(extraEnvPath, rawValue, err.Error()))
Expand Down
Loading

0 comments on commit e235316

Please sign in to comment.