From 633ae1294fd9c6cfb9b510bd47a1fb83b39148bf Mon Sep 17 00:00:00 2001 From: Guilherme Cassolato Date: Mon, 9 Sep 2024 17:43:28 +0200 Subject: [PATCH] controller: auxiliary sync.Map struct for carrying reconciliation state over throughout chained reconcile function calls Signed-off-by: Guilherme Cassolato --- README.md | 3 +- controller/controller.go | 6 +-- controller/controller_test.go | 3 +- controller/subscriber.go | 5 ++- controller/test_helper.go | 3 +- controller/workflow.go | 8 ++-- examples/kuadrant/main.go | 17 ++++---- .../effective_policies_reconciler.go | 42 +++---------------- .../kuadrant/reconcilers/envoy_gateway.go | 10 +++-- examples/kuadrant/reconcilers/istio.go | 10 +++-- .../reconcilers/topology_file_reconciler.go | 3 +- 11 files changed, 47 insertions(+), 63 deletions(-) diff --git a/README.md b/README.md index f233a0d..43e910f 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,7 @@ Example: ```go import ( "context" + "sync" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -187,7 +188,7 @@ func main() { controller.Start(context.Background()) } -func reconcile(ctx context.Context, events []ResourceEvent, topology *machinery.Topology) { +func reconcile(ctx context.Context, events []ResourceEvent, topology *machinery.Topology, state *sync.Map, err error) { // TODO } ``` diff --git a/controller/controller.go b/controller/controller.go index 2c90bfc..86757dd 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -61,7 +61,7 @@ func WithRunnable(name string, builder RunnableBuilder) ControllerOption { } } -type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error) +type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, *sync.Map, error) func WithReconcile(reconcile ReconcileFunc) ControllerOption { return func(o *ControllerOptions) { @@ -106,7 +106,7 @@ func NewController(f ...ControllerOption) *Controller { name: "controller", logger: logr.Discard(), runnables: map[string]RunnableBuilder{}, - reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) { + reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, *sync.Map, error) { }, } for _, fn := range f { @@ -250,7 +250,7 @@ func (c *Controller) propagate(resourceEvents []ResourceEvent) { if err != nil { c.logger.Error(err, "error building topology") } - c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err) + c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, &sync.Map{}, err) } func (c *Controller) subscribe() { diff --git a/controller/controller_test.go b/controller/controller_test.go index 1fffbc6..1780565 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -4,6 +4,7 @@ package controller import ( "context" + "sync" "testing" "time" @@ -25,7 +26,7 @@ func TestControllerOptions(t *testing.T) { name: "controller", logger: logr.Discard(), runnables: map[string]RunnableBuilder{}, - reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) { + reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, *sync.Map, error) { }, } diff --git a/controller/subscriber.go b/controller/subscriber.go index aaa7bfc..b5ec76d 100644 --- a/controller/subscriber.go +++ b/controller/subscriber.go @@ -2,6 +2,7 @@ package controller import ( "context" + "sync" "github.com/samber/lo" @@ -16,7 +17,7 @@ type Subscription struct { Events []ResourceEventMatcher } -func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) { +func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, state *sync.Map, err error) { matchingEvents := lo.Filter(resourceEvents, func(resourceEvent ResourceEvent, _ int) bool { return lo.ContainsBy(s.Events, func(m ResourceEventMatcher) bool { obj := resourceEvent.OldObject @@ -30,6 +31,6 @@ func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEv }) }) if len(matchingEvents) > 0 && s.ReconcileFunc != nil { - s.ReconcileFunc(ctx, matchingEvents, topology, err) + s.ReconcileFunc(ctx, matchingEvents, topology, state, err) } } diff --git a/controller/test_helper.go b/controller/test_helper.go index d40a49b..6c89a21 100644 --- a/controller/test_helper.go +++ b/controller/test_helper.go @@ -4,6 +4,7 @@ package controller import ( "context" + "sync" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -51,7 +52,7 @@ func init() { Func: func(_ machinery.Object) []machinery.Object { return []machinery.Object{&RuntimeObject{myObjects[0]}} }, } } - testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error) { + testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, _ *sync.Map, err error) { for _, event := range events { testLogger.Info("reconcile", "kind", event.Kind, diff --git a/controller/workflow.go b/controller/workflow.go index 1a6d467..bacc03f 100644 --- a/controller/workflow.go +++ b/controller/workflow.go @@ -15,10 +15,10 @@ type Workflow struct { Postcondition ReconcileFunc } -func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) { +func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, state *sync.Map, err error) { // run precondition reconcile function if d.Precondition != nil { - d.Precondition(ctx, resourceEvents, topology, err) + d.Precondition(ctx, resourceEvents, topology, state, err) } // dispatch the event to concurrent tasks @@ -28,13 +28,13 @@ func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topo for _, f := range funcs { go func() { defer waitGroup.Done() - f(ctx, resourceEvents, topology, err) + f(ctx, resourceEvents, topology, state, err) }() } waitGroup.Wait() // run precondition reconcile function if d.Postcondition != nil { - d.Postcondition(ctx, resourceEvents, topology, err) + d.Postcondition(ctx, resourceEvents, topology, state, err) } } diff --git a/examples/kuadrant/main.go b/examples/kuadrant/main.go index 2103a20..c523095 100644 --- a/examples/kuadrant/main.go +++ b/examples/kuadrant/main.go @@ -5,6 +5,7 @@ import ( "log" "os" "strings" + "sync" egv1alpha1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/google/go-cmp/cmp" @@ -201,7 +202,9 @@ func controllerOptionsFor(gatewayProviders []string) []controller.ControllerOpti // 3. (gateway deleted) delete SecurityPolicy / (other events) reconcile SecurityPolicies // 3. (gateway deleted) delete AuthorizationPolicy / (other events) reconcile AuthorizationPolicies func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) controller.ReconcileFunc { - effectivePolicyReconciler := &reconcilers.EffectivePoliciesReconciler{Client: client} + effectivePolicyReconciler := &controller.Workflow{ + Precondition: reconcilers.ReconcileEffectivePolicies, + } commonAuthPolicyResourceEventMatchers := []controller.ResourceEventMatcher{ {Kind: ptr.To(machinery.GatewayClassGroupKind)}, @@ -215,11 +218,11 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c switch gatewayProvider { case reconcilers.EnvoyGatewayProviderName: envoyGatewayProvider := &reconcilers.EnvoyGatewayProvider{Client: client} - effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{ + effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{ ReconcileFunc: envoyGatewayProvider.ReconcileSecurityPolicies, Events: append(commonAuthPolicyResourceEventMatchers, controller.ResourceEventMatcher{Kind: ptr.To(reconcilers.EnvoyGatewaySecurityPolicyKind)}), }).Reconcile) - effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{ + effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{ ReconcileFunc: envoyGatewayProvider.DeleteSecurityPolicy, Events: []controller.ResourceEventMatcher{ {Kind: ptr.To(machinery.GatewayGroupKind), EventType: ptr.To(controller.DeleteEvent)}, @@ -227,11 +230,11 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c }).Reconcile) case reconcilers.IstioGatewayProviderName: istioGatewayProvider := &reconcilers.IstioGatewayProvider{Client: client} - effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{ + effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{ ReconcileFunc: istioGatewayProvider.ReconcileAuthorizationPolicies, Events: append(commonAuthPolicyResourceEventMatchers, controller.ResourceEventMatcher{Kind: ptr.To(reconcilers.IstioAuthorizationPolicyKind)}), }).Reconcile) - effectivePolicyReconciler.ReconcileFuncs = append(effectivePolicyReconciler.ReconcileFuncs, (&controller.Subscription{ + effectivePolicyReconciler.Tasks = append(effectivePolicyReconciler.Tasks, (&controller.Subscription{ ReconcileFunc: istioGatewayProvider.DeleteAuthorizationPolicy, Events: []controller.ResourceEventMatcher{ {Kind: ptr.To(machinery.GatewayGroupKind), EventType: ptr.To(controller.DeleteEvent)}, @@ -241,7 +244,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c } reconciler := &controller.Workflow{ - Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) { + Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, _ *sync.Map, err error) { logger := controller.LoggerFromContext(ctx).WithName("event logger") for _, event := range resourceEvents { // log the event @@ -263,7 +266,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c }, Tasks: []controller.ReconcileFunc{ (&reconcilers.TopologyFileReconciler{}).Reconcile, - effectivePolicyReconciler.Reconcile, + effectivePolicyReconciler.Run, }, } diff --git a/examples/kuadrant/reconcilers/effective_policies_reconciler.go b/examples/kuadrant/reconcilers/effective_policies_reconciler.go index e22fab7..1490efb 100644 --- a/examples/kuadrant/reconcilers/effective_policies_reconciler.go +++ b/examples/kuadrant/reconcilers/effective_policies_reconciler.go @@ -8,7 +8,6 @@ import ( "sync" "github.com/samber/lo" - "k8s.io/client-go/dynamic" "github.com/kuadrant/policy-machinery/controller" "github.com/kuadrant/policy-machinery/machinery" @@ -20,15 +19,7 @@ import ( const authPathsKey = "authPaths" -// EffectivePoliciesReconciler works exactly like a controller.Workflow where the precondition reconcile function -// reconciles the effective policies for the given topology paths, occasionally modifying the context that is passed -// as argument to the subsequent concurrent reconcilers. -type EffectivePoliciesReconciler struct { - Client *dynamic.DynamicClient - ReconcileFuncs []controller.ReconcileFunc -} - -func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) { +func ReconcileEffectivePolicies(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, state *sync.Map, err error) { targetables := topology.Targetables() // reconcile policies @@ -47,6 +38,8 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve return ok }) + var authPaths [][]machinery.Targetable + for _, gateway := range gateways { // reconcile Gateway -> Listener policies for _, listener := range listeners { @@ -66,7 +59,7 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve paths := targetables.Paths(gateway, httpRouteRule) for i := range paths { if p := effectivePolicyForPath[*kuadrantv1beta3.AuthPolicy](ctx, paths[i]); p != nil { - ctx = pathIntoContext(ctx, authPathsKey, paths[i]) + authPaths = append(authPaths, paths[i]) // TODO: reconcile auth effective policy (i.e. create the Authorino AuthConfig) } if p := effectivePolicyForPath[*kuadrantv1beta3.RateLimitPolicy](ctx, paths[i]); p != nil { @@ -76,17 +69,7 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve } } - // dispatch the event to subsequent reconcilers - funcs := r.ReconcileFuncs - waitGroup := &sync.WaitGroup{} - defer waitGroup.Wait() - waitGroup.Add(len(funcs)) - for _, f := range funcs { - go func() { - defer waitGroup.Done() - f(ctx, resourceEvents, topology, err) - }() - } + state.Store(authPathsKey, authPaths) } func effectivePolicyForPath[T machinery.Policy](ctx context.Context, path []machinery.Targetable) *T { @@ -121,18 +104,3 @@ func effectivePolicyForPath[T machinery.Policy](ctx context.Context, path []mach concreteEffectivePolicy, _ := effectivePolicy.(T) return &concreteEffectivePolicy } - -func pathIntoContext(ctx context.Context, key string, path []machinery.Targetable) context.Context { - if p := ctx.Value(key); p != nil { - return context.WithValue(ctx, key, append(p.([][]machinery.Targetable), path)) - } - return context.WithValue(ctx, key, [][]machinery.Targetable{path}) -} - -func pathsFromContext(ctx context.Context, key string) [][]machinery.Targetable { - var paths [][]machinery.Targetable - if p := ctx.Value(key); p != nil { - paths = p.([][]machinery.Targetable) - } - return paths -} diff --git a/examples/kuadrant/reconcilers/envoy_gateway.go b/examples/kuadrant/reconcilers/envoy_gateway.go index ef14174..949650a 100644 --- a/examples/kuadrant/reconcilers/envoy_gateway.go +++ b/examples/kuadrant/reconcilers/envoy_gateway.go @@ -3,6 +3,7 @@ package reconcilers import ( "context" "fmt" + "sync" egv1alpha1 "github.com/envoyproxy/gateway/api/v1alpha1" "github.com/samber/lo" @@ -28,11 +29,14 @@ type EnvoyGatewayProvider struct { Client *dynamic.DynamicClient } -func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) { +func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, state *sync.Map, err error) { logger := controller.LoggerFromContext(ctx).WithName("envoy gateway").WithName("securitypolicy") ctx = controller.LoggerIntoContext(ctx, logger) - authPaths := pathsFromContext(ctx, authPathsKey) + var authPaths [][]machinery.Targetable + if untypedAuthPaths, ok := state.Load(authPathsKey); ok { + authPaths = untypedAuthPaths.([][]machinery.Targetable) + } targetables := topology.Targetables() gateways := targetables.Items(func(o machinery.Object) bool { _, ok := o.(*machinery.Gateway) @@ -57,7 +61,7 @@ func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ } } -func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) { +func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, _ *sync.Map, err error) { for _, resourceEvent := range resourceEvents { gateway := resourceEvent.OldObject p.deleteSecurityPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil) diff --git a/examples/kuadrant/reconcilers/istio.go b/examples/kuadrant/reconcilers/istio.go index b54d16f..6c26d87 100644 --- a/examples/kuadrant/reconcilers/istio.go +++ b/examples/kuadrant/reconcilers/istio.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "github.com/samber/lo" istioapiv1 "istio.io/api/security/v1" @@ -31,11 +32,14 @@ type IstioGatewayProvider struct { Client *dynamic.DynamicClient } -func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) { +func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, state *sync.Map, err error) { logger := controller.LoggerFromContext(ctx).WithName("istio").WithName("authorizationpolicy") ctx = controller.LoggerIntoContext(ctx, logger) - authPaths := pathsFromContext(ctx, authPathsKey) + var authPaths [][]machinery.Targetable + if untypedAuthPaths, ok := state.Load(authPathsKey); ok { + authPaths = untypedAuthPaths.([][]machinery.Targetable) + } targetables := topology.Targetables() gateways := targetables.Items(func(o machinery.Object) bool { _, ok := o.(*machinery.Gateway) @@ -60,7 +64,7 @@ func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Contex } } -func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) { +func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, _ *sync.Map, err error) { for _, resourceEvent := range resourceEvents { gateway := resourceEvent.OldObject p.deleteAuthorizationPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil) diff --git a/examples/kuadrant/reconcilers/topology_file_reconciler.go b/examples/kuadrant/reconcilers/topology_file_reconciler.go index 9ac44fc..426b487 100644 --- a/examples/kuadrant/reconcilers/topology_file_reconciler.go +++ b/examples/kuadrant/reconcilers/topology_file_reconciler.go @@ -3,6 +3,7 @@ package reconcilers import ( "context" "os" + "sync" "github.com/kuadrant/policy-machinery/controller" "github.com/kuadrant/policy-machinery/machinery" @@ -12,7 +13,7 @@ const topologyFile = "topology.dot" type TopologyFileReconciler struct{} -func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) { +func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, _ *sync.Map, err error) { logger := controller.LoggerFromContext(ctx).WithName("topology file") file, err := os.Create(topologyFile)