Skip to content

Commit

Permalink
Update hostpathmapper to work with multi namespace mode.
Browse files Browse the repository at this point in the history
- When multiNamespace mode is detected, the localmanager cache's listwatch will watch all namespaces instead of just the target namespace.
- In mapHostPaths, only pods belonging to namespaces managed by the current vcluster will be allowed to be processed.
  • Loading branch information
neogopher committed Feb 20, 2024
1 parent 36be0fd commit 10213db
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 84 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ jobs:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version-file: "go.mod"
cache: false
- uses: actions/checkout@v3
- name: Run golangci-lint
uses: golangci/golangci-lint-action@v3
with:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.20 as builder
FROM golang:1.22 as builder

WORKDIR /vcluster-hpm-dev
ARG TARGETOS
Expand Down
195 changes: 115 additions & 80 deletions cmd/hostpaths/hostpaths.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/loft-sh/vcluster/pkg/controllers/resources/namespaces"
podtranslate "github.com/loft-sh/vcluster/pkg/controllers/resources/pods/translate"
"github.com/loft-sh/vcluster/pkg/util/clienthelper"

Expand All @@ -18,13 +19,12 @@ import (
"github.com/loft-sh/vcluster/pkg/util/translate"
"github.com/pkg/errors"
"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -148,7 +148,7 @@ func Start(ctx context.Context, options *context2.VirtualClusterOptions, init bo

kubeClient, err := kubernetes.NewForConfig(virtualClusterConfig)
if err != nil {
return false, errors.Wrap(err, "create kube client")
return false, fmt.Errorf("create kube client: %w", err)
}

_, err = kubeClient.Discovery().ServerVersion()
Expand All @@ -168,13 +168,17 @@ func Start(ctx context.Context, options *context2.VirtualClusterOptions, init bo
return err
}

localManager, err := ctrl.NewManager(inClusterConfig, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: "0",
LeaderElection: false,
Namespace: options.TargetNamespace,
NewClient: pluginhookclient.NewPhysicalPluginClientFactory(blockingcacheclient.NewCacheClient),
})
kubeClient, err := kubernetes.NewForConfig(inClusterConfig)
if err != nil {
return fmt.Errorf("create kube client: %w", err)
}

err = findVclusterModeAndSetDefaultTranslation(ctx, kubeClient, options)
if err != nil {
return fmt.Errorf("find vcluster mode: %w", err)
}

