Skip to content

Commit

Permalink
Index additional printer columns for Gatekeeper constraints (#299)
Browse files Browse the repository at this point in the history
* Use type assertions in GenericInformer

Since the dynamic client is always used, the object will always be of
type *unstructured.Unstructured, so no need to do anything but use type
assertions.

Signed-off-by: mprahl <[email protected]>

* Index additional printer columns for Gatekeeper constraints

Gatekeeper constraints are CRD based but the CRDs are generated based on
user defined ConstraintTemplate objects. Because of this, indexing the
constraint's `spec.enforcementAction` and `status.totalViolations` is a
bit difficult with the current setup. This makes it so that the CRD's
additionalPrinterColumns are automatically indexed.

The scope of this change is limited to the constraints.gatekeeper.sh API
group, but removing that check in the `GenericResourceBuilder`
function will enable it for all resources that don't have a custom
transform.

Note that the names from the additionalPrinterColumns in the CRD are
converted to lowerCamelCase style when set as node properties. The
separators are spaces and dashes (e.g. "enforcement action" and
"enforcement-action" become "enforcementAction"). Duplicate node
properties are ignored, with a priority on the default generic property
names.

As part of this change, it made sense to also make `syncInformers` event
driven so that it only gets run when there is a likely relevant change
in a CRD (i.e. added, deleted, or the spec changed). This makes the
indexing of new custom resources nearly instant while also sharing the
CRD informer for keeping an up to date mapping of GVRs to additional
printer columns.

Lastly, main.go was refactored a bit to wait for the two main goroutines
to end before exiting. This allows for some clean up when the interrupt
signal is sent to the process. More refactoring to let every goroutine
clean up should be considered.

Relates:
https://issues.redhat.com/browse/ACM-13278
https://issues.redhat.com/browse/ACM-12572

Signed-off-by: mprahl <[email protected]>

* Add _isExternal field to Gatekeeper constraints

Signed-off-by: mprahl <[email protected]>

---------

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl authored Sep 9, 2024
1 parent daff1f9 commit aaa8061
Show file tree
Hide file tree
Showing 13 changed files with 945 additions and 169 deletions.
1 change: 0 additions & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"ClusterName": "local-cluster",
"ClusterNamespace": "local-cluster-ns",
"ReportRateMS": 5000,
"RediscoverRateMS": 60000,
"RuntimeMode": "development",
"DeployedInHub": true
}
49 changes: 47 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"

"github.com/stolostron/search-collector/pkg/config"
Expand All @@ -25,6 +29,26 @@ const (
LeaseDurationSeconds = 60
)

// getMainContext builds the root context for the process. The returned context
// is canceled when the first SIGINT (os.Interrupt) or SIGTERM arrives, giving
// callers a chance to shut down cleanly. A second signal terminates the process
// immediately with exit status 1.
// The approach mirrors the signal handler found in controller-runtime.
func getMainContext() context.Context {
	// Room for two pending signals: the one that triggers cancellation and the
	// one that forces the exit.
	signals := make(chan os.Signal, 2)
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

	rootCtx, stop := context.WithCancel(context.Background())

	go awaitShutdownSignals(signals, stop)

	return rootCtx
}

// awaitShutdownSignals blocks until a signal is received on signals and then
// invokes stop. If another signal follows, the process exits with status 1
// without waiting for any further cleanup.
func awaitShutdownSignals(signals <-chan os.Signal, stop context.CancelFunc) {
	// First signal: request a graceful shutdown.
	<-signals
	stop()

	// Second signal. Exit directly.
	<-signals
	os.Exit(1)
}

