Skip to content

Commit

Permalink
Merge pull request #29 from Kuadrant/reconciliation-state
Browse files Browse the repository at this point in the history
controller: reconciliation state carryover
  • Loading branch information
guicassolato authored Sep 12, 2024
2 parents e9eb6e5 + 0e50bb8 commit 31fa8bc
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 63 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ Example:
```go
import (
"context"
"sync"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -187,7 +188,7 @@ func main() {
controller.Start(context.Background())
}

func reconcile(ctx context.Context, events []ResourceEvent, topology *machinery.Topology) {
func reconcile(ctx context.Context, events []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
// TODO
}
```
Expand Down
10 changes: 7 additions & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func WithRunnable(name string, builder RunnableBuilder) ControllerOption {
}
}

type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error)
// ReconcileFunc is a function that reconciles a particular state of the world.
// It receives a list of recent events, an immutable copy of the topology as known by the caller after the events,
// an optional error detected before the reconciliation, and a thread-safe map to store transient state across
// chained calls to multiple ReconcileFuncs.
type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map)

func WithReconcile(reconcile ReconcileFunc) ControllerOption {
return func(o *ControllerOptions) {
Expand Down Expand Up @@ -106,7 +110,7 @@ func NewController(f ...ControllerOption) *Controller {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) {
},
}
for _, fn := range f {
Expand Down Expand Up @@ -250,7 +254,7 @@ func (c *Controller) propagate(resourceEvents []ResourceEvent) {
if err != nil {
c.logger.Error(err, "error building topology")
}
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err)
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err, &sync.Map{})
}

func (c *Controller) subscribe() {
Expand Down
3 changes: 2 additions & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package controller

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -25,7 +26,7 @@ func TestControllerOptions(t *testing.T) {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) {
},
}

Expand Down
5 changes: 3 additions & 2 deletions controller/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"sync"

"github.com/samber/lo"

Expand All @@ -16,7 +17,7 @@ type Subscription struct {
Events []ResourceEventMatcher
}

func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) {
func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
matchingEvents := lo.Filter(resourceEvents, func(resourceEvent ResourceEvent, _ int) bool {
return lo.ContainsBy(s.Events, func(m ResourceEventMatcher) bool {
obj := resourceEvent.OldObject
Expand All @@ -30,6 +31,6 @@ func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEv
})
})
if len(matchingEvents) > 0 && s.ReconcileFunc != nil {
s.ReconcileFunc(ctx, matchingEvents, topology, err)
s.ReconcileFunc(ctx, matchingEvents, topology, err, state)
}
}
3 changes: 2 additions & 1 deletion controller/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package controller

import (
"context"
"sync"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -51,7 +52,7 @@ func init() {
Func: func(_ machinery.Object) []machinery.Object { return []machinery.Object{&RuntimeObject{myObjects[0]}} },
}
}
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error) {
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
for _, event := range events {
testLogger.Info("reconcile",
"kind", event.Kind,
Expand Down
8 changes: 4 additions & 4 deletions controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type Workflow struct {
Postcondition ReconcileFunc
}

func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) {
func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
// run precondition reconcile function
if d.Precondition != nil {
d.Precondition(ctx, resourceEvents, topology, err)
d.Precondition(ctx, resourceEvents, topology, err, state)
}

// dispatch the event to concurrent tasks
Expand All @@ -28,13 +28,13 @@ func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topo
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology, err)
f(ctx, resourceEvents, topology, err, state)
}()
}
waitGroup.Wait()

// run precondition reconcile function
if d.Postcondition != nil {
d.Postcondition(ctx, resourceEvents, topology, err)
d.Postcondition(ctx, resourceEvents, topology, err, state)
}
}
17 changes: 10 additions & 7 deletions examples/kuadrant/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"strings"
"sync"

