Skip to content

Commit

Permalink
refactor: events controller
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianKramm committed Feb 1, 2024
1 parent 793604a commit 7e6dca1
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 108 deletions.
219 changes: 121 additions & 98 deletions pkg/controllers/resources/events/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package events

import (
"context"
"fmt"
"strings"

"github.com/loft-sh/vcluster/pkg/util/translate"
"k8s.io/apimachinery/pkg/api/equality"

"github.com/loft-sh/vcluster/pkg/constants"
synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
"github.com/loft-sh/vcluster/pkg/controllers/syncer/translator"
syncer "github.com/loft-sh/vcluster/pkg/types"

"github.com/loft-sh/vcluster/pkg/constants"
"github.com/loft-sh/vcluster/pkg/util/clienthelper"
"github.com/loft-sh/vcluster/pkg/util/translate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -33,11 +34,13 @@ var AcceptedKinds = map[schema.GroupVersionKind]bool{
func New(ctx *synccontext.RegisterContext) (syncer.Object, error) {
return &eventSyncer{
virtualClient: ctx.VirtualManager.GetClient(),
hostClient: ctx.PhysicalManager.GetClient(),
}, nil
}

type eventSyncer struct {
virtualClient client.Client
hostClient client.Client
}

func (s *eventSyncer) Resource() client.Object {
Expand All @@ -48,93 +51,119 @@ func (s *eventSyncer) Name() string {
return "event"
}

func (s *eventSyncer) IsManaged(context.Context, client.Object) (bool, error) {
return true, nil
func (s *eventSyncer) IsManaged(ctx context.Context, pObj client.Object) (bool, error) {
return s.HostToVirtual(ctx, types.NamespacedName{Namespace: pObj.GetNamespace(), Name: pObj.GetName()}, pObj).Name != "", nil
}

func (s *eventSyncer) VirtualToHost(context.Context, types.NamespacedName, client.Object) types.NamespacedName {
// we ignore virtual events here, we only react on host events and sync them to the virtual cluster
return types.NamespacedName{}
}

func (s *eventSyncer) HostToVirtual(_ context.Context, req types.NamespacedName, _ client.Object) types.NamespacedName {
return req
func (s *eventSyncer) HostToVirtual(ctx context.Context, req types.NamespacedName, pObj client.Object) types.NamespacedName {
involvedObject, err := s.getInvolvedObject(ctx, pObj)
if err != nil {
klog.Infof("Error retrieving involved object for %s/%s: %v", req.Namespace, req.Name, err)
return types.NamespacedName{}
} else if involvedObject == nil {
return types.NamespacedName{}
}

return types.NamespacedName{
Namespace: involvedObject.GetNamespace(),
Name: req.Name,
}
}

var _ syncer.Starter = &eventSyncer{}
var _ syncer.Syncer = &eventSyncer{}

func (s *eventSyncer) ReconcileStart(ctx *synccontext.SyncContext, req ctrl.Request) (bool, error) {
return true, s.reconcile(ctx, req) // true will tell the syncer to return after this reconcile
func (s *eventSyncer) SyncToHost(_ *synccontext.SyncContext, _ client.Object) (ctrl.Result, error) {
// this should never happen since we ignore virtual events and don't handle objects we can't find
panic("unimplemented")
}

func (s *eventSyncer) reconcile(ctx *synccontext.SyncContext, req ctrl.Request) error {
pObj := s.Resource()
err := ctx.PhysicalClient.Get(ctx.Context, req.NamespacedName, pObj)
if err != nil {
if !kerrors.IsNotFound(err) && !strings.Contains(err.Error(), "because of unknown namespace for the cache") {
return err
}
func (s *eventSyncer) Sync(ctx *synccontext.SyncContext, pObj client.Object, vObj client.Object) (ctrl.Result, error) {
// convert current events
pEvent := pObj.(*corev1.Event)
vEvent := vObj.(*corev1.Event)

return nil
// update event
vOldEvent := vEvent.DeepCopy()
vEvent, err := s.buildVirtualEvent(ctx.Context, pEvent)
if err != nil {
return ctrl.Result{}, err
} else if vEvent == nil {
return ctrl.Result{}, nil
}

pEvent, ok := pObj.(*corev1.Event)
if !ok {
return nil
}
// reset metadata
vEvent.TypeMeta = vOldEvent.TypeMeta
vEvent.ObjectMeta = vOldEvent.ObjectMeta

// check if the involved object is accepted
gvk := pEvent.InvolvedObject.GroupVersionKind()
if !AcceptedKinds[gvk] {
return nil
// update existing event only if changed
if equality.Semantic.DeepEqual(vEvent, vOldEvent) {
return ctrl.Result{}, nil
}

vInvolvedObj, err := ctx.VirtualClient.Scheme().New(gvk)
// check if updated
ctx.Log.Infof("update virtual event %s/%s", vEvent.Namespace, vEvent.Name)
translator.PrintChanges(vOldEvent, vEvent, ctx.Log)
err = ctx.VirtualClient.Update(ctx.Context, vEvent)
if err != nil {
return err
return ctrl.Result{}, err
}

index := ""
switch pEvent.InvolvedObject.Kind {
case "Pod":
index = constants.IndexByPhysicalName
case "Service":
index = constants.IndexByPhysicalName
case "Endpoint":
index = constants.IndexByPhysicalName
case "Secret":
index = constants.IndexByPhysicalName
case "ConfigMap":
index = constants.IndexByPhysicalName
default:
return nil
return ctrl.Result{}, nil
}

var _ syncer.ToVirtualSyncer = &eventSyncer{}

func (s *eventSyncer) SyncToVirtual(ctx *synccontext.SyncContext, pObj client.Object) (ctrl.Result, error) {
// build the virtual event
vObj, err := s.buildVirtualEvent(ctx.Context, pObj.(*corev1.Event))
if err != nil {
return ctrl.Result{}, err
} else if vObj == nil {
return ctrl.Result{}, nil
}

// get involved object
err = clienthelper.GetByIndex(ctx.Context, ctx.VirtualClient, vInvolvedObj, index, pEvent.Namespace+"/"+pEvent.InvolvedObject.Name)
// make sure namespace is not being deleted
namespace := &corev1.Namespace{}
err = ctx.VirtualClient.Get(ctx.Context, client.ObjectKey{Name: vObj.Namespace}, namespace)
if err != nil {
if kerrors.IsNotFound(err) {
return nil
return ctrl.Result{}, nil
}

return err
return ctrl.Result{}, err
} else if namespace.DeletionTimestamp != nil {
// cannot create events in terminating namespaces
return ctrl.Result{}, nil
}

// we found the related object
m, err := meta.Accessor(vInvolvedObj)
ctx.Log.Infof("create virtual event %s/%s", vObj.Namespace, vObj.Name)
return ctrl.Result{}, ctx.VirtualClient.Create(ctx.Context, vObj)
}

func (s *eventSyncer) buildVirtualEvent(ctx context.Context, pEvent *corev1.Event) (*corev1.Event, error) {
// retrieve involved object
involvedObject, err := s.getInvolvedObject(ctx, pEvent)
if err != nil {
return err
return nil, err
} else if involvedObject == nil {
return nil, nil
}

// copy physical object
vObj := pEvent.DeepCopy()
translate.ResetObjectMetadata(vObj)

// set the correct involved object meta
vObj.Namespace = m.GetNamespace()
vObj.InvolvedObject.Namespace = m.GetNamespace()
vObj.InvolvedObject.Name = m.GetName()
vObj.InvolvedObject.UID = m.GetUID()
vObj.InvolvedObject.ResourceVersion = m.GetResourceVersion()
vObj.Namespace = involvedObject.GetNamespace()
vObj.InvolvedObject.Namespace = involvedObject.GetNamespace()
vObj.InvolvedObject.Name = involvedObject.GetName()
vObj.InvolvedObject.UID = involvedObject.GetUID()
vObj.InvolvedObject.ResourceVersion = involvedObject.GetResourceVersion()

// replace name of object
if strings.HasPrefix(vObj.Name, pEvent.InvolvedObject.Name) {
Expand All @@ -144,59 +173,53 @@ func (s *eventSyncer) reconcile(ctx *synccontext.SyncContext, req ctrl.Request)
// we replace namespace/name & name in messages so that it seems correct
vObj.Message = strings.ReplaceAll(vObj.Message, pEvent.InvolvedObject.Namespace+"/"+pEvent.InvolvedObject.Name, vObj.InvolvedObject.Namespace+"/"+vObj.InvolvedObject.Name)
vObj.Message = strings.ReplaceAll(vObj.Message, pEvent.InvolvedObject.Name, vObj.InvolvedObject.Name)
return vObj, nil
}

// make sure namespace is not being deleted
namespace := &corev1.Namespace{}
err = ctx.VirtualClient.Get(ctx.Context, client.ObjectKey{Name: vObj.Namespace}, namespace)
func (s *eventSyncer) getInvolvedObject(ctx context.Context, pObj client.Object) (metav1.Object, error) {
if pObj == nil {
return nil, nil
}

pEvent, ok := pObj.(*corev1.Event)
if !ok {
return nil, fmt.Errorf("object is not of type event")
}

// check if the involved object is accepted
gvk := pEvent.InvolvedObject.GroupVersionKind()
if !AcceptedKinds[gvk] {
return nil, nil
}

vInvolvedObj, err := s.virtualClient.Scheme().New(gvk)
if err != nil {
if kerrors.IsNotFound(err) {
return nil
}
return nil, err
}

return err
} else if namespace.DeletionTimestamp != nil {
// cannot create events in terminating namespaces
return nil
index := ""
switch pEvent.InvolvedObject.Kind {
case "Pod", "Service", "Endpoint", "Secret", "ConfigMap":
index = constants.IndexByPhysicalName
default:
return nil, nil
}

// check if there is such an event already
vOldObj := &corev1.Event{}
err = ctx.VirtualClient.Get(ctx.Context, types.NamespacedName{
Namespace: vObj.Namespace,
Name: vObj.Name,
}, vOldObj)
// get involved object
err = clienthelper.GetByIndex(ctx, s.virtualClient, vInvolvedObj, index, pEvent.Namespace+"/"+pEvent.InvolvedObject.Name)
if err != nil {
if !kerrors.IsNotFound(err) {
return err
return nil, err
}

ctx.Log.Infof("create virtual event %s/%s", vObj.Namespace, vObj.Name)
return ctx.VirtualClient.Create(ctx.Context, vObj)
return nil, nil
}

// copy metadata
vObj.ObjectMeta = *vOldObj.ObjectMeta.DeepCopy()

// update existing event only if changed
if equality.Semantic.DeepEqual(vObj, vOldObj) {
return nil
// we found the related object
m, err := meta.Accessor(vInvolvedObj)
if err != nil {
return nil, err
}

ctx.Log.Infof("update virtual event %s/%s", vObj.Namespace, vObj.Name)
translator.PrintChanges(vOldObj, vObj, ctx.Log)
return ctx.VirtualClient.Update(ctx.Context, vObj)
}

var _ syncer.Syncer = &eventSyncer{}

func (s *eventSyncer) SyncToHost(*synccontext.SyncContext, client.Object) (ctrl.Result, error) {
// Noop, we do nothing here
return ctrl.Result{}, nil
return m, nil
}

func (s *eventSyncer) Sync(*synccontext.SyncContext, client.Object, client.Object) (ctrl.Result, error) {
// Noop, we do nothing here
return ctrl.Result{}, nil
}

func (s *eventSyncer) ReconcileEnd() {}
12 changes: 2 additions & 10 deletions pkg/controllers/resources/events/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -103,10 +101,7 @@ func TestSync(t *testing.T) {
},
Sync: func(registerContext *synccontext.RegisterContext) {
syncContext, syncer := newFakeSyncer(t, registerContext)
_, err := syncer.ReconcileStart(syncContext, ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: pEvent.Namespace,
Name: pEvent.Name,
}})
_, err := syncer.SyncToVirtual(syncContext, pEvent)
assert.NilError(t, err)
},
},
Expand All @@ -128,10 +123,7 @@ func TestSync(t *testing.T) {
},
Sync: func(registerContext *synccontext.RegisterContext) {
syncContext, syncer := newFakeSyncer(t, registerContext)
_, err := syncer.ReconcileStart(syncContext, ctrl.Request{NamespacedName: types.NamespacedName{
Namespace: pEvent.Namespace,
Name: pEvent.Name,
}})
_, err := syncer.Sync(syncContext, pEventUpdated, vEvent)
assert.NilError(t, err)
},
},
Expand Down

0 comments on commit 7e6dca1

Please sign in to comment.