func main() {
// init logs
flag.Parse()
Expand Down Expand Up @@ -73,8 +97,22 @@ func main() {

informersInitialized := make(chan interface{})

mainCtx := getMainContext()

wg := sync.WaitGroup{}
wg.Add(1)

// Start a routine to keep our informers up to date.
go informer.RunInformers(informersInitialized, upsertTransformer, reconciler)
go func() {
err := informer.RunInformers(mainCtx, informersInitialized, upsertTransformer, reconciler)
if err != nil {
glog.Errorf("Failed to run the informers: %v", err)

os.Exit(1)
}

wg.Done()
}()

// Wait here until informers have collected the full state of the cluster.
// The initial payload must have the complete state to avoid unecessary deletion
Expand All @@ -83,5 +121,12 @@ func main() {
<-informersInitialized

glog.Info("Starting the sender.")
sender.StartSendLoop()
wg.Add(1)

go func() {
sender.StartSendLoop(mainCtx)
wg.Done()
}()

wg.Wait()
}
70 changes: 39 additions & 31 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,38 @@ import (

// Out of box defaults
const (
COLLECTOR_API_VERSION = "2.11.0"
DEFAULT_AGGREGATOR_URL = "https://localhost:3010" // this will be deprecated in the future
DEFAULT_AGGREGATOR_HOST = "https://localhost"
DEFAULT_AGGREGATOR_PORT = "3010"
DEFAULT_COLLECT_ANNOTATIONS = false
DEFAULT_CLUSTER_NAME = "local-cluster"
DEFAULT_POD_NAMESPACE = "open-cluster-management"
DEFAULT_HEARTBEAT_MS = 300000 // 5 min
DEFAULT_MAX_BACKOFF_MS = 600000 // 10 min
DEFAULT_REDISCOVER_RATE_MS = 120000 // 2 min
DEFAULT_REPORT_RATE_MS = 5000 // 5 seconds
DEFAULT_RETRY_JITTER_MS = 5000 // 5 seconds
DEFAULT_RUNTIME_MODE = "production"
COLLECTOR_API_VERSION = "2.11.0"
DEFAULT_AGGREGATOR_URL = "https://localhost:3010" // this will be deprecated in the future
DEFAULT_AGGREGATOR_HOST = "https://localhost"
DEFAULT_AGGREGATOR_PORT = "3010"
DEFAULT_CLUSTER_NAME = "local-cluster"
DEFAULT_POD_NAMESPACE = "open-cluster-management"
DEFAULT_HEARTBEAT_MS = 300000 // 5 min
DEFAULT_MAX_BACKOFF_MS = 600000 // 10 min
DEFAULT_REDISCOVER_RATE_MS = 120000 // 2 min
DEFAULT_REPORT_RATE_MS = 5000 // 5 seconds
DEFAULT_RETRY_JITTER_MS = 5000 // 5 seconds
DEFAULT_RUNTIME_MODE = "production"
)

// Configuration options for the search-collector.
type Config struct {
AggregatorConfig *rest.Config // Config object for hub. Used to get TLS credentials.
AggregatorConfigFile string `env:"HUB_CONFIG"` // Config file for hub. Will be mounted in a secret.
AggregatorURL string `env:"AGGREGATOR_URL"` // URL of the Aggregator, includes port but not any path
AggregatorHost string `env:"AGGREGATOR_HOST"` // Host of the Aggregator
AggregatorPort string `env:"AGGREGATOR_PORT"` // Port of the Aggregator
CollectAnnotations bool `env:"COLLECT_ANNOTATIONS"` // Collect all annotations with values <=64 characters
ClusterName string `env:"CLUSTER_NAME"` // The name of of the cluster where this pod is running
PodNamespace string `env:"POD_NAMESPACE"` // The namespace of this pod
DeployedInHub bool `env:"DEPLOYED_IN_HUB"` // Tracks if deployed in the Hub or Managed cluster
HeartbeatMS int `env:"HEARTBEAT_MS"` // Interval(ms) to send empty payload to ensure connection
KubeConfig string `env:"KUBECONFIG"` // Local kubeconfig path
MaxBackoffMS int `env:"MAX_BACKOFF_MS"` // Maximum backoff in ms to wait after error
RediscoverRateMS int `env:"REDISCOVER_RATE_MS"` // Interval(ms) to poll for changes to CRDs
RetryJitterMS int `env:"RETRY_JITTER_MS"` // Random jitter added to backoff wait.
ReportRateMS int `env:"REPORT_RATE_MS"` // Interval(ms) to send changes to the aggregator
RuntimeMode string `env:"RUNTIME_MODE"` // Running mode (development or production)
AggregatorConfig *rest.Config // Config object for hub. Used to get TLS credentials.
AggregatorConfigFile string `env:"HUB_CONFIG"` // Config file for hub. Will be mounted in a secret.
AggregatorURL string `env:"AGGREGATOR_URL"` // URL of the Aggregator, includes port but not any path
AggregatorHost string `env:"AGGREGATOR_HOST"` // Host of the Aggregator
AggregatorPort string `env:"AGGREGATOR_PORT"` // Port of the Aggregator
CollectAnnotations bool `env:"COLLECT_ANNOTATIONS"` // Collect all annotations with values <=64 characters
CollectCRDPrinterColumns bool `env:"COLLECT_CRD_PRINTER_COLUMNS"` // Enable collecting additional printer columns in the CRD
ClusterName string `env:"CLUSTER_NAME"` // The name of of the cluster where this pod is running
PodNamespace string `env:"POD_NAMESPACE"` // The namespace of this pod
DeployedInHub bool `env:"DEPLOYED_IN_HUB"` // Tracks if deployed in the Hub or Managed cluster
HeartbeatMS int `env:"HEARTBEAT_MS"` // Interval(ms) to send empty payload to ensure connection
KubeConfig string `env:"KUBECONFIG"` // Local kubeconfig path
MaxBackoffMS int `env:"MAX_BACKOFF_MS"` // Maximum backoff in ms to wait after error
RetryJitterMS int `env:"RETRY_JITTER_MS"` // Random jitter added to backoff wait.
ReportRateMS int `env:"REPORT_RATE_MS"` // Interval(ms) to send changes to the aggregator
RuntimeMode string `env:"RUNTIME_MODE"` // Running mode (development or production)
}

var Cfg = Config{}
Expand Down Expand Up @@ -93,7 +92,7 @@ func InitConfig() {
aggHost, aggHostPresent := os.LookupEnv("AGGREGATOR_HOST")
aggPort, aggPortPresent := os.LookupEnv("AGGREGATOR_PORT")

//If environment variables are set for aggregator host and port, use those to set the AggregatorURL
// If environment variables are set for aggregator host and port, use those to set the AggregatorURL
if aggHostPresent && aggPortPresent && aggHost != "" && aggPort != "" {
Cfg.AggregatorURL = net.JoinHostPort(aggHost, aggPort)
setDefault(&Cfg.AggregatorURL, "", net.JoinHostPort(DEFAULT_AGGREGATOR_HOST, DEFAULT_AGGREGATOR_PORT))
Expand All @@ -103,7 +102,6 @@ func InitConfig() {

setDefaultInt(&Cfg.HeartbeatMS, "HEARTBEAT_MS", DEFAULT_HEARTBEAT_MS)
setDefaultInt(&Cfg.MaxBackoffMS, "MAX_BACKOFF_MS", DEFAULT_MAX_BACKOFF_MS)
setDefaultInt(&Cfg.RediscoverRateMS, "REDISCOVER_RATE_MS", DEFAULT_REDISCOVER_RATE_MS)
setDefaultInt(&Cfg.ReportRateMS, "REPORT_RATE_MS", DEFAULT_REPORT_RATE_MS)
setDefaultInt(&Cfg.RetryJitterMS, "RETRY_JITTER_MS", DEFAULT_RETRY_JITTER_MS)

Expand Down Expand Up @@ -138,6 +136,16 @@ func InitConfig() {
}
setDefault(&Cfg.AggregatorConfigFile, "HUB_CONFIG", "")

if collectCRDPrinterCols := os.Getenv("COLLECT_CRD_PRINTER_COLUMNS"); collectCRDPrinterCols != "" {
glog.Infof("Using COLLECT_CRD_PRINTER_COLUMNS from environment: %s", collectCRDPrinterCols)

var err error
Cfg.CollectCRDPrinterColumns, err = strconv.ParseBool(collectCRDPrinterCols)
if err != nil {
glog.Errorf("Error parsing env COLLECT_CRD_PRINTER_COLUMNS, defaulting to false: %v", err)
}
}

if Cfg.DeployedInHub && Cfg.AggregatorConfigFile != "" {
glog.Fatal("Config mismatch: DEPLOYED_IN_HUB is true, but HUB_CONFIG is set to connect to another hub")
} else if !Cfg.DeployedInHub && Cfg.AggregatorConfigFile == "" {
Expand Down
40 changes: 19 additions & 21 deletions pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -45,10 +44,10 @@ func InformerForResource(res schema.GroupVersionResource) (GenericInformer, erro
}

// Run runs the informer.
func (inform *GenericInformer) Run(stopper chan struct{}) {
func (inform *GenericInformer) Run(ctx context.Context) {
for {
select {
case <-stopper:
case <-ctx.Done():
glog.Info("Informer stopped. ", inform.gvr.String())
for key := range inform.resourceIndex {
glog.V(5).Infof("Stopping informer %s and removing resource with UID: %s", inform.gvr.Resource, key)
Expand All @@ -72,7 +71,7 @@ func (inform *GenericInformer) Run(stopper chan struct{}) {
err := inform.listAndResync()
if err == nil {
inform.initialized = true
inform.watch(stopper)
inform.watch(ctx.Done())
}
}
}
Expand Down Expand Up @@ -149,8 +148,7 @@ func (inform *GenericInformer) listAndResync() error {
}

// Watch resources and process events.
func (inform *GenericInformer) watch(stopper chan struct{}) {

func (inform *GenericInformer) watch(stopper <-chan struct{}) {
watch, watchError := inform.client.Resource(inform.gvr).Watch(context.TODO(), metav1.ListOptions{})
if watchError != nil {
glog.Warningf("Error watching resources for %s. Error: %s", inform.gvr.String(), watchError)
Expand All @@ -175,35 +173,35 @@ func (inform *GenericInformer) watch(stopper chan struct{}) {
switch event.Type {
case "ADDED":
glog.V(5).Infof("Received ADDED event. Kind: %s ", inform.gvr.Resource)
o, error := runtime.UnstructuredConverter.ToUnstructured(runtime.DefaultUnstructuredConverter, &event.Object)
if error != nil {
glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on ADDED event. %s",
inform.gvr.Resource, error)
obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on ADDED event.",
inform.gvr.Resource)
continue
}
obj := &unstructured.Unstructured{Object: o}
inform.AddFunc(obj)
inform.resourceIndex[string(obj.GetUID())] = obj.GetResourceVersion()

case "MODIFIED":
glog.V(5).Infof("Received MODIFY event. Kind: %s ", inform.gvr.Resource)
o, error := runtime.UnstructuredConverter.ToUnstructured(runtime.DefaultUnstructuredConverter, &event.Object)
if error != nil {
glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on MODIFIED event. %s",
inform.gvr.Resource, error)
obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on MODIFIED event",
inform.gvr.Resource)
continue
}
obj := &unstructured.Unstructured{Object: o}

inform.UpdateFunc(nil, obj)
inform.resourceIndex[string(obj.GetUID())] = obj.GetResourceVersion()

case "DELETED":
glog.V(5).Infof("Received DELETED event. Kind: %s ", inform.gvr.Resource)
o, error := runtime.UnstructuredConverter.ToUnstructured(runtime.DefaultUnstructuredConverter, &event.Object)
if error != nil {
glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on DELETED event. %s",
inform.gvr.Resource, error)
obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
glog.Warningf("Error converting %s event.Object to unstructured.Unstructured on DELETED event",
inform.gvr.Resource)
continue
}
obj := &unstructured.Unstructured{Object: o}

inform.DeleteFunc(obj)
delete(inform.resourceIndex, string(obj.GetUID()))
Expand Down
23 changes: 13 additions & 10 deletions pkg/informer/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
var gvr = schema.GroupVersionResource{Group: "open-cluster-management.io", Version: "v1", Resource: "thekinds"}

func fakeDynamicClient() *fake.FakeDynamicClient {
var gvrMapToKind = map[schema.GroupVersionResource]string{}
gvrMapToKind := map[schema.GroupVersionResource]string{}
gvrMapToKind[gvr] = "thekindsList"
scheme := runtime.NewScheme()
scheme.AddKnownTypes(gvr.GroupVersion())
Expand Down Expand Up @@ -149,14 +149,14 @@ func Test_StoppedInformer_ValidateDeleteFunc(t *testing.T) {
}

// start informer
stopper := make(chan struct{})
go informer.Run(stopper)
ctx, cancel := context.WithCancel(context.Background())
go informer.Run(ctx)
time.Sleep(10 * time.Millisecond)

//exist informer to trigger DeleteFunc
close(stopper)
// exit informer to trigger DeleteFunc
cancel()

//allow test to process
// allow test to process
time.Sleep(10 * time.Millisecond)

// Verify that the informer.DeleteFunc was called with uid=id-999 and uid=id-100
Expand All @@ -173,14 +173,14 @@ func Test_Run(t *testing.T) {
informer, addFuncCount, deleteFuncCount, updateFuncCount := initInformer()

// Start informer routine
stopper := make(chan struct{})
go informer.Run(stopper)
ctx, cancel := context.WithCancel(context.Background())
go informer.Run(ctx)
time.Sleep(10 * time.Millisecond)

generateSimpleEvent(informer, t)
time.Sleep(10 * time.Millisecond)

close(stopper)
cancel()

// Verify that informer.AddFunc is called for each of the mocked resources (6 times).
if *addFuncCount != 6 {
Expand All @@ -207,7 +207,10 @@ func Test_Run_retryBackoff(t *testing.T) {
informer.AddFunc = func(interface{}) { retryTime = time.Now() }

// Execute function
go informer.Run(make(chan struct{}))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go informer.Run(ctx)
time.Sleep(2010 * time.Millisecond)

// Verify backoff logic waits 2 seconds before retrying.
Expand Down
Loading

0 comments on commit aaa8061

Please sign in to comment.