Skip to content

Commit

Permalink
Merge pull request #21 from Kuadrant/ensure_topologies_are_DAG
Browse files Browse the repository at this point in the history
Add validate function
  • Loading branch information
Boomatang authored Sep 9, 2024
2 parents 85837a3 + a4cbf60 commit ece88ae
Show file tree
Hide file tree
Showing 19 changed files with 787 additions and 54 deletions.
36 changes: 22 additions & 14 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (
)

type ControllerOptions struct {
name string
logger logr.Logger
client *dynamic.DynamicClient
manager ctrlruntime.Manager
runnables map[string]RunnableBuilder
reconcile ReconcileFunc
policyKinds []schema.GroupKind
objectKinds []schema.GroupKind
objectLinks []LinkFunc
name string
logger logr.Logger
client *dynamic.DynamicClient
manager ctrlruntime.Manager
runnables map[string]RunnableBuilder
reconcile ReconcileFunc
policyKinds []schema.GroupKind
objectKinds []schema.GroupKind
objectLinks []LinkFunc
allowTopologyLoops bool
}

type ControllerOption func(*ControllerOptions)
Expand Down Expand Up @@ -60,7 +61,7 @@ func WithRunnable(name string, builder RunnableBuilder) ControllerOption {
}
}

type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology)
type ReconcileFunc func(context.Context, []ResourceEvent, *machinery.Topology, error)

