Skip to content

Commit

Permalink
Merge pull request #31 from Kuadrant/reconcile-err
Browse files Browse the repository at this point in the history
controller: reconcile func returns an error
  • Loading branch information
guicassolato authored Sep 24, 2024
2 parents 31fa8bc + cb09dfa commit 585f109
Show file tree
Hide file tree
Showing 16 changed files with 293 additions and 30 deletions.
10 changes: 7 additions & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func WithRunnable(name string, builder RunnableBuilder) ControllerOption {
// It receives a list of recent events, an immutable copy of the topology as known by the caller after the events,
// an optional error detected before the reconciliation, and a thread-safe map to store transient state across
// chained calls to multiple ReconcileFuncs.
type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map)
// If a ReconcileFunc returns an error, a chained sequence of ReconcileFuncs must be interrupted.
type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error

func WithReconcile(reconcile ReconcileFunc) ControllerOption {
return func(o *ControllerOptions) {
Expand Down Expand Up @@ -110,7 +111,8 @@ func NewController(f ...ControllerOption) *Controller {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
return nil
},
}
for _, fn := range f {
Expand Down Expand Up @@ -254,7 +256,9 @@ func (c *Controller) propagate(resourceEvents []ResourceEvent) {
if err != nil {
c.logger.Error(err, "error building topology")
}
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err, &sync.Map{})
if err := c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err, &sync.Map{}); err != nil {
c.logger.Error(err, "reconciliation error")
}
}

func (c *Controller) subscribe() {
Expand Down
3 changes: 2 additions & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestControllerOptions(t *testing.T) {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
return nil
},
}

Expand Down
5 changes: 3 additions & 2 deletions controller/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Subscription struct {
Events []ResourceEventMatcher
}