localManager, err := ctrl.NewManager(inClusterConfig, localManagerCtrlOptions(options))
if err != nil {
return err
}
Expand All @@ -193,11 +197,6 @@ func Start(ctx context.Context, options *context2.VirtualClusterOptions, init bo

startManagers(ctx, localManager, virtualClusterManager)

err = findVclusterModeAndSetDefaultTranslation(ctx, localManager, options)
if err != nil {
return err
}

if init {
klog.Info("is init container mode")
defer ctx.Done()
Expand All @@ -216,49 +215,46 @@ func Start(ctx context.Context, options *context2.VirtualClusterOptions, init bo
return nil
}

func getVclusterObject(ctx context.Context, localManager manager.Manager, vclusterName, vclusterNamespace string, object client.Object) error {
err := localManager.GetClient().Get(ctx, types.NamespacedName{
Name: vclusterName,
Namespace: vclusterNamespace,
}, object)
if err != nil {
return err
}

return nil
}

func getSyncerPodSpec(ctx context.Context, localManager manager.Manager, vclusterName, vclusterNamespace string) (*corev1.PodSpec, error) {
func getSyncerPodSpec(ctx context.Context, kubeClient kubernetes.Interface, vclusterName, vclusterNamespace string) (*corev1.PodSpec, error) {
// try looking for the stateful set first
vclusterSts := &appsv1.StatefulSet{}

err := getVclusterObject(ctx, localManager, vclusterName, vclusterNamespace, vclusterSts)
if err != nil {
vclusterSts, err := kubeClient.AppsV1().StatefulSets(vclusterNamespace).Get(ctx, vclusterName, metav1.GetOptions{})
if kerrors.IsNotFound(err) {
// try looking for deployment - in case of eks/k8s
vclusterDeploy, err := kubeClient.AppsV1().Deployments(vclusterNamespace).Get(ctx, vclusterName, metav1.GetOptions{})
if kerrors.IsNotFound(err) {
// try looking for deployment - in case of eks/k8s
vclusterDeploy := &appsv1.Deployment{}
err := getVclusterObject(ctx, localManager, vclusterName, vclusterNamespace, vclusterDeploy)
if err != nil {
if kerrors.IsNotFound(err) {
klog.Errorf("could not find vcluster either in statefulset or deployment: %v", err)
return nil, err
}

klog.Errorf("error looking for vcluster deployment: %v", err)
return nil, err
}

return &vclusterDeploy.Spec.Template.Spec, nil
klog.Errorf("could not find vcluster either in statefulset or deployment: %v", err)
return nil, err
} else if err != nil {
klog.Errorf("error looking for vcluster deployment: %v", err)
return nil, err
}

return &vclusterDeploy.Spec.Template.Spec, nil
} else if err != nil {
return nil, err
}

return &vclusterSts.Spec.Template.Spec, nil
}

func findVclusterModeAndSetDefaultTranslation(ctx context.Context, localManager manager.Manager, options *context2.VirtualClusterOptions) error {
vclusterPodSpec, err := getSyncerPodSpec(ctx, localManager, options.Name, options.TargetNamespace)
func localManagerCtrlOptions(options *context2.VirtualClusterOptions) manager.Options {
controllerOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: "0",
LeaderElection: false,
NewClient: pluginhookclient.NewPhysicalPluginClientFactory(blockingcacheclient.NewCacheClient),
}

if !options.MultiNamespaceMode {
controllerOptions.Cache.Namespaces = []string{options.TargetNamespace}
}

return controllerOptions
}

func findVclusterModeAndSetDefaultTranslation(ctx context.Context, kubeClient kubernetes.Interface, options *context2.VirtualClusterOptions) error {
vclusterPodSpec, err := getSyncerPodSpec(ctx, kubeClient, options.Name, options.TargetNamespace)
if err != nil {
return err
}
Expand All @@ -268,6 +264,7 @@ func findVclusterModeAndSetDefaultTranslation(ctx context.Context, localManager
// iterate over command args
for _, arg := range container.Args {
if strings.Contains(arg, MultiNamespaceMode) {
options.MultiNamespaceMode = true
translate.Default = translate.NewMultiNamespaceTranslator(options.TargetNamespace)
return nil
}
Expand Down Expand Up @@ -338,22 +335,12 @@ func mapHostPaths(ctx context.Context, pManager, vManager manager.Manager) {
options := ctx.Value(optionsKey).(*context2.VirtualClusterOptions)

wait.Forever(func() {
podList := &corev1.PodList{}
err := pManager.GetClient().List(ctx, podList, &client.ListOptions{
Namespace: options.TargetNamespace,
FieldSelector: fields.SelectorFromSet(fields.Set{
NodeIndexName: os.Getenv(HostpathMapperSelfNodeNameEnvVar),
}),
})
podMappings, err := getPhysicalPodMap(ctx, options, pManager)
if err != nil {
klog.Errorf("unable to list pods: %v", err)
klog.Errorf("unable to get physical pod mapping: %v", err)
return
}

podMappings := make(PhysicalPodMap)

fillUpPodMapping(ctx, podList, podMappings)

vPodList := &corev1.PodList{}
err = vManager.GetClient().List(ctx, vPodList, &client.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{
Expand Down Expand Up @@ -420,6 +407,75 @@ func mapHostPaths(ctx context.Context, pManager, vManager manager.Manager) {
}, time.Second*5)
}

func getPhysicalPodMap(ctx context.Context, options *context2.VirtualClusterOptions, pManager manager.Manager) (PhysicalPodMap, error) {
podListOptions := &client.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{
NodeIndexName: os.Getenv(HostpathMapperSelfNodeNameEnvVar),
}),
}

if !options.MultiNamespaceMode {
podListOptions.Namespace = options.TargetNamespace
}

podList := &corev1.PodList{}
err := pManager.GetClient().List(ctx, podList, podListOptions)
if err != nil {
return nil, fmt.Errorf("unable to list pods: %w", err)
}

var pods []corev1.Pod
if options.MultiNamespaceMode {
// find namespaces managed by the current vcluster
nsList := &corev1.NamespaceList{}
err = pManager.GetClient().List(ctx, nsList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
namespaces.VclusterNamespaceAnnotation: options.TargetNamespace,
}),
})
if err != nil {
return nil, fmt.Errorf("unable to list namespaces: %w", err)
}

vclusterNamespaces := make(map[string]struct{}, len(nsList.Items))
for _, ns := range nsList.Items {
vclusterNamespaces[ns.Name] = struct{}{}
}

// Limit Pods
pods = make([]corev1.Pod, 0, len(podList.Items))
for _, pod := range podList.Items {
if _, ok := vclusterNamespaces[pod.Namespace]; ok {
pods = append(pods, pod)
}
}
} else {
pods = podList.Items
}

podMappings := make(PhysicalPodMap, len(pods))
for _, pPod := range pods {
lookupName := fmt.Sprintf("%s_%s_%s", pPod.Namespace, pPod.Name, pPod.UID)

ok, err := checkIfPathExists(lookupName)
if err != nil {
klog.Errorf("error checking existence for path %s: %v", lookupName, err)
}

if ok {
// check entry in podMapping
if _, ok := podMappings[pPod.Name]; !ok {
podMappings[pPod.Name] = &PodDetail{
Target: lookupName,
PhysicalPod: pPod,
}
}
}
}

return podMappings, nil
}

func cleanupOldContainerPaths(ctx context.Context, existingVPodsWithNS map[string]bool) error {
options := ctx.Value(optionsKey).(*context2.VirtualClusterOptions)

Expand Down Expand Up @@ -591,27 +647,6 @@ func getPhysicalLogFilename(ctx context.Context, physicalContainerFileName strin
return fileName, nil
}

func fillUpPodMapping(ctx context.Context, pPodList *corev1.PodList, podMappings PhysicalPodMap) {
for _, pPod := range pPodList.Items {
lookupName := fmt.Sprintf("%s_%s_%s", pPod.Namespace, pPod.Name, pPod.UID)

ok, err := checkIfPathExists(lookupName)
if err != nil {
klog.Errorf("error checking existence for path %s: %v", lookupName, err)
}

if ok {
// check entry in podMapping
if _, ok := podMappings[pPod.Name]; !ok {
podMappings[pPod.Name] = &PodDetail{
Target: lookupName,
PhysicalPod: pPod,
}
}
}
}
}

// check if folder exists
func checkIfPathExists(path string) (bool, error) {
fullPath := filepath.Join(PodLogsMountPath, path)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/loft-sh/vcluster-hostpath-mapper

go 1.20
go 1.22.0

require (
github.com/go-openapi/loads v0.21.2
Expand Down
Loading

0 comments on commit 10213db

Please sign in to comment.