func WithReconcile(reconcile ReconcileFunc) ControllerOption {
return func(o *ControllerOptions) {
Expand Down Expand Up @@ -94,12 +95,18 @@ func ManagedBy(manager ctrlruntime.Manager) ControllerOption {
}
}

func AllowLoops() ControllerOption {
return func(o *ControllerOptions) {
o.allowTopologyLoops = true
}
}

func NewController(f ...ControllerOption) *Controller {
opts := &ControllerOptions{
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) {
},
}
for _, fn := range f {
Expand All @@ -112,7 +119,7 @@ func NewController(f ...ControllerOption) *Controller {
client: opts.client,
manager: opts.manager,
cache: &watchableCacheStore{},
topology: newGatewayAPITopologyBuilder(opts.policyKinds, opts.objectKinds, opts.objectLinks),
topology: newGatewayAPITopologyBuilder(opts.policyKinds, opts.objectKinds, opts.objectLinks, opts.allowTopologyLoops),
runnables: map[string]Runnable{},
reconcile: opts.reconcile,
}
Expand Down Expand Up @@ -239,8 +246,9 @@ func (c *Controller) delete(obj Object) {
}

func (c *Controller) propagate(resourceEvents []ResourceEvent) {
topology := c.topology.Build(c.cache.List())
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology)
topology, err := c.topology.Build(c.cache.List())
c.logger.Error(err, "error building topology")
c.reconcile(LoggerIntoContext(context.TODO(), c.logger), resourceEvents, topology, err)
}

func (c *Controller) subscribe() {
Expand Down
7 changes: 6 additions & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestControllerOptions(t *testing.T) {
name: "controller",
logger: logr.Discard(),
runnables: map[string]RunnableBuilder{},
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology) {
reconcile: func(context.Context, []ResourceEvent, *machinery.Topology, error) {
},
}

Expand Down Expand Up @@ -93,6 +93,11 @@ func TestControllerOptions(t *testing.T) {
if opts.reconcile == nil {
t.Errorf("expected reconcile func, got nil")
}

AllowLoops()(opts)
if opts.allowTopologyLoops == false {
t.Errorf("expected allowTopologyLoops true, got false")
}
}

func TestNewController(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions controller/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Subscription struct {
Events []ResourceEventMatcher
}

func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology) {
func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) {
matchingEvents := lo.Filter(resourceEvents, func(resourceEvent ResourceEvent, _ int) bool {
return lo.ContainsBy(s.Events, func(m ResourceEventMatcher) bool {
obj := resourceEvent.OldObject
Expand All @@ -30,6 +30,6 @@ func (s Subscription) Reconcile(ctx context.Context, resourceEvents []ResourceEv
})
})
if len(matchingEvents) > 0 && s.ReconcileFunc != nil {
s.ReconcileFunc(ctx, matchingEvents, topology)
s.ReconcileFunc(ctx, matchingEvents, topology, err)
}
}
2 changes: 1 addition & 1 deletion controller/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func init() {
Func: func(_ machinery.Object) []machinery.Object { return []machinery.Object{&RuntimeObject{myObjects[0]}} },
}
}
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology) {
testReconcileFunc = func(_ context.Context, events []ResourceEvent, topology *machinery.Topology, err error) {
for _, event := range events {
testLogger.Info("reconcile",
"kind", event.Kind,
Expand Down
22 changes: 14 additions & 8 deletions controller/topology_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@ import (
"github.com/kuadrant/policy-machinery/machinery"
)

func newGatewayAPITopologyBuilder(policyKinds, objectKinds []schema.GroupKind, objectLinks []LinkFunc) *gatewayAPITopologyBuilder {
func newGatewayAPITopologyBuilder(policyKinds, objectKinds []schema.GroupKind, objectLinks []LinkFunc, allowTopologyLoops bool) *gatewayAPITopologyBuilder {
return &gatewayAPITopologyBuilder{
policyKinds: policyKinds,
objectKinds: objectKinds,
objectLinks: objectLinks,
policyKinds: policyKinds,
objectKinds: objectKinds,
objectLinks: objectLinks,
allowTopologyLoops: allowTopologyLoops,
}
}

type gatewayAPITopologyBuilder struct {
policyKinds []schema.GroupKind
objectKinds []schema.GroupKind
objectLinks []LinkFunc
policyKinds []schema.GroupKind
objectKinds []schema.GroupKind
objectLinks []LinkFunc
allowTopologyLoops bool
}

func (t *gatewayAPITopologyBuilder) Build(objs Store) *machinery.Topology {
func (t *gatewayAPITopologyBuilder) Build(objs Store) (*machinery.Topology, error) {
gatewayClasses := lo.Map(objs.FilterByGroupKind(machinery.GatewayClassGroupKind), ObjectAs[*gwapiv1.GatewayClass])
gateways := lo.Map(objs.FilterByGroupKind(machinery.GatewayGroupKind), ObjectAs[*gwapiv1.Gateway])
httpRoutes := lo.Map(objs.FilterByGroupKind(machinery.HTTPRouteGroupKind), ObjectAs[*gwapiv1.HTTPRoute])
Expand All @@ -44,6 +46,10 @@ func (t *gatewayAPITopologyBuilder) Build(objs Store) *machinery.Topology {
machinery.WithGatewayAPITopologyLinks(linkFuncs...),
}

if t.allowTopologyLoops {
opts = append(opts, machinery.AllowTopologyLoops())
}

for i := range t.policyKinds {
policyKind := t.policyKinds[i]
policies := lo.Map(objs.FilterByGroupKind(policyKind), ObjectAs[machinery.Policy])
Expand Down
8 changes: 4 additions & 4 deletions controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type Workflow struct {
Postcondition ReconcileFunc
}

func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology) {
func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topology *machinery.Topology, err error) {
// run precondition reconcile function
if d.Precondition != nil {
d.Precondition(ctx, resourceEvents, topology)
d.Precondition(ctx, resourceEvents, topology, err)
}

// dispatch the event to concurrent tasks
Expand All @@ -28,13 +28,13 @@ func (d *Workflow) Run(ctx context.Context, resourceEvents []ResourceEvent, topo
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology)
f(ctx, resourceEvents, topology, err)
}()
}
waitGroup.Wait()

// run precondition reconcile function
if d.Postcondition != nil {
d.Postcondition(ctx, resourceEvents, topology)
d.Postcondition(ctx, resourceEvents, topology, err)
}
}
6 changes: 5 additions & 1 deletion examples/color_policy/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,18 @@ func TestKuadrantMergeBasedOnTopology(t *testing.T) {
}),
}

topology := machinery.NewGatewayAPITopology(
topology, err := machinery.NewGatewayAPITopology(
machinery.WithGateways(gateway),
machinery.WithHTTPRoutes(httpRoutes...),
machinery.ExpandHTTPRouteRules(),
machinery.WithServices(services...),
machinery.WithGatewayAPITopologyPolicies(policies...),
)

if err != nil {
t.Fatal(err)
}

machinery.SaveToOutputDir(t, topology.ToDot(), "../../tests/out", ".dot")

targetables := topology.Targetables()
Expand Down
6 changes: 5 additions & 1 deletion examples/json_patch/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,18 @@ func TestJSONPatchMergeBasedOnTopology(t *testing.T) {
}),
}

topology := machinery.NewGatewayAPITopology(
topology, err := machinery.NewGatewayAPITopology(
machinery.WithGateways(gateway),
machinery.WithHTTPRoutes(httpRoutes...),
machinery.ExpandHTTPRouteRules(),
machinery.WithServices(services...),
machinery.WithGatewayAPITopologyPolicies(policies...),
)

if err != nil {
t.Fatal(err)
}

machinery.SaveToOutputDir(t, topology.ToDot(), "../../tests/out", ".dot")

targetables := topology.Targetables()
Expand Down
2 changes: 1 addition & 1 deletion examples/kuadrant/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func buildReconciler(gatewayProviders []string, client *dynamic.DynamicClient) c
}

reconciler := &controller.Workflow{
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
Precondition: func(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("event logger")
for _, event := range resourceEvents {
// log the event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type EffectivePoliciesReconciler struct {
ReconcileFuncs []controller.ReconcileFunc
}

func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
targetables := topology.Targetables()

// reconcile policies
Expand Down Expand Up @@ -84,7 +84,7 @@ func (r *EffectivePoliciesReconciler) Reconcile(ctx context.Context, resourceEve
for _, f := range funcs {
go func() {
defer waitGroup.Done()
f(ctx, resourceEvents, topology)
f(ctx, resourceEvents, topology, err)
}()
}
}
Expand Down
4 changes: 2 additions & 2 deletions examples/kuadrant/reconcilers/envoy_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type EnvoyGatewayProvider struct {
Client *dynamic.DynamicClient
}

func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology) {
func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("envoy gateway").WithName("securitypolicy")
ctx = controller.LoggerIntoContext(ctx, logger)

Expand Down Expand Up @@ -57,7 +57,7 @@ func (p *EnvoyGatewayProvider) ReconcileSecurityPolicies(ctx context.Context, _
}
}

func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
func (p *EnvoyGatewayProvider) DeleteSecurityPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
for _, resourceEvent := range resourceEvents {
gateway := resourceEvent.OldObject
p.deleteSecurityPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil)
Expand Down
4 changes: 2 additions & 2 deletions examples/kuadrant/reconcilers/istio.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type IstioGatewayProvider struct {
Client *dynamic.DynamicClient
}

func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology) {
func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("istio").WithName("authorizationpolicy")
ctx = controller.LoggerIntoContext(ctx, logger)

Expand Down Expand Up @@ -60,7 +60,7 @@ func (p *IstioGatewayProvider) ReconcileAuthorizationPolicies(ctx context.Contex
}
}

func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology) {
func (p *IstioGatewayProvider) DeleteAuthorizationPolicy(ctx context.Context, resourceEvents []controller.ResourceEvent, topology *machinery.Topology, err error) {
for _, resourceEvent := range resourceEvents {
gateway := resourceEvent.OldObject
p.deleteAuthorizationPolicy(ctx, topology, gateway.GetNamespace(), gateway.GetName(), nil)
Expand Down
2 changes: 1 addition & 1 deletion examples/kuadrant/reconcilers/topology_file_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const topologyFile = "topology.dot"

type TopologyFileReconciler struct{}

func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology) {
func (r *TopologyFileReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, err error) {
logger := controller.LoggerFromContext(ctx).WithName("topology file")

file, err := os.Create(topologyFile)
Expand Down
Loading

0 comments on commit ece88ae

Please sign in to comment.