Skip to content

Commit

Permalink
Lift the DAG error into the reconcile function.
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Fitzpatrick <[email protected]>
  • Loading branch information
Boomatang committed Sep 9, 2024
1 parent 19be67e commit 5399e1e
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 27 deletions.
15 changes: 4 additions & 11 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func WithRunnable(name string, builder RunnableBuilder) ControllerOption {
}
}

type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology)
type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error)

func WithReconcile(reconcile ReconcileFunc) ControllerOption {
return func(o *ControllerOptions) {
Expand Down Expand Up @@ -106,24 +106,20 @@ func NewController(f ...ControllerOption) *Controller {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) {
},
}
for _, fn := range f {
fn(opts)
}

allowTopologyLoops := false
if opts.allowTopologyLoops {
allowTopologyLoops = true
}
controller := &Controller{
name: opts.name,
logger: opts.logger,
client: opts.client,
manager: opts.manager,
cache: &watchableCacheStore{},
topology: newGatewayAPITopologyBuilder(opts.policyKinds, opts.objectKinds, opts.objectLinks, allowTopologyLoops),
topology: newGatewayAPITopologyBuilder(opts.policyKinds, opts.objectKinds, opts.objectLinks, opts.allowTopologyLoops),
runnables: map[string]Runnable{},
reconcile: opts.reconcile,
}
Expand Down Expand Up @@ -251,10 +247,7 @@ func (c *Controller) delete(obj Object) {

func (c *Controller) propagate(resourceEvents []ResourceEvent) {
topology, err := c.topology.Build(c.cache.List())
if !c.topology.allowTopologyLoops && err != nil {
panic(err)
}
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology)
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err)
}

func (c *Controller) subscribe() {
Expand Down
2 changes: 1 addition & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestControllerOptions(t *testing.T) {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) {
},
}

Expand Down
4 changes: 2 additions & 2 deletions controller/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Subscription struct {
Events []ResourceEventMatcher
}

func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology) {
func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) {
matchingEvents := lo.Filter(resourceEvents, func(resourceEvent ResourceEvent, _ int) bool {
return lo.ContainsBy(s.Events, func(m ResourceEventMatcher) bool {
obj := resourceEvent.OldObject
Expand All @@ -30,6 +30,6 @@ func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEv
})
})
if len(matchingEvents) > 0 && s.ReconcileFunc != nil {
s.ReconcileFunc(ctx, matchingEvents, topology)
s.ReconcileFunc(ctx, matchingEvents, topology, nil)
}
}
2 changes: 1 addition & 1 deletion controller/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func init() {
Func: func(_ machinery.Object) []machinery.Object { return []machinery.Object{&RuntimeObject{myObjects[0]}} },
}
}
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology) {
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error) {
for _, event := range events {
testLogger.Info("reconcile",
"kind", event.Kind,
Expand Down
8 changes: 4 additions & 4 deletions controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type Workflow struct {
Postcondition ReconcileFunc
}

func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology) {
func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) {
// run precondition reconcile function
if d.Precondition != nil {
d.Precondition(ctx, resourceEvents, topology)
d.Precondition(ctx, resourceEvents, topology, err)
}

// dispatch the event to concurrent tasks
Expand All @@ -28,13 +28,13 @@ func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topo
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology)
f(ctx, resourceEvents, topology, err)
}()
}
waitGroup.Wait()

// run precondition reconcile function
if d.Postcondition != nil {
d.Postcondition(ctx, resourceEvents, topology)
d.Postcondition(ctx, resourceEvents, topology, err)
}
}
2 changes: 1 addition & 1 deletion examples/kuadrant/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c
}

reconciler := &controller.Workflow{
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("event logger")
for _, event := range resourceEvents {
// log the event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type EffectivePoliciesReconciler struct {
ReconcileFuncs []controller.ReconcileFunc
}

func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
targetables := topology.Targetables()

// reconcile policies
Expand Down Expand Up @@ -84,7 +84,7 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology)
f(ctx, resourceEvents, topology, err)
}()
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/kuadrant/reconcilers/envoy_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type EnvoyGatewayProvider struct {
Client *dynamic.DynamicClient
}

func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology) {
func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("envoy gateway").WithName("securitypolicy")
ctx = controller.LoggerIntoContext(ctx, logger)

Expand Down Expand Up @@ -57,7 +57,7 @@ func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _
}
}

func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
for _, resourceEvent := range resourceEvents {
gateway := resourceEvent.OldObject
p.deleteSecurityPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil)
Expand Down
4 changes: 2 additions & 2 deletions examples/kuadrant/reconcilers/istio.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type IstioGatewayProvider struct {
Client *dynamic.DynamicClient
}

func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology) {
func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("istio").WithName("authorizationpolicy")
ctx = controller.LoggerIntoContext(ctx, logger)

Expand Down Expand Up @@ -60,7 +60,7 @@ func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Contex
}
}

func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
for _, resourceEvent := range resourceEvents {
gateway := resourceEvent.OldObject
p.deleteAuthorizationPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil)
Expand Down
2 changes: 1 addition & 1 deletion examples/kuadrant/reconcilers/topology_file_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const topologyFile = "topology.dot"

type TopologyFileReconciler struct{}

func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology) {
func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("topology file")

file, err := os.Create(topologyFile)
Expand Down

0 comments on commit 5399e1e

Please sign in to comment.