func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) error {
matchingEvents := lo.Filter(resourceEvents, func(resourceEvent ResourceEvent, _ int) bool {
return lo.ContainsBy(s.Events, func(m ResourceEventMatcher) bool {
obj := resourceEvent.OldObject
Expand All @@ -31,6 +31,7 @@ func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEv
})
})
if len(matchingEvents) > 0 && s.ReconcileFunc != nil {
s.ReconcileFunc(ctx, matchingEvents, topology, err, state)
return s.ReconcileFunc(ctx, matchingEvents, topology, err, state)
}
return nil
}
3 changes: 2 additions & 1 deletion controller/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func init() {
Func: func(_ machinery.Object) []machinery.Object { return []machinery.Object{&RuntimeObject{myObjects[0]}} },
}
}
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) {
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error, _ *sync.Map) error {
for _, event := range events {
testLogger.Info("reconcile",
"kind", event.Kind,
Expand All @@ -62,6 +62,7 @@ func init() {
"objects", len(topology.Objects().Items()),
)
}
return nil
}
testScheme = runtime.NewScheme()
corev1.AddToScheme(testScheme)
Expand Down
56 changes: 42 additions & 14 deletions controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,67 @@ package controller

import (
"context"
"errors"
"sync"

"golang.org/x/sync/errgroup"

"github.com/kuadrant/policy-machinery/machinery"
)

// Workflow runs an optional precondition reconciliation function, then dispatches the reconciliation event to
// a list of concurrent reconciliation tasks, and runs an optional postcondition reconciliation function.
//
// If any of the reconciliation functions returns an error, the error is handled by an optional error handler.
// The error passed to the error handler func is conflated with any occasional error carried over into the call
// to the workflow in the first place. It is up to the error handler to decide how to handle the error and whether
// to supress it or raise it again. Supressed errors cause the workflow to continue running, while raised errors
// interrupt the workflow. If the error handler is nil, the error is raised.
type Workflow struct {
Precondition ReconcileFunc
Tasks []ReconcileFunc
Postcondition ReconcileFunc
ErrorHandler ReconcileFunc
}

func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) {
func (w *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error, state *sync.Map) error {
// run precondition reconcile function
if d.Precondition != nil {
d.Precondition(ctx, resourceEvents, topology, err, state)
if w.Precondition != nil {
if preconditionErr := w.Precondition(ctx, resourceEvents, topology, err, state); preconditionErr != nil {
if err := w.handle(ctx, resourceEvents, topology, err, state, preconditionErr); err != nil {
return err
}
}
}

// dispatch the event to concurrent tasks
funcs := d.Tasks
waitGroup := &sync.WaitGroup{}
waitGroup.Add(len(funcs))
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology, err, state)
}()
g, groupCtx := errgroup.WithContext(ctx)
for _, f := range w.Tasks {
g.Go(func() error {
return f(groupCtx, resourceEvents, topology, err, state)
})
}
if taskErr := g.Wait(); taskErr != nil {
if err := w.handle(ctx, resourceEvents, topology, err, state, taskErr); err != nil {
return err
}
}
waitGroup.Wait()

// run precondition reconcile function
if d.Postcondition != nil {
d.Postcondition(ctx, resourceEvents, topology, err, state)
if w.Postcondition != nil {
if postconditionErr := w.Postcondition(ctx, resourceEvents, topology, err, state); postconditionErr != nil {
if err := w.handle(ctx, resourceEvents, topology, err, state, postconditionErr); err != nil {
return err
}
}
}

return nil
}

func (w *Workflow) handle(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, carryoverErr error, state *sync.Map, workflowErr error) error {
if w.ErrorHandler != nil {
return w.ErrorHandler(ctx, resourceEvents, topology, errors.Join(carryoverErr, workflowErr), state)
}
return workflowErr
}
210 changes: 210 additions & 0 deletions controller/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package controller

import (
"context"
"fmt"
"strings"
"sync"
"testing"

"github.com/samber/lo"

"github.com/kuadrant/policy-machinery/machinery"
)

func TestWorkflow(t *testing.T) {
reconcileFuncFor := func(flag *bool, err error) ReconcileFunc {
return func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
*flag = true
return err
}
}

var preconditionCalled, task1Called, task2Called, postconditionCalled, errorHandled bool

precondition := reconcileFuncFor(&preconditionCalled, nil)
preconditionWithError := reconcileFuncFor(&preconditionCalled, fmt.Errorf("precondition error"))
task1 := reconcileFuncFor(&task1Called, nil)
task1WithError := reconcileFuncFor(&task1Called, fmt.Errorf("task1 error"))
task2 := reconcileFuncFor(&task2Called, nil)
task2WithError := reconcileFuncFor(&task2Called, fmt.Errorf("task2 error"))
postcondition := reconcileFuncFor(&postconditionCalled, nil)
postconditionWithError := reconcileFuncFor(&postconditionCalled, fmt.Errorf("postcondition error"))

handleErrorAndSupress := func(context.Context, []ResourceEvent, *machinery.Topology, error, *sync.Map) error {
errorHandled = true
return nil
}

handleErrorAndRaise := func(_ context.Context, _ []ResourceEvent, _ *machinery.Topology, err error, _ *sync.Map) error {
errorHandled = true
return err
}

testCases := []struct {
name string
workflow *Workflow
expectedPreconditionCalled bool
expectedTask1Called bool
expectedTask2Called bool
expectedPostconditionCalled bool
possibleErrs []error
expectedErrorHandled bool
}{
{
name: "empty workflow",
workflow: &Workflow{},
},
{
name: "precondition",
workflow: &Workflow{
Precondition: precondition,
},
expectedPreconditionCalled: true,
},
{
name: "precondition and tasks",
workflow: &Workflow{
Precondition: precondition,
Tasks: []ReconcileFunc{task1, task2},
},
expectedPreconditionCalled: true,
expectedTask1Called: true,
expectedTask2Called: true,
},
{
name: "precondition with error",
workflow: &Workflow{
Precondition: preconditionWithError,
Tasks: []ReconcileFunc{task1, task2},
},
expectedPreconditionCalled: true,
expectedTask1Called: false,
expectedTask2Called: false,
possibleErrs: []error{fmt.Errorf("precondition error")},
},
{
name: "task1 with error",
workflow: &Workflow{
Tasks: []ReconcileFunc{task1WithError, task2},
Postcondition: postcondition,
},
expectedTask1Called: true,
expectedTask2Called: true,
expectedPostconditionCalled: false,
possibleErrs: []error{fmt.Errorf("task1 error")},
},
{
name: "task2 with error",
workflow: &Workflow{
Tasks: []ReconcileFunc{task1, task2WithError},
Postcondition: postcondition,
},
expectedTask1Called: true,
expectedTask2Called: true,
expectedPostconditionCalled: false,
possibleErrs: []error{fmt.Errorf("task2 error")},
},
{
name: "task1 and task2 with error",
workflow: &Workflow{
Tasks: []ReconcileFunc{task1WithError, task2WithError},
Postcondition: postcondition,
},
expectedTask1Called: true,
expectedTask2Called: true,
expectedPostconditionCalled: false,
possibleErrs: []error{
fmt.Errorf("task1 error"),
fmt.Errorf("task2 error"),
},
},
{
name: "postcondition",
workflow: &Workflow{
Precondition: precondition,
Tasks: []ReconcileFunc{task1, task2},
Postcondition: postcondition,
},
expectedPreconditionCalled: true,
expectedTask1Called: true,
expectedTask2Called: true,
expectedPostconditionCalled: true,
},
{
name: "postconditions with error",
workflow: &Workflow{
Precondition: precondition,
Tasks: []ReconcileFunc{task1, task2},
Postcondition: postconditionWithError,
},
expectedPreconditionCalled: true,
expectedTask1Called: true,
expectedTask2Called: true,
expectedPostconditionCalled: true,
possibleErrs: []error{fmt.Errorf("postcondition error")},
},
{
name: "handle error and suppress",
workflow: &Workflow{
Precondition: preconditionWithError,
Postcondition: postconditionWithError,
ErrorHandler: handleErrorAndSupress,
},
expectedPreconditionCalled: true,
expectedPostconditionCalled: true,
expectedErrorHandled: true,
},
{
name: "handle error and raise",
workflow: &Workflow{
Precondition: preconditionWithError,
Postcondition: postconditionWithError,
ErrorHandler: handleErrorAndRaise,
},
expectedPreconditionCalled: true,
expectedErrorHandled: true,
possibleErrs: []error{fmt.Errorf("precondition error")},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// reset
preconditionCalled = false
task1Called = false
task2Called = false
postconditionCalled = false
errorHandled = false

err := tc.workflow.Run(context.Background(), nil, nil, nil, nil)
possibleErrs := lo.Map(tc.possibleErrs, func(err error, _ int) string { return err.Error() })

if tc.expectedPreconditionCalled != preconditionCalled {
t.Errorf("expected precondition to be called: %t, got %t", tc.expectedPreconditionCalled, preconditionCalled)
}
if tc.expectedTask1Called != task1Called {
t.Errorf("expected task1 to be called: %t, got %t", tc.expectedTask1Called, task1Called)
}
if tc.expectedTask2Called != task2Called {
t.Errorf("expected task2 to be called: %t, got %t", tc.expectedTask2Called, task2Called)
}
if tc.expectedPostconditionCalled != postconditionCalled {
t.Errorf("expected postcondition to be called: %t, got %t", tc.expectedPostconditionCalled, postconditionCalled)
}
if len(possibleErrs) > 0 && err == nil {
t.Errorf("expected one of the following errors (%v), got nil", strings.Join(possibleErrs, " / "))
}
if len(possibleErrs) == 0 && err != nil {
t.Errorf("expected no error, got %v", err)
}
if len(possibleErrs) > 0 && err != nil && !lo.ContainsBy(possibleErrs, func(possibleErr string) bool { return possibleErr == err.Error() }) {
t.Errorf("expected error of the following errors (%v), got %v", strings.Join(possibleErrs, " / "), err)
}
if tc.expectedErrorHandled != errorHandled {
t.Errorf("expected error handler called: %t, got %t", tc.expectedErrorHandled, errorHandled)
}
})
}

}
1 change: 1 addition & 0 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Loading

0 comments on commit 585f109

Please sign in to comment.