Skip to content

Commit

Permalink
Merge pull request Kuadrant#17 from Kuadrant/controller-runtime
Browse files Browse the repository at this point in the history
[controller] State-of-the-world reconciliation
  • Loading branch information
guicassolato authored Aug 16, 2024
2 parents 01dce39 + 3bf7d93 commit 5fffd8e
Show file tree
Hide file tree
Showing 32 changed files with 1,754 additions and 466 deletions.
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ Example:

```go
import (
"context"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -175,17 +177,17 @@ func main() {
// create a controller with a built-in gateway api topology
controller := controller.NewController(
controller.WithClient(client),
controller.WithInformer("gateway", controller.For[*gwapiv1.Gateway](gwapiv1.SchemeGroupVersion.WithResource("gateways"), metav1.NamespaceAll)),
controller.WithInformer("httproute", controller.For[*gwapiv1.HTTPRoute](gwapiv1.SchemeGroupVersion.WithResource("httproutes"), metav1.NamespaceAll)),
controller.WithInformer("mypolicy", controller.For[*mypolicy.MyPolicy](mypolicy.SchemeGroupVersion.WithResource("mypolicies"), metav1.NamespaceAll)),
controller.WithRunnable("gateway", controller.Watch(*gwapiv1.Gateway{}, gwapiv1.SchemeGroupVersion.WithResource("gateways"), metav1.NamespaceAll)),
controller.WithRunnable("httproute", controller.Watch(*gwapiv1.HTTPRoute{}, gwapiv1.SchemeGroupVersion.WithResource("httproutes"), metav1.NamespaceAll)),
controller.WithRunnable("mypolicy", controller.Watch(*mypolicy.MyPolicy{}, mypolicy.SchemeGroupVersion.WithResource("mypolicies"), metav1.NamespaceAll)),
controller.WithPolicyKinds(schema.GroupKind{Group: mypolicy.SchemeGroupVersion.Group, Kind: "MyPolicy"}),
controller.WithCallback(reconcile),
controller.WithReconcile(reconcile),
)

controller.Start()
controller.Start(context.Background())
}

func reconcile(eventType controller.EventType, oldObj, newObj controller.RuntimeObject, topology *machinery.Topology) {
func reconcile(ctx context.Context, events []ResourceEvent, topology *machinery.Topology) {
// TODO
}
```
Expand Down
125 changes: 91 additions & 34 deletions controller/cache.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,119 @@
package controller

import (
"reflect"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"github.com/samber/lo"
"github.com/telepresenceio/watchable"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type RuntimeObject interface {
runtime.Object
metav1.Object
type Store map[string]Object

func (s Store) Filter(predicates ...func(Object) bool) []Object {
var objects []Object
for _, object := range s {
if lo.EveryBy(predicates, func(p func(Object) bool) bool { return p(object) }) {
objects = append(objects, object)
}
}
return objects
}

func (s Store) FilterByGroupKind(gk schema.GroupKind) []Object {
return s.Filter(func(o Object) bool {
return o.GetObjectKind().GroupVersionKind().GroupKind() == gk
})
}

type Store map[schema.GroupKind]map[string]RuntimeObject
type Cache interface {
List() Store
Add(obj Object)
Delete(obj Object)
Replace(Store)
}

type cacheStore struct {
mu sync.RWMutex
sync.RWMutex
store Store
}

func newCacheStore() *cacheStore {
return &cacheStore{
store: make(Store),
func (c *cacheStore) List() Store {
c.RLock()
defer c.RUnlock()

ret := make(Store, len(c.store))
for k, v := range c.store {
ret[k] = v.DeepCopyObject().(Object)
}
return ret
}

func (c *cacheStore) List() Store {
c.mu.RLock()
defer c.mu.RUnlock()
func (c *cacheStore) Add(obj Object) {
c.Lock()
defer c.Unlock()

cm := make(Store, len(c.store))
for gk, objs := range c.store {
if _, ok := cm[gk]; !ok {
cm[gk] = map[string]RuntimeObject{}
}
for url, obj := range objs {
cm[gk][url] = obj
}
c.store[string(obj.GetUID())] = obj
}

func (c *cacheStore) Delete(obj Object) {
c.Lock()
defer c.Unlock()

delete(c.store, string(obj.GetUID()))
}

func (c *cacheStore) Replace(store Store) {
c.Lock()
defer c.Unlock()

c.store = make(Store, len(store))
for k, v := range store {
c.store[k] = v.DeepCopyObject().(Object)
}
return cm
}

func (c *cacheStore) Add(obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()
type watchableCacheStore struct {
watchable.Map[string, watchableCacheEntry]
}

gk := obj.GetObjectKind().GroupVersionKind().GroupKind()
if _, ok := c.store[gk]; !ok {
c.store[gk] = map[string]RuntimeObject{}
func (c *watchableCacheStore) List() Store {
entries := c.LoadAll()
store := make(Store, len(entries))
for uid, obj := range entries {
store[uid] = obj.Object
}
c.store[gk][string(obj.GetUID())] = obj
return store
}

func (c *watchableCacheStore) Add(obj Object) {
c.Store(string(obj.GetUID()), watchableCacheEntry{obj})
}

func (c *watchableCacheStore) Delete(obj Object) {
c.Map.Delete(string(obj.GetUID()))
}

func (c *cacheStore) Delete(obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()
func (c *watchableCacheStore) Replace(store Store) {
for uid, obj := range store {
c.Store(uid, watchableCacheEntry{obj})
}
for uid := range c.LoadAll() {
if _, ok := store[uid]; !ok {
c.Map.Delete(uid)
}
}
}

type watchableCacheEntry struct {
Object
}

func (e watchableCacheEntry) DeepCopy() watchableCacheEntry {
return watchableCacheEntry{e.DeepCopyObject().(Object)}
}

gk := obj.GetObjectKind().GroupVersionKind().GroupKind()
delete(c.store[gk], string(obj.GetUID()))
func (e watchableCacheEntry) Equal(other watchableCacheEntry) bool {
return reflect.DeepEqual(e, other)
}
Loading

0 comments on commit 5fffd8e

Please sign in to comment.