egv1alpha1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -201,7 +202,9 @@ func controllerOptionsFor(gatewayProviders []string) []controller.ControllerOpti
// 3. (gateway deleted) delete SecurityPolicy / (other events) reconcile SecurityPolicies
// 3. (gateway deleted) delete AuthorizationPolicy / (other events) reconcile AuthorizationPolicies
func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) controller.ReconcileFunc {
effectivePolicyReconciler := &reconcilers.EffectivePoliciesReconciler{Client: client}
effectivePolicyReconciler := &controller.Workflow{
Precondition: reconcilers.ReconcileEffectivePolicies,
}

commonAuthPolicyResourceEventMatchers := []controller.ResourceEventMatcher{
{Kind: ptr.To(machinery.GatewayClassGroupKind)},
Expand All @@ -215,23 +218,23 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c
switch gatewayProvider {
case reconcilers.EnvoyGatewayProviderName:
envoyGatewayProvider := &reconcilers.EnvoyGatewayProvider{Client: client}
effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{
effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{
ReconcileFunc: envoyGatewayProvider.ReconcileSecurityPolicies,
Events: append(commonAuthPolicyResourceEventMatchers, controller.ResourceEventMatcher{Kind: ptr.To(reconcilers.EnvoyGatewaySecurityPolicyKind)}),
}).Reconcile)
effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{
effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{
ReconcileFunc: envoyGatewayProvider.DeleteSecurityPolicy,
Events: []controller.ResourceEventMatcher{
{Kind: ptr.To(machinery.GatewayGroupKind), EventType: ptr.To(controller.DeleteEvent)},
},
}).Reconcile)
case reconcilers.IstioGatewayProviderName:
istioGatewayProvider := &reconcilers.IstioGatewayProvider{Client: client}
effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{
effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{
ReconcileFunc: istioGatewayProvider.ReconcileAuthorizationPolicies,
Events: append(commonAuthPolicyResourceEventMatchers, controller.ResourceEventMatcher{Kind: ptr.To(reconcilers.IstioAuthorizationPolicyKind)}),
}).Reconcile)
effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{
effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{
ReconcileFunc: istioGatewayProvider.DeleteAuthorizationPolicy,
Events: []controller.ResourceEventMatcher{
{Kind: ptr.To(machinery.GatewayGroupKind), EventType: ptr.To(controller.DeleteEvent)},
Expand All @@ -241,7 +244,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c
}

reconciler := &controller.Workflow{
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
logger := controller.LoggerFromContext(ctx).WithName("event logger")
for _, event := range resourceEvents {
// log the event
Expand All @@ -263,7 +266,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c
},
Tasks: []controller.ReconcileFunc{
(&reconcilers.TopologyFileReconciler{}).Reconcile,
effectivePolicyReconciler.Reconcile,
effectivePolicyReconciler.Run,
},
}

Expand Down
42 changes: 5 additions & 37 deletions examples/kuadrant/reconcilers/effective_policies_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

"github.com/samber/lo"
"k8s.io/client-go/dynamic"

"github.com/kuadrant/policy-machinery/controller"
"github.com/kuadrant/policy-machinery/machinery"
Expand All @@ -20,15 +19,7 @@ import (

const authPathsKey = "authPaths"

// EffectivePoliciesReconciler works exactly like a controller.Workflow where the precondition reconcile function
// reconciles the effective policies for the given topology paths, occasionally modifying the context that is passed
// as argument to the subsequent concurrent reconcilers.
type EffectivePoliciesReconciler struct {
Client *dynamic.DynamicClient
ReconcileFuncs []controller.ReconcileFunc
}

func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
func ReconcileEffectivePolicies(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
targetables := topology.Targetables()

// reconcile policies
Expand All @@ -47,6 +38,8 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve
return ok
})

var authPaths [][]machinery.Targetable

for _, gateway := range gateways {
// reconcile Gateway -> Listener policies
for _, listener := range listeners {
Expand All @@ -66,7 +59,7 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve
paths := targetables.Paths(gateway, httpRouteRule)
for i := range paths {
if p := effectivePolicyForPath[*kuadrantv1beta3.AuthPolicy](ctx, paths[i]); p != nil {
ctx = pathIntoContext(ctx, authPathsKey, paths[i])
authPaths = append(authPaths, paths[i])
// TODO: reconcile auth effective policy (i.e. create the Authorino AuthConfig)
}
if p := effectivePolicyForPath[*kuadrantv1beta3.RateLimitPolicy](ctx, paths[i]); p != nil {
Expand All @@ -76,17 +69,7 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve
}
}

// dispatch the event to subsequent reconcilers
funcs := r.ReconcileFuncs
waitGroup := &sync.WaitGroup{}
defer waitGroup.Wait()
waitGroup.Add(len(funcs))
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology, err)
}()
}
state.Store(authPathsKey, authPaths)
}

