Skip to content

Commit

Permalink
Gateway API Controller based on k8s.io/apimachinery's informers pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
guicassolato committed Jul 9, 2024
1 parent 66032b1 commit dba08b8
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 22 deletions.
62 changes: 62 additions & 0 deletions controller/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package controller

import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type RuntimeObject interface {
runtime.Object
metav1.Object
}

type cacheMap map[schema.GroupKind]map[string]RuntimeObject

type cacheStore struct {
mu sync.RWMutex
store cacheMap
}

func newCacheStore() *cacheStore {
return &cacheStore{
store: make(cacheMap),
}
}

func (c *cacheStore) List() cacheMap {
c.mu.RLock()
defer c.mu.RUnlock()

cm := make(cacheMap, len(c.store))
for gk, objs := range c.store {
if _, ok := cm[gk]; !ok {
cm[gk] = map[string]RuntimeObject{}
}
for url, obj := range objs {
cm[gk][url] = obj
}
}
return cm
}

func (c *cacheStore) Add(obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

gk := obj.GetObjectKind().GroupVersionKind().GroupKind()
if _, ok := c.store[gk]; !ok {
c.store[gk] = map[string]RuntimeObject{}
}
c.store[gk][string(obj.GetUID())] = obj
}

func (c *cacheStore) Delete(obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

gk := obj.GetObjectKind().GroupVersionKind().GroupKind()
delete(c.store[gk], string(obj.GetUID()))
}
137 changes: 137 additions & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package controller

import (
"log"
"sync"
"time"

"github.com/kuadrant/policy-machinery/machinery"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)

type ControllerOptions struct {
client *dynamic.DynamicClient
informers map[string]InformerBuilder
callback CallbackFunc
policyKinds []schema.GroupKind
}

type ControllerOptionFunc func(*ControllerOptions)
type CallbackFunc func(EventType, RuntimeObject, RuntimeObject, *machinery.Topology)

func WithClient(client *dynamic.DynamicClient) ControllerOptionFunc {
return func(o *ControllerOptions) {
o.client = client
}
}

func WithInformer(name string, informer InformerBuilder) ControllerOptionFunc {
return func(o *ControllerOptions) {
o.informers[name] = informer
}
}

func WithCallback(callback CallbackFunc) ControllerOptionFunc {
return func(o *ControllerOptions) {
o.callback = callback
}
}

func WithPolicyKinds(policyKinds ...schema.GroupKind) ControllerOptionFunc {
return func(o *ControllerOptions) {
o.policyKinds = policyKinds
}
}

func NewController(f ...ControllerOptionFunc) *Controller {
opts := &ControllerOptions{
informers: map[string]InformerBuilder{},
callback: func(EventType, RuntimeObject, RuntimeObject, *machinery.Topology) {},
}

for _, fn := range f {
fn(opts)
}

controller := &Controller{
client: opts.client,
cache: newCacheStore(),
topology: NewGatewayAPITopology(opts.policyKinds...),
informers: map[string]cache.SharedInformer{},
callback: opts.callback,
}

for name, builder := range opts.informers {
controller.informers[name] = builder(controller)
}

return controller
}

type Controller struct {
mu sync.Mutex
client *dynamic.DynamicClient
cache *cacheStore
topology *GatewayAPITopology
informers map[string]cache.SharedInformer
callback CallbackFunc
}

// Starts starts the informers and blocks until a stop signal is received
func (c *Controller) Start() {
stopCh := make(chan struct{}, len(c.informers))

for name := range c.informers {
defer close(stopCh)
log.Printf("Starting %s informer", name)
go c.informers[name].Run(stopCh)
}

// wait for stop signal
for name := range c.informers {
if !cache.WaitForCacheSync(stopCh, c.informers[name].HasSynced) {
log.Fatalf("Error waiting for %s cache sync", name)
}
}

// keep the thread alive
wait.Until(func() {}, time.Second, stopCh)
}

func (c *Controller) add(obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

c.cache.Add(obj)
c.topology.Refresh(c.cache.List())
c.propagate(CreateEvent, nil, obj)
}

func (c *Controller) update(oldObj, newObj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

if oldObj.GetGeneration() == newObj.GetGeneration() {
return
}

c.cache.Add(newObj)
c.topology.Refresh(c.cache.List())
c.propagate(UpdateEvent, oldObj, newObj)
}

func (c *Controller) delete(obj RuntimeObject) {
c.mu.Lock()
defer c.mu.Unlock()

c.cache.Delete(obj)
c.topology.Refresh(c.cache.List())
c.propagate(DeleteEvent, obj, nil)
}

