From 619f186889b7928fa505a1c0f54dc2a7fb2ac829 Mon Sep 17 00:00:00 2001 From: Sachin Lobo <87601990+neogopher@users.noreply.github.com> Date: Tue, 30 Jan 2024 16:22:18 +0530 Subject: [PATCH] Sync endpoint updates for service mappings of headless services (#1481) * Add watch for updates to from.endpoints * Add rbac for endpoints if mapServices.Host exists * Add e2e tests --- charts/eks/templates/rbac/clusterrole.yaml | 2 +- charts/k0s/templates/rbac/clusterrole.yaml | 2 +- charts/k3s/templates/rbac/clusterrole.yaml | 2 +- charts/k8s/templates/rbac/clusterrole.yaml | 2 +- pkg/controllers/servicesync/servicesync.go | 46 +++- test/commonValues.yaml | 4 + test/e2e/servicesync/servicesync.go | 291 ++++++++++++++++++++- test/e2e_rootless/values.yaml | 4 + 8 files changed, 337 insertions(+), 16 deletions(-) diff --git a/charts/eks/templates/rbac/clusterrole.yaml b/charts/eks/templates/rbac/clusterrole.yaml index 62cb87f45..684500c3d 100644 --- a/charts/eks/templates/rbac/clusterrole.yaml +++ b/charts/eks/templates/rbac/clusterrole.yaml @@ -83,7 +83,7 @@ rules: {{- include "vcluster.generic.clusterRoleExtraRules" . | indent 2 }} {{- if (not (empty (include "vcluster.serviceMapping.fromHost" . ))) }} - apiGroups: [""] - resources: ["services"] + resources: ["services", "endpoints"] verbs: ["get", "watch", "list"] {{- end }} {{- if .Values.multiNamespaceMode.enabled }} diff --git a/charts/k0s/templates/rbac/clusterrole.yaml b/charts/k0s/templates/rbac/clusterrole.yaml index 62cb87f45..684500c3d 100644 --- a/charts/k0s/templates/rbac/clusterrole.yaml +++ b/charts/k0s/templates/rbac/clusterrole.yaml @@ -83,7 +83,7 @@ rules: {{- include "vcluster.generic.clusterRoleExtraRules" . | indent 2 }} {{- if (not (empty (include "vcluster.serviceMapping.fromHost" . ))) }} - apiGroups: [""] - resources: ["services"] + resources: ["services", "endpoints"] verbs: ["get", "watch", "list"] {{- end }} {{- if .Values.multiNamespaceMode.enabled }} diff --git a/charts/k3s/templates/rbac/clusterrole.yaml b/charts/k3s/templates/rbac/clusterrole.yaml index 62cb87f45..684500c3d 100644 --- a/charts/k3s/templates/rbac/clusterrole.yaml +++ b/charts/k3s/templates/rbac/clusterrole.yaml @@ -83,7 +83,7 @@ rules: {{- include "vcluster.generic.clusterRoleExtraRules" . | indent 2 }} {{- if (not (empty (include "vcluster.serviceMapping.fromHost" . ))) }} - apiGroups: [""] - resources: ["services"] + resources: ["services", "endpoints"] verbs: ["get", "watch", "list"] {{- end }} {{- if .Values.multiNamespaceMode.enabled }} diff --git a/charts/k8s/templates/rbac/clusterrole.yaml b/charts/k8s/templates/rbac/clusterrole.yaml index 62cb87f45..684500c3d 100644 --- a/charts/k8s/templates/rbac/clusterrole.yaml +++ b/charts/k8s/templates/rbac/clusterrole.yaml @@ -83,7 +83,7 @@ rules: {{- include "vcluster.generic.clusterRoleExtraRules" . | indent 2 }} {{- if (not (empty (include "vcluster.serviceMapping.fromHost" . ))) }} - apiGroups: [""] - resources: ["services"] + resources: ["services", "endpoints"] verbs: ["get", "watch", "list"] {{- end }} {{- if .Values.multiNamespaceMode.enabled }} diff --git a/pkg/controllers/servicesync/servicesync.go b/pkg/controllers/servicesync/servicesync.go index 4b2cdd634..ed3fad330 100644 --- a/pkg/controllers/servicesync/servicesync.go +++ b/pkg/controllers/servicesync/servicesync.go @@ -62,6 +62,20 @@ func (e *ServiceSyncer) Register() error { return []reconcile.Request{{NamespacedName: from}} })). + WatchesRawSource(source.Kind(e.From.GetCache(), &corev1.Endpoints{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request { + if object == nil { + return nil + } + + _, ok := e.SyncServices[object.GetNamespace()+"/"+object.GetName()] + if !ok { + return nil + } + + return []reconcile.Request{{ + NamespacedName: types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()}, + }} + })). Complete(e) } @@ -81,7 +95,7 @@ func (e *ServiceSyncer) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R } // make sure the to service is deleted - e.Log.Infof("Delete target service %s/%s because from service is missing", to.Name, to.Namespace) + e.Log.Infof("Delete target service %s/%s because from service is missing", to.Namespace, to.Name) err = e.To.GetClient().Delete(ctx, &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: to.Name, @@ -281,16 +295,32 @@ func (e *ServiceSyncer) syncServiceAndEndpoints(ctx context.Context, fromService } // check if update is needed - expectedSubsets := []corev1.EndpointSubset{ - { - Addresses: []corev1.EndpointAddress{ - { - IP: fromService.Spec.ClusterIP, + var expectedSubsets []corev1.EndpointSubset + if fromService.Spec.ClusterIP == corev1.ClusterIPNone { + // fetch the corresponding endpoint and assign address from there to here + fromEndpoint := &corev1.Endpoints{} + err = e.From.GetClient().Get(ctx, types.NamespacedName{ + Name: fromService.GetName(), + Namespace: fromService.GetNamespace(), + }, fromEndpoint) + if err != nil { + return ctrl.Result{}, err + } + + expectedSubsets = fromEndpoint.Subsets + } else { + expectedSubsets = []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: fromService.Spec.ClusterIP, + }, }, + Ports: convertPorts(toService.Spec.Ports), }, - Ports: convertPorts(toService.Spec.Ports), - }, + } } + if !apiequality.Semantic.DeepEqual(toEndpoints.Subsets, expectedSubsets) { e.Log.Infof("Update target endpoints %s/%s because subsets are different", to.Namespace, to.Name) toEndpoints.Subsets = expectedSubsets diff --git a/test/commonValues.yaml b/test/commonValues.yaml index 3bc0d2900..e17fdf15b 100644 --- a/test/commonValues.yaml +++ b/test/commonValues.yaml @@ -34,9 +34,13 @@ mapServices: fromVirtual: - from: test/test to: test + - from: test/nginx + to: nginx fromHost: - from: test/test to: default/test + - from: test/nginx + to: default/nginx init: helm: diff --git a/test/e2e/servicesync/servicesync.go b/test/e2e/servicesync/servicesync.go index ffd0af819..661cc20b6 100644 --- a/test/e2e/servicesync/servicesync.go +++ b/test/e2e/servicesync/servicesync.go @@ -7,7 +7,9 @@ import ( "github.com/loft-sh/vcluster/pkg/util/translate" "github.com/loft-sh/vcluster/test/framework" "github.com/onsi/ginkgo/v2" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -44,11 +46,41 @@ var _ = ginkgo.Describe("map services from host to virtual cluster and vice vers // virtual -> physical testMapping(ctx, f.VclusterClient, "test", "test", f.HostClient, f.VclusterNamespace, "test", f.MultiNamespaceMode) }) + + ginkgo.Context("Should sync endpoint updates for a headless service", func() { + ginkgo.It("in host -> vcluster service mapping", func() { + checkEndpointsSync(f.Context, f.HostClient, "test", "nginx", f.VclusterClient, "default", "nginx") + }) + + ginkgo.It("in vcluster -> host service mapping", func() { + checkEndpointsSync(f.Context, f.VclusterClient, "test", "nginx", f.HostClient, f.VclusterNamespace, "nginx") + }) + }) }) func testMapping(ctx context.Context, fromClient kubernetes.Interface, fromNamespace, fromName string, toClient kubernetes.Interface, toNamespace, toName string, checkEndpoints bool) { + _, _ = fromClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fromNamespace}}, metav1.CreateOptions{}) + // clean up the namespace + defer func() { + err := fromClient.CoreV1().Namespaces().Delete(ctx, fromNamespace, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // wait for namespace to be deleted successfully + waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + _, err = fromClient.CoreV1().Namespaces().Get(ctx, fromNamespace, metav1.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + return true, nil + } + return false, err + } + + return false, nil + }) + framework.ExpectNoError(waitErr) + }() + // create physical service - _, _ = fromClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fromName}}, metav1.CreateOptions{}) fromService, err := fromClient.CoreV1().Services(fromNamespace).Create(ctx, &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: fromName, @@ -76,14 +108,22 @@ func testMapping(ctx context.Context, fromClient kubernetes.Interface, fromNames toEndpoints *corev1.Endpoints toService *corev1.Service ) - waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*10, true, func(ctx context.Context) (done bool, err error) { + waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { toService, err = toClient.CoreV1().Services(toNamespace).Get(ctx, toName, metav1.GetOptions{}) if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + return false, nil } toEndpoints, err = toClient.CoreV1().Endpoints(toNamespace).Get(ctx, toName, metav1.GetOptions{}) if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + return false, nil } @@ -102,9 +142,13 @@ func testMapping(ctx context.Context, fromClient kubernetes.Interface, fromNames } else { // wait for vcluster service var toService *corev1.Service - waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*10, true, func(ctx context.Context) (done bool, err error) { + waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { toService, err = toClient.CoreV1().Services(toNamespace).Get(ctx, toName, metav1.GetOptions{}) if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + return false, nil } @@ -128,7 +172,7 @@ func testMapping(ctx context.Context, fromClient kubernetes.Interface, fromNames framework.ExpectNoError(err) // verify service gets deleted in vcluster - waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*100, time.Second*10, true, func(ctx context.Context) (done bool, err error) { + waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { _, err = toClient.CoreV1().Services(toNamespace).Get(ctx, toName, metav1.GetOptions{}) if err == nil { return false, nil @@ -143,3 +187,242 @@ func testMapping(ctx context.Context, fromClient kubernetes.Interface, fromNames }) framework.ExpectNoError(waitErr) } + +func checkEndpointsSync(ctx context.Context, fromClient kubernetes.Interface, fromNamespace, fromName string, toClient kubernetes.Interface, toNamespace, toName string) { + var ( + fromEndpoints *corev1.Endpoints + toEndpoints *corev1.Endpoints + deployment *appsv1.Deployment + err error + + two int32 = 2 + zero int32 + ) + + _, err = fromClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fromNamespace}}, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + // clean up the namespace + defer func() { + _ = fromClient.CoreV1().Namespaces().Delete(ctx, fromNamespace, metav1.DeleteOptions{}) + + // wait for namespace to be deleted successfully + waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + _, err = fromClient.CoreV1().Namespaces().Get(ctx, fromNamespace, metav1.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + return true, nil + } + return false, err + } + + return false, nil + }) + framework.ExpectNoError(waitErr) + }() + + _, err = fromClient.AppsV1().Deployments(fromNamespace).Create(ctx, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: fromName, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &two, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx", + Namespace: fromNamespace, + Labels: map[string]string{ + "app": "nginx", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + }, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Creating the 'from' service targeting the deployment via label selectors") + _, err = fromClient.CoreV1().Services(fromNamespace).Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: fromName, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "nginx", + }, + ClusterIP: corev1.ClusterIPNone, + }, + }, metav1.CreateOptions{}) + + ginkgo.By("Ensuring the 'to' service has been created") + waitErr := wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + _, err = toClient.CoreV1().Services(toNamespace).Get(ctx, toName, metav1.GetOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + return false, nil + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + // verify the endpoints + waitErr = wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + fromEndpoints, err = fromClient.CoreV1().Endpoints(fromNamespace).Get(ctx, fromName, metav1.GetOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + + return false, nil + } else if fromEndpoints.Subsets == nil || len(fromEndpoints.Subsets[0].Addresses) != int(two) { + return false, nil + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + waitErr = wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + toEndpoints, err = toClient.CoreV1().Endpoints(toNamespace).Get(ctx, toName, metav1.GetOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + + return false, nil + } else if toEndpoints.Subsets == nil || len(toEndpoints.Subsets[0].Addresses) != int(two) { + return false, nil + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + ginkgo.By("Ensuring the endpoints are synced") + framework.ExpectEqual(IPAddresses(fromEndpoints.Subsets[0].Addresses), IPAddresses(toEndpoints.Subsets[0].Addresses)) + + ginkgo.By("Scaling down the deployments to 0") + waitErr = wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + // scale down the deployment + deployment, err = fromClient.AppsV1().Deployments(fromNamespace).Get(ctx, fromName, metav1.GetOptions{}) + if err != nil { + return false, nil + } + + deployment.Spec.Replicas = &zero + + _, err = fromClient.AppsV1().Deployments(fromNamespace).Update(ctx, deployment, metav1.UpdateOptions{}) + if err != nil { + return false, err + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + ginkgo.By("Waiting until all endpoints are cleared") + waitErr = wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + fromEndpoints, err = fromClient.CoreV1().Endpoints(fromNamespace).Get(ctx, fromName, metav1.GetOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + + return false, nil + } else if fromEndpoints.Subsets != nil { + return false, nil + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + ginkgo.By("Scaling up the deployments back") + waitErr = wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + // scale up the deployment + deployment, err = fromClient.AppsV1().Deployments(fromNamespace).Get(ctx, fromName, metav1.GetOptions{}) + if err != nil { + return false, err + } + + deployment.Spec.Replicas = &two + + _, err = fromClient.AppsV1().Deployments(fromNamespace).Update(ctx, deployment, metav1.UpdateOptions{}) + if err != nil { + return false, err + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + // verify the endpoints + ginkgo.By("Waiting until all endpoints are registered") + waitErr = wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + fromEndpoints, err = fromClient.CoreV1().Endpoints(fromNamespace).Get(ctx, fromName, metav1.GetOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + + return false, nil + } else if fromEndpoints.Subsets == nil || len(fromEndpoints.Subsets[0].Addresses) != int(two) { + return false, nil + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + waitErr = wait.PollUntilContextTimeout(ctx, time.Millisecond*500, framework.PollTimeout, true, func(ctx context.Context) (done bool, err error) { + toEndpoints, err = toClient.CoreV1().Endpoints(toNamespace).Get(ctx, toName, metav1.GetOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + return false, err + } + + return false, nil + } else if toEndpoints.Subsets == nil || len(toEndpoints.Subsets[0].Addresses) != int(two) { + return false, nil + } + + return true, nil + }) + framework.ExpectNoError(err) + framework.ExpectNoError(waitErr) + + ginkgo.By("Ensuring the endpoints are still synced") + framework.ExpectEqual(IPAddresses(fromEndpoints.Subsets[0].Addresses), IPAddresses(toEndpoints.Subsets[0].Addresses)) +} + +func IPAddresses(endpointAddresses []corev1.EndpointAddress) []string { + xIP := make([]string, 0) + + for _, address := range endpointAddresses { + xIP = append(xIP, address.IP) + } + + return xIP +} diff --git a/test/e2e_rootless/values.yaml b/test/e2e_rootless/values.yaml index 0a3e07dae..5da246c08 100644 --- a/test/e2e_rootless/values.yaml +++ b/test/e2e_rootless/values.yaml @@ -18,9 +18,13 @@ mapServices: fromVirtual: - from: test/test to: test + - from: test/nginx + to: nginx fromHost: - from: test/test to: default/test + - from: test/nginx + to: default/nginx sync: nodes: