From aa2b7d56457ba66e5239a7c823f1b6e233ab7a25 Mon Sep 17 00:00:00 2001 From: Ishan Khare Date: Thu, 30 Mar 2023 01:44:34 +0530 Subject: [PATCH 1/2] reimplement fake kubelet services for node InternalIP addresses this is needed for proper resolution of kubelet metrics targets for prometheus Signed-off-by: Ishan Khare --- charts/eks/templates/syncer-deployment.yaml | 3 + charts/eks/values.yaml | 1 + charts/k0s/templates/statefulset.yaml | 3 + charts/k0s/values.yaml | 1 + charts/k3s/templates/statefulset.yaml | 3 + charts/k3s/values.yaml | 1 + charts/k8s/templates/syncer-deployment.yaml | 3 + charts/k8s/values.yaml | 1 + cmd/vcluster/context/flag_options.go | 2 + pkg/constants/indices.go | 2 + .../resources/nodes/fake_syncer.go | 56 +++++- .../resources/nodes/fake_syncer_test.go | 12 +- .../nodes/nodeservice/node_service.go | 177 ++++++++++++++++++ pkg/controllers/resources/nodes/register.go | 15 +- pkg/controllers/resources/nodes/syncer.go | 18 +- .../resources/nodes/syncer_test.go | 2 +- pkg/controllers/resources/nodes/translate.go | 21 ++- pkg/controllers/syncer/fake_syncer.go | 1 + pkg/server/cert/syncer.go | 21 +++ pkg/server/filters/nodename.go | 29 ++- pkg/server/server.go | 32 +++- 21 files changed, 377 insertions(+), 27 deletions(-) create mode 100644 pkg/controllers/resources/nodes/nodeservice/node_service.go diff --git a/charts/eks/templates/syncer-deployment.yaml b/charts/eks/templates/syncer-deployment.yaml index a77c02b9e..a3e824f79 100644 --- a/charts/eks/templates/syncer-deployment.yaml +++ b/charts/eks/templates/syncer-deployment.yaml @@ -150,6 +150,9 @@ spec: {{- if .Values.sync.secrets.all }} - --sync-all-secrets=true {{- end }} + {{- if not .Values.sync.nodes.fakeKubeletIPs }} + - --fake-kubelet-ips=false + {{- end }} {{- range $f := .Values.syncer.extraArgs }} - {{ $f | quote }} {{- end }} diff --git a/charts/eks/values.yaml b/charts/eks/values.yaml index 3381889c5..75136dcb8 100644 --- a/charts/eks/values.yaml +++ b/charts/eks/values.yaml @@ -54,6 +54,7 @@ sync: fake-persistentvolumes: enabled: true # will be ignored if persistentvolumes.enabled = true nodes: + fakeKubeletIPs: true enabled: false # If nodes sync is enabled, and syncAllNodes = true, the virtual cluster # will sync all nodes instead of only the ones where some pods are running. diff --git a/charts/k0s/templates/statefulset.yaml b/charts/k0s/templates/statefulset.yaml index 03a126bac..67f53b959 100644 --- a/charts/k0s/templates/statefulset.yaml +++ b/charts/k0s/templates/statefulset.yaml @@ -203,6 +203,9 @@ spec: {{- if .Values.sync.secrets.all }} - --sync-all-secrets=true {{- end }} + {{- if not .Values.sync.nodes.fakeKubeletIPs }} + - --fake-kubelet-ips=false + {{- end }} {{- range $f := .Values.syncer.extraArgs }} - {{ $f | quote }} {{- end }} diff --git a/charts/k0s/values.yaml b/charts/k0s/values.yaml index 88d208a10..ce98c229b 100644 --- a/charts/k0s/values.yaml +++ b/charts/k0s/values.yaml @@ -50,6 +50,7 @@ sync: fake-persistentvolumes: enabled: true # will be ignored if persistentvolumes.enabled = true nodes: + fakeKubeletIPs: true enabled: false # If nodes sync is enabled, and syncAllNodes = true, the virtual cluster # will sync all nodes instead of only the ones where some pods are running. diff --git a/charts/k3s/templates/statefulset.yaml b/charts/k3s/templates/statefulset.yaml index de4ad4ec6..2baccfc34 100644 --- a/charts/k3s/templates/statefulset.yaml +++ b/charts/k3s/templates/statefulset.yaml @@ -267,6 +267,9 @@ spec: {{- if .Values.sync.secrets.all }} - --sync-all-secrets=true {{- end }} + {{- if not .Values.sync.nodes.fakeKubeletIPs }} + - --fake-kubelet-ips=false + {{- end }} {{- range $f := .Values.syncer.extraArgs }} - {{ $f | quote }} {{- end }} diff --git a/charts/k3s/values.yaml b/charts/k3s/values.yaml index 05d15c04e..c31d1b98b 100644 --- a/charts/k3s/values.yaml +++ b/charts/k3s/values.yaml @@ -55,6 +55,7 @@ sync: fake-persistentvolumes: enabled: true # will be ignored if persistentvolumes.enabled = true nodes: + fakeKubeletIPs: true enabled: false # If nodes sync is enabled, and syncAllNodes = true, the virtual cluster # will sync all nodes instead of only the ones where some pods are running. diff --git a/charts/k8s/templates/syncer-deployment.yaml b/charts/k8s/templates/syncer-deployment.yaml index 57e5aedca..b48d7b289 100644 --- a/charts/k8s/templates/syncer-deployment.yaml +++ b/charts/k8s/templates/syncer-deployment.yaml @@ -185,6 +185,9 @@ spec: {{- if .Values.sync.secrets.all }} - --sync-all-secrets=true {{- end }} + {{- if not .Values.sync.nodes.fakeKubeletIPs }} + - --fake-kubelet-ips=false + {{- end }} {{- range $f := .Values.syncer.extraArgs }} - {{ $f | quote }} {{- end }} diff --git a/charts/k8s/values.yaml b/charts/k8s/values.yaml index 4bc22ebab..b4cef5b3f 100644 --- a/charts/k8s/values.yaml +++ b/charts/k8s/values.yaml @@ -54,6 +54,7 @@ sync: fake-persistentvolumes: enabled: true # will be ignored if persistentvolumes.enabled = true nodes: + fakeKubeletIPs: true enabled: false # If nodes sync is enabled, and syncAllNodes = true, the virtual cluster # will sync all nodes instead of only the ones where some pods are running. diff --git a/cmd/vcluster/context/flag_options.go b/cmd/vcluster/context/flag_options.go index 3d283e5c3..c2e259312 100644 --- a/cmd/vcluster/context/flag_options.go +++ b/cmd/vcluster/context/flag_options.go @@ -39,6 +39,7 @@ type VirtualClusterOptions struct { SyncAllNodes bool `json:"syncAllNodes,omitempty"` EnableScheduler bool `json:"enableScheduler,omitempty"` DisableFakeKubelets bool `json:"disableFakeKubelets,omitempty"` + FakeKubeletIPs bool `json:"fakeKubeletIPs,omitempty"` ClearNodeImages bool `json:"clearNodeImages,omitempty"` TranslateImages []string `json:"translateImages,omitempty"` @@ -121,6 +122,7 @@ func AddFlags(flags *pflag.FlagSet, options *VirtualClusterOptions) { flags.BoolVar(&options.SyncAllNodes, "sync-all-nodes", false, "If enabled and --fake-nodes is false, the virtual cluster will sync all nodes instead of only the needed ones") flags.BoolVar(&options.EnableScheduler, "enable-scheduler", false, "If enabled, will expect a scheduler running in the virtual cluster") flags.BoolVar(&options.DisableFakeKubelets, "disable-fake-kubelets", false, "If disabled, the virtual cluster will not create fake kubelet endpoints to support metrics-servers") + flags.BoolVar(&options.FakeKubeletIPs, "fake-kubelet-ips", true, "If enabled, virtual cluster will assign fake ips of type NodeInternalIP to fake the kubelets") flags.BoolVar(&options.ClearNodeImages, "node-clear-image-status", false, "If enabled, when syncing real nodes, the status.images data will be removed from the vcluster nodes") flags.StringSliceVar(&options.TranslateImages, "translate-image", []string{}, "Translates image names from the virtual pod to the physical pod (e.g. coredns/coredns=mirror.io/coredns/coredns)") diff --git a/pkg/constants/indices.go b/pkg/constants/indices.go index 74911aa8f..551cfa64d 100644 --- a/pkg/constants/indices.go +++ b/pkg/constants/indices.go @@ -10,4 +10,6 @@ const ( IndexByConfigMap = "IndexByConfigMap" // IndexByHostName is used to map rewritten hostnames(advertised as node addresses) to nodenames IndexByHostName = "IndexByHostName" + + IndexByClusterIP = "IndexByClusterIP" ) diff --git a/pkg/controllers/resources/nodes/fake_syncer.go b/pkg/controllers/resources/nodes/fake_syncer.go index 5c6d35629..54d7140cb 100644 --- a/pkg/controllers/resources/nodes/fake_syncer.go +++ b/pkg/controllers/resources/nodes/fake_syncer.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/equality" + "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice" podtranslate "github.com/loft-sh/vcluster/pkg/controllers/resources/pods/translate" "github.com/loft-sh/vcluster/pkg/controllers/syncer" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" @@ -30,14 +31,18 @@ var ( FakeNodesVersion = "v1.19.1" ) -func NewFakeSyncer(ctx *synccontext.RegisterContext) (syncer.Object, error) { +func NewFakeSyncer(ctx *synccontext.RegisterContext, nodeService nodeservice.NodeServiceProvider) (syncer.Object, error) { return &fakeNodeSyncer{ - currentNamespace: ctx.CurrentNamespace, + nodeServiceProvider: nodeService, + currentNamespace: ctx.CurrentNamespace, + fakeKubeletIPs: ctx.Options.FakeKubeletIPs, }, nil } type fakeNodeSyncer struct { - currentNamespace string + nodeServiceProvider nodeservice.NodeServiceProvider + currentNamespace string + fakeKubeletIPs bool } func (r *fakeNodeSyncer) Resource() client.Object { @@ -57,7 +62,7 @@ func (r *fakeNodeSyncer) RegisterIndices(ctx *synccontext.RegisterContext) error var _ syncer.ControllerModifier = &fakeNodeSyncer{} func (r *fakeNodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder *builder.Builder) (*builder.Builder, error) { - return modifyController(ctx, builder) + return modifyController(ctx, r.nodeServiceProvider, builder) } var _ syncer.FakeSyncer = &fakeNodeSyncer{} @@ -71,7 +76,7 @@ func (r *fakeNodeSyncer) FakeSyncUp(ctx *synccontext.SyncContext, name types.Nam } ctx.Log.Infof("Create fake node %s", name.Name) - return ctrl.Result{}, CreateFakeNode(ctx.Context, r.currentNamespace, ctx.VirtualClient, name) + return ctrl.Result{}, CreateFakeNode(ctx.Context, r.fakeKubeletIPs, r.nodeServiceProvider, r.currentNamespace, ctx.VirtualClient, name) } func (r *fakeNodeSyncer) FakeSync(ctx *synccontext.SyncContext, vObj client.Object) (ctrl.Result, error) { @@ -89,7 +94,7 @@ func (r *fakeNodeSyncer) FakeSync(ctx *synccontext.SyncContext, vObj client.Obje } // check if we need to update node ips - updated := r.updateIfNeeded(node) + updated := r.updateIfNeeded(ctx, node, types.NamespacedName{Name: node.Name, Namespace: r.currentNamespace}) if updated != nil { ctx.Log.Infof("Update fake node %s", node.Name) err := ctx.VirtualClient.Status().Update(ctx.Context, updated) @@ -101,7 +106,7 @@ func (r *fakeNodeSyncer) FakeSync(ctx *synccontext.SyncContext, vObj client.Obje return ctrl.Result{}, nil } -func (r *fakeNodeSyncer) updateIfNeeded(node *corev1.Node) *corev1.Node { +func (r *fakeNodeSyncer) updateIfNeeded(ctx *synccontext.SyncContext, node *corev1.Node, name types.NamespacedName) *corev1.Node { var updated *corev1.Node newAddresses := []corev1.NodeAddress{ @@ -110,6 +115,19 @@ func (r *fakeNodeSyncer) updateIfNeeded(node *corev1.Node) *corev1.Node { Type: corev1.NodeHostName, }, } + + if r.fakeKubeletIPs { + nodeIP, err := r.nodeServiceProvider.GetNodeIP(ctx.Context, name) + if err != nil { + ctx.Log.Errorf("error getting fake node ip: %v", err) + } + + newAddresses = append(newAddresses, corev1.NodeAddress{ + Address: nodeIP, + Type: corev1.NodeInternalIP, + }) + } + if !equality.Semantic.DeepEqual(node.Status.Addresses, newAddresses) { updated = translator.NewIfNil(updated, node) updated.Status.Addresses = newAddresses @@ -127,7 +145,16 @@ func newGUID() string { return random.RandomString(8) + "-" + random.RandomString(4) + "-" + random.RandomString(4) + "-" + random.RandomString(4) + "-" + random.RandomString(12) } -func CreateFakeNode(ctx context.Context, currentNamespace string, virtualClient client.Client, name types.NamespacedName) error { +func CreateFakeNode(ctx context.Context, + fakeKubeletIPs bool, + nodeServiceProvider nodeservice.NodeServiceProvider, + currentNamespace string, + virtualClient client.Client, + name types.NamespacedName) error { + + nodeServiceProvider.Lock() + defer nodeServiceProvider.Unlock() + node := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name.Name, @@ -228,6 +255,19 @@ func CreateFakeNode(ctx context.Context, currentNamespace string, virtualClient }, Images: []corev1.ContainerImage{}, } + + if fakeKubeletIPs { + nodeIP, err := nodeServiceProvider.GetNodeIP(ctx, name) + if err != nil { + return errors.Wrap(err, "create fake node ip") + } + + node.Status.Addresses = append(node.Status.Addresses, corev1.NodeAddress{ + Address: nodeIP, + Type: corev1.NodeInternalIP, + }) + } + err = virtualClient.Status().Patch(ctx, node, client.MergeFrom(orig)) if err != nil { return err diff --git a/pkg/controllers/resources/nodes/fake_syncer_test.go b/pkg/controllers/resources/nodes/fake_syncer_test.go index cf912b406..1b05f5cc5 100644 --- a/pkg/controllers/resources/nodes/fake_syncer_test.go +++ b/pkg/controllers/resources/nodes/fake_syncer_test.go @@ -1,6 +1,7 @@ package nodes import ( + "context" "testing" "github.com/loft-sh/vcluster/pkg/controllers/syncer" @@ -29,11 +30,20 @@ func newFakeFakeSyncer(t *testing.T, ctx *synccontext.RegisterContext) (*synccon assert.NilError(t, err) syncContext, object := generictesting.FakeStartSyncer(t, ctx, func(ctx *synccontext.RegisterContext) (syncer.Object, error) { - return NewFakeSyncer(ctx) + return NewFakeSyncer(ctx, &fakeNodeServiceProvider{}) }) return syncContext, object.(*fakeNodeSyncer) } +type fakeNodeServiceProvider struct{} + +func (f *fakeNodeServiceProvider) Start(ctx context.Context) {} +func (f *fakeNodeServiceProvider) Lock() {} +func (f *fakeNodeServiceProvider) Unlock() {} +func (f *fakeNodeServiceProvider) GetNodeIP(ctx context.Context, name types.NamespacedName) (string, error) { + return "127.0.0.1", nil +} + func TestFakeSync(t *testing.T) { fakeGUID := newGUID() now := metav1.Now() diff --git a/pkg/controllers/resources/nodes/nodeservice/node_service.go b/pkg/controllers/resources/nodes/nodeservice/node_service.go new file mode 100644 index 000000000..bd22fa482 --- /dev/null +++ b/pkg/controllers/resources/nodes/nodeservice/node_service.go @@ -0,0 +1,177 @@ +package nodeservice + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/loft-sh/vcluster/pkg/util/translate" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + // ServiceClusterLabel identifies to which vcluster the node service belongs if there are multiple in one namespace + ServiceClusterLabel = "vcluster.loft.sh/belongs-to" + // ServiceNodeLabel specifies which node this service represents + ServiceNodeLabel = "vcluster.loft.sh/node" + // KubeletPort is the port we pretend the kubelet is running under + KubeletPort = int32(10250) + // KubeletTargetPort is the port vcluster will run under + KubeletTargetPort = 8443 +) + +type NodeServiceProvider interface { + sync.Locker + // Start starts the node service garbage collector + Start(ctx context.Context) + // GetNodeIP returns a new fake node ip + GetNodeIP(ctx context.Context, name types.NamespacedName) (string, error) +} + +func NewNodeServiceProvider(serviceName, currentNamespace string, currentNamespaceClient client.Client, virtualClient client.Client, uncachedVirtualClient client.Client) NodeServiceProvider { + return &nodeServiceProvider{ + serviceName: serviceName, + currentNamespace: currentNamespace, + currentNamespaceClient: currentNamespaceClient, + virtualClient: virtualClient, + uncachedVirtualClient: uncachedVirtualClient, + } +} + +type nodeServiceProvider struct { + serviceName string + currentNamespace string + currentNamespaceClient client.Client + + virtualClient client.Client + uncachedVirtualClient client.Client + + serviceMutex sync.Mutex +} + +func (n *nodeServiceProvider) Start(ctx context.Context) { + wait.Until(func() { + err := n.cleanupNodeServices(ctx) + if err != nil { + klog.Errorf("error cleaning up node services: %v", err) + } + }, time.Second*4, ctx.Done()) +} + +func (n *nodeServiceProvider) cleanupNodeServices(ctx context.Context) error { + n.serviceMutex.Lock() + defer n.serviceMutex.Unlock() + + serviceList := &corev1.ServiceList{} + err := n.currentNamespaceClient.List(ctx, serviceList, client.InNamespace(n.currentNamespace), client.MatchingLabels{ + ServiceClusterLabel: translate.Suffix, + }) + if err != nil { + return errors.Wrap(err, "list services") + } + + errors := []error{} + for _, s := range serviceList.Items { + exist := false + if s.Labels[ServiceNodeLabel] != "" { + // check if node still exists + err = n.virtualClient.Get(ctx, client.ObjectKey{Name: s.Labels[ServiceNodeLabel]}, &corev1.Node{}) + if err != nil { + if !kerrors.IsNotFound(err) { + klog.Infof("error retrieving node %s: %v", s.Labels[ServiceNodeLabel], err) + continue + } + + // make sure node really does not exist + err = n.uncachedVirtualClient.Get(ctx, client.ObjectKey{Name: s.Labels[ServiceNodeLabel]}, &corev1.Node{}) + if err == nil { + exist = true + } + } else { + exist = true + } + } + + if !exist { + klog.Infof("Cleaning up kubelet service for node %s", s.Labels[ServiceNodeLabel]) + err = n.currentNamespaceClient.Delete(ctx, &s) + if err != nil { + errors = append(errors, err) + } + } + } + + return utilerrors.NewAggregate(errors) +} + +func (n *nodeServiceProvider) Lock() { + n.serviceMutex.Lock() +} + +func (n *nodeServiceProvider) Unlock() { + n.serviceMutex.Unlock() +} + +func (n *nodeServiceProvider) GetNodeIP(ctx context.Context, name types.NamespacedName) (string, error) { + serviceName := translate.SafeConcatName(translate.Suffix, "node", strings.ReplaceAll(name.Name, ".", "-")) + + service := &corev1.Service{} + err := n.currentNamespaceClient.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: n.currentNamespace}, service) + if err != nil && !kerrors.IsNotFound(err) { + return "", errors.Wrap(err, "list services") + } else if err == nil { + return service.Spec.ClusterIP, nil + } + + // find out the labels to select ourself + vclusterService := &corev1.Service{} + err = n.currentNamespaceClient.Get(ctx, types.NamespacedName{Name: n.serviceName, Namespace: n.currentNamespace}, vclusterService) + if err != nil { + return "", errors.Wrap(err, "get vcluster service") + } + + // create the new service + nodeService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: n.currentNamespace, + Name: serviceName, + Labels: map[string]string{ + ServiceClusterLabel: translate.Suffix, + ServiceNodeLabel: name.Name, + }, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Port: int32(KubeletPort), + TargetPort: intstr.FromInt(KubeletTargetPort), + }, + }, + Selector: vclusterService.Spec.Selector, + }, + } + + // set owner if defined + if translate.Owner != nil { + nodeService.SetOwnerReferences(translate.GetOwnerReference(nil)) + } + + // create the service + klog.Infof("Generating kubelet service for node %s", name.Name) + err = n.currentNamespaceClient.Create(ctx, nodeService) + if err != nil { + return "", errors.Wrap(err, "create node service") + } + + return nodeService.Spec.ClusterIP, nil +} diff --git a/pkg/controllers/resources/nodes/register.go b/pkg/controllers/resources/nodes/register.go index d0968a0f3..be3e14893 100644 --- a/pkg/controllers/resources/nodes/register.go +++ b/pkg/controllers/resources/nodes/register.go @@ -1,14 +1,25 @@ package nodes import ( + "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice" "github.com/loft-sh/vcluster/pkg/controllers/syncer" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" + "sigs.k8s.io/controller-runtime/pkg/client" ) func New(ctx *synccontext.RegisterContext) (syncer.Object, error) { + uncachedVirtualClient, err := client.New(ctx.VirtualManager.GetConfig(), client.Options{ + Scheme: ctx.VirtualManager.GetScheme(), + Mapper: ctx.VirtualManager.GetRESTMapper(), + }) + if err != nil { + return nil, err + } + + nodeService := nodeservice.NewNodeServiceProvider(ctx.Options.ServiceName, ctx.CurrentNamespace, ctx.CurrentNamespaceClient, ctx.VirtualManager.GetClient(), uncachedVirtualClient) if !ctx.Controllers.Has("nodes") { - return NewFakeSyncer(ctx) + return NewFakeSyncer(ctx, nodeService) } - return NewSyncer(ctx) + return NewSyncer(ctx, nodeService) } diff --git a/pkg/controllers/resources/nodes/syncer.go b/pkg/controllers/resources/nodes/syncer.go index 13a9506c1..e903561e7 100644 --- a/pkg/controllers/resources/nodes/syncer.go +++ b/pkg/controllers/resources/nodes/syncer.go @@ -2,10 +2,12 @@ package nodes import ( "context" + "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/cache" "github.com/loft-sh/vcluster/pkg/constants" + "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice" "github.com/loft-sh/vcluster/pkg/controllers/syncer" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" "github.com/loft-sh/vcluster/pkg/controllers/syncer/translator" @@ -28,7 +30,7 @@ var ( indexPodByRunningNonVClusterNode = "indexpodbyrunningnonvclusternode" ) -func NewSyncer(ctx *synccontext.RegisterContext) (syncer.Object, error) { +func NewSyncer(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.NodeServiceProvider) (syncer.Object, error) { var err error var nodeSelector labels.Selector if ctx.Options.SyncAllNodes { @@ -58,9 +60,11 @@ func NewSyncer(ctx *synccontext.RegisterContext) (syncer.Object, error) { nodeSelector: nodeSelector, clearImages: ctx.Options.ClearNodeImages, useFakeKubelets: !ctx.Options.DisableFakeKubelets, + fakeKubeletIPs: ctx.Options.FakeKubeletIPs, physicalClient: ctx.PhysicalManager.GetClient(), virtualClient: ctx.VirtualManager.GetClient(), + nodeServiceProvider: nodeServiceProvider, enforcedTolerations: tolerations, currentNamespace: ctx.CurrentNamespace, @@ -75,11 +79,13 @@ type nodeSyncer struct { enforceNodeSelector bool nodeSelector labels.Selector useFakeKubelets bool + fakeKubeletIPs bool physicalClient client.Client virtualClient client.Client podCache client.Reader + nodeServiceProvider nodeservice.NodeServiceProvider enforcedTolerations []*corev1.Toleration currentNamespace string } @@ -131,10 +137,14 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder podCache.WaitForCacheSync(ctx.Context) s.podCache = podCache } - return modifyController(ctx, builder) + return modifyController(ctx, s.nodeServiceProvider, builder) } -func modifyController(ctx *synccontext.RegisterContext, builder *builder.Builder) (*builder.Builder, error) { +func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.NodeServiceProvider, builder *builder.Builder) (*builder.Builder, error) { + go func() { + nodeServiceProvider.Start(ctx.Context) + }() + return builder.Watches(source.NewKindWithCache(&corev1.Pod{}, ctx.PhysicalManager.GetCache()), handler.EnqueueRequestsFromMapFunc(func(object client.Object) []reconcile.Request { pod, ok := object.(*corev1.Pod) if !ok || pod == nil || !translate.Default.IsManaged(pod) || pod.Spec.NodeName == "" { @@ -227,7 +237,7 @@ func (s *nodeSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj return ctrl.Result{}, ctx.VirtualClient.Delete(ctx.Context, vObj) } - updatedVNode, err := s.translateUpdateStatus(pNode, vNode) + updatedVNode, err := s.translateUpdateStatus(ctx, pNode, vNode) if err != nil { return ctrl.Result{}, errors.Wrap(err, "update node status") } else if updatedVNode != nil { diff --git a/pkg/controllers/resources/nodes/syncer_test.go b/pkg/controllers/resources/nodes/syncer_test.go index 6feeb1c29..829129c09 100644 --- a/pkg/controllers/resources/nodes/syncer_test.go +++ b/pkg/controllers/resources/nodes/syncer_test.go @@ -30,7 +30,7 @@ func newFakeSyncer(t *testing.T, ctx *synccontext.RegisterContext) (*synccontext assert.NilError(t, err) syncContext, object := generictesting.FakeStartSyncer(t, ctx, func(ctx *synccontext.RegisterContext) (syncer.Object, error) { - return NewSyncer(ctx) + return NewSyncer(ctx, &fakeNodeServiceProvider{}) }) return syncContext, object.(*nodeSyncer) } diff --git a/pkg/controllers/resources/nodes/translate.go b/pkg/controllers/resources/nodes/translate.go index 84ab43264..b5220ddca 100644 --- a/pkg/controllers/resources/nodes/translate.go +++ b/pkg/controllers/resources/nodes/translate.go @@ -6,8 +6,11 @@ import ( "os" "github.com/loft-sh/vcluster/pkg/constants" + synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" "github.com/loft-sh/vcluster/pkg/controllers/syncer/translator" + "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/loft-sh/vcluster/pkg/util/stringutil" @@ -120,7 +123,7 @@ func (s *nodeSyncer) translateUpdateBackwards(pNode *corev1.Node, vNode *corev1. return updated } -func (s *nodeSyncer) translateUpdateStatus(pNode *corev1.Node, vNode *corev1.Node) (*corev1.Node, error) { +func (s *nodeSyncer) translateUpdateStatus(ctx *synccontext.SyncContext, pNode *corev1.Node, vNode *corev1.Node) (*corev1.Node, error) { // translate node status first translatedStatus := pNode.Status.DeepCopy() if s.useFakeKubelets { @@ -137,6 +140,22 @@ func (s *nodeSyncer) translateUpdateStatus(pNode *corev1.Node, vNode *corev1.Nod Type: corev1.NodeHostName, }, } + + if s.fakeKubeletIPs { + // create new service for this node + nodeIP, err := s.nodeServiceProvider.GetNodeIP(ctx.Context, types.NamespacedName{ + Name: vNode.Name, + }) + if err != nil { + return nil, errors.Wrap(err, "get vNode IP") + } + + newAddresses = append(newAddresses, corev1.NodeAddress{ + Address: nodeIP, + Type: corev1.NodeInternalIP, + }) + } + for _, oldAddress := range translatedStatus.Addresses { if oldAddress.Type == corev1.NodeInternalIP || oldAddress.Type == corev1.NodeInternalDNS || oldAddress.Type == corev1.NodeHostName { continue diff --git a/pkg/controllers/syncer/fake_syncer.go b/pkg/controllers/syncer/fake_syncer.go index decd98792..9911e5fc1 100644 --- a/pkg/controllers/syncer/fake_syncer.go +++ b/pkg/controllers/syncer/fake_syncer.go @@ -2,6 +2,7 @@ package syncer import ( "context" + "github.com/loft-sh/vcluster/pkg/util/translate" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" diff --git a/pkg/server/cert/syncer.go b/pkg/server/cert/syncer.go index e741e3aa7..ccb2f2904 100644 --- a/pkg/server/cert/syncer.go +++ b/pkg/server/cert/syncer.go @@ -10,6 +10,7 @@ import ( "time" "github.com/loft-sh/vcluster/pkg/constants" + "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice" ctrlcontext "github.com/loft-sh/vcluster/cmd/vcluster/context" "github.com/loft-sh/vcluster/pkg/util/translate" @@ -35,6 +36,8 @@ func NewSyncer(currentNamespace string, currentNamespaceClient client.Client, op serverCaKey: options.ServerCaKey, serverCaCert: options.ServerCaCert, + fakeKubeletIPs: options.FakeKubeletIPs, + addSANs: options.TLSSANs, listeners: []dynamiccertificates.Listener{}, @@ -56,6 +59,8 @@ type syncer struct { currentNamespace string currentNamespaceCient client.Client + fakeKubeletIPs bool + listeners []dynamiccertificates.Listener currentCertMutex sync.RWMutex @@ -152,6 +157,22 @@ func (s *syncer) getSANs() ([]string, error) { ) } + if s.fakeKubeletIPs { + // get cluster ips of node services + svcs := &corev1.ServiceList{} + err = s.currentNamespaceCient.List(context.TODO(), svcs, client.InNamespace(s.currentNamespace), client.MatchingLabels{nodeservice.ServiceClusterLabel: translate.Suffix}) + if err != nil { + return nil, err + } + for _, svc := range svcs.Items { + if svc.Spec.ClusterIP == "" { + continue + } + + retSANs = append(retSANs, svc.Spec.ClusterIP) + } + } + // make sure other sans are there as well retSANs = append(retSANs, s.addSANs...) sort.Strings(retSANs) diff --git a/pkg/server/filters/nodename.go b/pkg/server/filters/nodename.go index 02b6a9a93..3edd8135f 100644 --- a/pkg/server/filters/nodename.go +++ b/pkg/server/filters/nodename.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/loft-sh/vcluster/pkg/constants" + "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/klog" @@ -19,9 +20,9 @@ type nodeName int // does not conflict with the keys defined in pkg/api. const nodeNameKey nodeName = iota -func WithNodeName(h http.Handler, cli client.Client) http.Handler { +func WithNodeName(h http.Handler, currentNamespace string, fakeKubeletIPs bool, virtualClient, physicalClient client.Client) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - nodeName := nodeNameFromHost(req, cli) + nodeName := nodeNameFromHost(req, currentNamespace, fakeKubeletIPs, virtualClient, physicalClient) if nodeName != "" { req = req.WithContext(context.WithValue(req.Context(), nodeNameKey, nodeName)) } @@ -35,21 +36,37 @@ func NodeNameFrom(ctx context.Context) (string, bool) { return info, ok } -func nodeNameFromHost(req *http.Request, cli client.Client) string { +func nodeNameFromHost(req *http.Request, currentNamespace string, fakeKubeletIPs bool, virtualClient client.Client, physicalClient client.Client) string { splitted := strings.Split(req.Host, ":") if len(splitted) == 2 { hostname := splitted[0] nodeList := &corev1.NodeList{} - err := cli.List(req.Context(), nodeList, client.MatchingFields{constants.IndexByHostName: hostname}) + err := virtualClient.List(req.Context(), nodeList, client.MatchingFields{constants.IndexByHostName: hostname}) if err != nil && !errors.IsNotFound(err) { klog.Error(err, "couldn't fetch nodename for hostname") - return "" } if len(nodeList.Items) == 1 { nodeName := nodeList.Items[0].Name return nodeName } + + if fakeKubeletIPs { + // try to fetch hostname by node service clusterIP + serviceList := &corev1.ServiceList{} + err = physicalClient.List(req.Context(), serviceList, client.InNamespace(currentNamespace), client.MatchingFields{constants.IndexByClusterIP: hostname}) + if err != nil { + klog.Error(err, "couldn't fetch nodename from nodeservice") + return "" + } + + // we found a service? + if len(serviceList.Items) > 0 { + serviceLabels := serviceList.Items[0].Labels + if len(serviceLabels) > 0 && serviceLabels[nodeservice.ServiceNodeLabel] != "" { + return serviceLabels[nodeservice.ServiceNodeLabel] + } + } + } } return "" - } diff --git a/pkg/server/server.go b/pkg/server/server.go index 362317906..521f5a4c8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/loft-sh/vcluster/pkg/authorization/kubeletauthorizer" "github.com/loft-sh/vcluster/pkg/constants" "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes" + "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice" "github.com/loft-sh/vcluster/pkg/server/cert" "github.com/loft-sh/vcluster/pkg/server/filters" "github.com/loft-sh/vcluster/pkg/server/handler" @@ -56,7 +57,10 @@ type Server struct { uncachedVirtualClient client.Client cachedVirtualClient client.Client - currentNamespace string + currentNamespace string + currentNamespaceClient client.Client + + fakeKubeletIPs bool certSyncer cert.Syncer handler *http.ServeMux @@ -86,7 +90,24 @@ func NewServer(ctx *context2.ControllerContext, requestHeaderCaFile, clientCaFil return nil, err } - cachedLocalClient, err := createCachedClient(ctx.Context, localConfig, ctx.CurrentNamespace, uncachedLocalClient.RESTMapper(), uncachedLocalClient.Scheme(), nil) + cachedLocalClient, err := createCachedClient(ctx.Context, localConfig, ctx.CurrentNamespace, uncachedLocalClient.RESTMapper(), uncachedLocalClient.Scheme(), func(cache cache.Cache) error { + if ctx.Options.FakeKubeletIPs { + err := cache.IndexField(ctx.Context, &corev1.Service{}, constants.IndexByClusterIP, func(object client.Object) []string { + svc := object.(*corev1.Service) + if len(svc.Labels) == 0 || svc.Labels[nodeservice.ServiceClusterLabel] != translate.Suffix { + return nil + } + + return []string{svc.Spec.ClusterIP} + }) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { return nil, err } @@ -135,7 +156,10 @@ func NewServer(ctx *context2.ControllerContext, requestHeaderCaFile, clientCaFil certSyncer: certSyncer, handler: http.NewServeMux(), - currentNamespace: ctx.CurrentNamespace, + fakeKubeletIPs: ctx.Options.FakeKubeletIPs, + + currentNamespace: ctx.CurrentNamespace, + currentNamespaceClient: cachedLocalClient, requestHeaderCaFile: requestHeaderCaFile, clientCaFile: clientCaFile, @@ -297,7 +321,7 @@ func createCachedClient(ctx context.Context, config *rest.Config, namespace stri func (s *Server) buildHandlerChain(serverConfig *server.Config) http.Handler { defaultHandler := server.DefaultBuildHandlerChain(s.handler, serverConfig) - defaultHandler = filters.WithNodeName(defaultHandler, s.cachedVirtualClient) + defaultHandler = filters.WithNodeName(defaultHandler, s.currentNamespace, s.fakeKubeletIPs, s.cachedVirtualClient, s.currentNamespaceClient) return defaultHandler } From 474e9e85f43afc8bf6395b9cd703d2f7767ff6e2 Mon Sep 17 00:00:00 2001 From: Ishan Khare Date: Thu, 30 Mar 2023 18:33:03 +0530 Subject: [PATCH 2/2] add e2e test for checking node services for fake kubelet IPS Signed-off-by: Ishan Khare --- test/e2e_node/node.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/test/e2e_node/node.go b/test/e2e_node/node.go index 4ed54487c..275aa713e 100644 --- a/test/e2e_node/node.go +++ b/test/e2e_node/node.go @@ -5,6 +5,7 @@ import ( "github.com/loft-sh/vcluster/test/framework" "github.com/onsi/ginkgo" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -30,4 +31,20 @@ var _ = ginkgo.Describe("Node sync", func() { framework.ExpectEqual(true, reflect.DeepEqual(hostNodeLabels, virtualNodeLabels)) }) + + ginkgo.It("fake nodes have fake kubelet service IPs", func() { + virtualNodes, err := f.VclusterClient.CoreV1().Nodes().List(f.Context, metav1.ListOptions{}) + framework.ExpectNoError(err) + + for _, nodes := range virtualNodes.Items { + var foundInternalIPAddress bool + for _, address := range nodes.Status.Addresses { + if address.Type == corev1.NodeInternalIP { + foundInternalIPAddress = true + } + } + + framework.ExpectEqual(foundInternalIPAddress, true) + } + }) })