func (c *Controller) propagate(eventType EventType, oldObj, newObj RuntimeObject) {
c.callback(eventType, oldObj, newObj, c.topology.Get())
}
74 changes: 74 additions & 0 deletions controller/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package controller

import (
"context"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)

type EventType int

const (
CreateEvent EventType = iota
UpdateEvent
DeleteEvent
)

func (t *EventType) String() string {
return [...]string{"create", "update", "delete"}[*t]
}

type InformerBuilder func(controller *Controller) cache.SharedInformer

func For[T RuntimeObject](resource schema.GroupVersionResource, namespace string) InformerBuilder {
return func(controller *Controller) cache.SharedInformer {
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return controller.client.Resource(resource).Namespace(namespace).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return controller.client.Resource(resource).Namespace(namespace).Watch(context.Background(), options)
},
},
&unstructured.Unstructured{},
time.Minute*10,
)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o any) {
obj := o.(T)
controller.add(obj)
},
UpdateFunc: func(o, newO any) {
oldObj := o.(T)
newObj := newO.(T)
controller.update(oldObj, newObj)
},
DeleteFunc: func(o any) {
obj := o.(T)
controller.delete(obj)
},
})
informer.SetTransform(restructure[T])
return informer
}
}

func restructure[T any](obj any) (any, error) {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("unexpected object type: %T", obj)
}
o := *new(T)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.UnstructuredContent(), &o); err != nil {
return nil, err
}
return o, nil
}
94 changes: 94 additions & 0 deletions controller/topology.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package controller

import (
"sync"

"github.com/samber/lo"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kuadrant/policy-machinery/machinery"
)

func NewGatewayAPITopology(policyKinds ...schema.GroupKind) *GatewayAPITopology {
return &GatewayAPITopology{
topology: machinery.NewTopology(),
policyKinds: policyKinds,
}
}

type GatewayAPITopology struct {
mu sync.RWMutex
topology *machinery.Topology
policyKinds []schema.GroupKind
}

func (t *GatewayAPITopology) Refresh(objs cacheMap) {
t.mu.Lock()
defer t.mu.Unlock()

gatewayClasses := lo.FilterMap(lo.Values(objs[schema.GroupKind{Group: gwapiv1.GroupVersion.Group, Kind: "GatewayClass"}]), func(obj RuntimeObject, _ int) (*gwapiv1.GatewayClass, bool) {
gc, ok := obj.(*gwapiv1.GatewayClass)
if !ok {
return nil, false
}
return gc, true
})

gateways := lo.FilterMap(lo.Values(objs[schema.GroupKind{Group: gwapiv1.GroupVersion.Group, Kind: "Gateway"}]), func(obj RuntimeObject, _ int) (*gwapiv1.Gateway, bool) {
gw, ok := obj.(*gwapiv1.Gateway)
if !ok {
return nil, false
}
return gw, true
})

httpRoutes := lo.FilterMap(lo.Values(objs[schema.GroupKind{Group: gwapiv1.GroupVersion.Group, Kind: "HTTPRoute"}]), func(obj RuntimeObject, _ int) (*gwapiv1.HTTPRoute, bool) {
httpRoute, ok := obj.(*gwapiv1.HTTPRoute)
if !ok {
return nil, false
}
return httpRoute, true
})

services := lo.FilterMap(lo.Values(objs[schema.GroupKind{Group: core.GroupName, Kind: "Service"}]), func(obj RuntimeObject, _ int) (*core.Service, bool) {
service, ok := obj.(*core.Service)
if !ok {
return nil, false
}
return service, true
})

opts := []machinery.GatewayAPITopologyOptionsFunc{
machinery.WithGatewayClasses(gatewayClasses...),
machinery.WithGateways(gateways...),
machinery.WithHTTPRoutes(httpRoutes...),
machinery.WithServices(services...),
machinery.ExpandGatewayListeners(),
machinery.ExpandHTTPRouteRules(),
machinery.ExpandServicePorts(),
}

for i := range t.policyKinds {
policyKind := t.policyKinds[i]
policies := lo.FilterMap(lo.Values(objs[policyKind]), func(obj RuntimeObject, _ int) (machinery.Policy, bool) {
policy, ok := obj.(machinery.Policy)
return policy, ok
})

opts = append(opts, machinery.WithGatewayAPITopologyPolicies(policies...))
}

t.topology = machinery.NewGatewayAPITopology(opts...)
}

func (t *GatewayAPITopology) Get() *machinery.Topology {
t.mu.RLock()
defer t.mu.RUnlock()
if t.topology == nil {
return nil
}
topology := *t.topology
return &topology
}
Loading

0 comments on commit dba08b8

Please sign in to comment.