func effectivePolicyForPath[T machinery.Policy](ctx context.Context, path []machinery.Targetable) *T {
Expand Down Expand Up @@ -121,18 +104,3 @@ func effectivePolicyForPath[T machinery.Policy](ctx context.Context, path []mach
concreteEffectivePolicy, _ := effectivePolicy.(T)
return &concreteEffectivePolicy
}

func pathIntoContext(ctx context.Context, key string, path []machinery.Targetable) context.Context {
if p := ctx.Value(key); p != nil {
return context.WithValue(ctx, key, append(p.([][]machinery.Targetable), path))
}
return context.WithValue(ctx, key, [][]machinery.Targetable{path})
}

func pathsFromContext(ctx context.Context, key string) [][]machinery.Targetable {
var paths [][]machinery.Targetable
if p := ctx.Value(key); p != nil {
paths = p.([][]machinery.Targetable)
}
return paths
}
10 changes: 7 additions & 3 deletions examples/kuadrant/reconcilers/envoy_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reconcilers
import (
"context"
"fmt"
"sync"

egv1alpha1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/samber/lo"
Expand All @@ -28,11 +29,14 @@ type EnvoyGatewayProvider struct {
Client *dynamic.DynamicClient
}

func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
logger := controller.LoggerFromContext(ctx).WithName("envoy gateway").WithName("securitypolicy")
ctx = controller.LoggerIntoContext(ctx, logger)

authPaths := pathsFromContext(ctx, authPathsKey)
var authPaths [][]machinery.Targetable
if untypedAuthPaths, ok := state.Load(authPathsKey); ok {
authPaths = untypedAuthPaths.([][]machinery.Targetable)
}
targetables := topology.Targetables()
gateways := targetables.Items(func(o machinery.Object) bool {
_, ok := o.(*machinery.Gateway)
Expand All @@ -57,7 +61,7 @@ func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _
}
}

func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
for _, resourceEvent := range resourceEvents {
gateway := resourceEvent.OldObject
p.deleteSecurityPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil)
Expand Down
10 changes: 7 additions & 3 deletions examples/kuadrant/reconcilers/istio.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/samber/lo"
istioapiv1 "istio.io/api/security/v1"
Expand Down Expand Up @@ -31,11 +32,14 @@ type IstioGatewayProvider struct {
Client *dynamic.DynamicClient
}

func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
logger := controller.LoggerFromContext(ctx).WithName("istio").WithName("authorizationpolicy")
ctx = controller.LoggerIntoContext(ctx, logger)

authPaths := pathsFromContext(ctx, authPathsKey)
var authPaths [][]machinery.Targetable
if untypedAuthPaths, ok := state.Load(authPathsKey); ok {
authPaths = untypedAuthPaths.([][]machinery.Targetable)
}
targetables := topology.Targetables()
gateways := targetables.Items(func(o machinery.Object) bool {
_, ok := o.(*machinery.Gateway)
Expand All @@ -60,7 +64,7 @@ func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Contex
}
}

func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
for _, resourceEvent := range resourceEvents {
gateway := resourceEvent.OldObject
p.deleteAuthorizationPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil)
Expand Down
3 changes: 2 additions & 1 deletion examples/kuadrant/reconcilers/topology_file_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reconcilers
import (
"context"
"os"
"sync"

"github.com/kuadrant/policy-machinery/controller"
"github.com/kuadrant/policy-machinery/machinery"
Expand All @@ -12,7 +13,7 @@ const topologyFile = "topology.dot"

type TopologyFileReconciler struct{}

func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
logger := controller.LoggerFromContext(ctx).WithName("topology file")

file, err := os.Create(topologyFile)
Expand Down

0 comments on commit 31fa8bc

Please sign in to comment.